From fcb74e9050df50b28c070c4b175c0d901bec7c0b Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 17 May 2022 17:22:55 +0200 Subject: [PATCH] reflect code review --- pkg/cluster/cluster.go | 7 +++++-- pkg/cluster/pod.go | 4 ++-- pkg/cluster/util.go | 4 ++-- pkg/controller/controller.go | 3 +-- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 813bec297..a51c9871e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1032,16 +1032,19 @@ func (c *Cluster) processPodEvent(obj interface{}) error { return fmt.Errorf("could not cast to PodEvent") } + // can only take lock when (un)registerPodSubscriber is finshed c.podSubscribersMu.RLock() subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)] if ok { select { case subscriber <- event: default: - // we end up here when there is no receiver of the channel - // avoiding a deadlock: https://gobyexample.com/non-blocking-channel-operations + // ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished) + // avoids blocking channel: https://gobyexample.com/non-blocking-channel-operations } } + // hold lock for the time of processing the event to avoid race condition + // with unregisterPodSubscriber closing the channel (see #1876) c.podSubscribersMu.RUnlock() return nil diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 156d5dce6..1e24565d8 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -67,7 +67,7 @@ func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error { return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err) } - err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, + err = retryutil.Retry(1*time.Second, 5*time.Second, func() (bool, error) { _, err2 := c.KubeClient.Pods(pod.Namespace).Patch( context.TODO(), @@ -405,7 +405,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { defer c.unregisterPodSubscriber(podName) defer close(stopCh) - err := retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.PodDeletionWaitTimeout, + err := retryutil.Retry(1*time.Second, 5*time.Second, func() (bool, error) { err2 := c.KubeClient.Pods(podName.Namespace).Delete( context.TODO(), diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 0f71d2d64..0bfda78bf 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -316,7 +316,7 @@ func (c *Cluster) annotationsSet(annotations map[string]string) map[string]strin return nil } -func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopChan chan struct{}, role *PostgresRole) (*v1.Pod, error) { +func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopCh chan struct{}, role *PostgresRole) (*v1.Pod, error) { timeout := time.After(c.OpConfig.PodLabelWaitTimeout) for { select { @@ -332,7 +332,7 @@ func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopChan chan struct{ } case <-timeout: return nil, fmt.Errorf("pod label wait timeout") - case <-stopChan: + case <-stopCh: return nil, fmt.Errorf("pod label wait cancelled") } } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index c2dfd0a5c..e46b9ee44 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -455,14 +455,13 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { go c.runPodInformer(stopCh, wg) go c.runPostgresqlInformer(stopCh, wg) go c.clusterResync(stopCh, wg) + go c.apiserver.Run(stopCh, wg) go c.kubeNodesInformer(stopCh, wg) if c.opConfig.EnablePostgresTeamCRD { go c.runPostgresTeamInformer(stopCh, wg) } - go c.apiserver.Run(stopCh, wg) - c.logger.Info("started working in background") }