From 36419117861b5742dadf2d9c94c97e6d6a79560e Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 29 Apr 2022 21:24:33 +0200 Subject: [PATCH] removing inner goroutine in cluster.Switchover and resolve race between processPodEvent and unregisterPodSubscriber --- pkg/cluster/cluster.go | 38 ++++++------------------------------ pkg/cluster/pod.go | 19 +++++++++--------- pkg/controller/controller.go | 5 +++-- pkg/util/util.go | 10 +++++++++- 4 files changed, 28 insertions(+), 44 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 85f60b601..76963d060 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1034,10 +1034,10 @@ func (c *Cluster) processPodEvent(obj interface{}) error { c.podSubscribersMu.RLock() subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)] - c.podSubscribersMu.RUnlock() if ok { subscriber <- event } + c.podSubscribersMu.RUnlock() return nil } @@ -1501,34 +1501,16 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e var err error c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate) - - var wg sync.WaitGroup - - podLabelErr := make(chan error) stopCh := make(chan struct{}) - - wg.Add(1) - - go func() { - defer wg.Done() - ch := c.registerPodSubscriber(candidate) - defer c.unregisterPodSubscriber(candidate) - - role := Master - - select { - case <-stopCh: - case podLabelErr <- func() (err2 error) { - _, err2 = c.waitForPodLabel(ch, stopCh, &role) - return - }(): - } - }() + ch := c.registerPodSubscriber(candidate) + defer c.unregisterPodSubscriber(candidate) + defer close(stopCh) if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil { c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate) c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate) - if err = <-podLabelErr; err != nil { + _, err = c.waitForPodLabel(ch, stopCh, nil) + if err != nil { err = fmt.Errorf("could not get master pod label: %v", err) } } else { @@ -1536,14 +1518,6 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err) } - // signal the role label waiting goroutine to close the shop and go home - close(stopCh) - // wait until the goroutine terminates, since unregisterPodSubscriber - // must be called before the outer return; otherwise we risk subscribing to the same pod twice. - wg.Wait() - // close the label waiting channel no sooner than the waiting goroutine terminates. - close(podLabelErr) - return err } diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 26c4c332d..a7d4d68aa 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -67,7 +67,7 @@ func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error { return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err) } - err = retryutil.Retry(c.OpConfig.PatroniAPICheckInterval, c.OpConfig.PatroniAPICheckTimeout, + err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { _, err2 := c.KubeClient.Pods(pod.Namespace).Patch( context.TODO(), @@ -149,14 +149,14 @@ func (c *Cluster) deletePod(podName spec.NamespacedName) error { func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) { c.logger.Debugf("unsubscribing from pod %q events", podName) c.podSubscribersMu.Lock() - defer c.podSubscribersMu.Unlock() - - if _, ok := c.podSubscribers[podName]; !ok { + ch, ok := c.podSubscribers[podName] + if !ok { panic("subscriber for pod '" + podName.String() + "' is not found") } - close(c.podSubscribers[podName]) delete(c.podSubscribers, podName) + c.podSubscribersMu.Unlock() + close(ch) } func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan PodEvent { @@ -399,11 +399,12 @@ func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error) } func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { + stopCh := make(chan struct{}) ch := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) - stopChan := make(chan struct{}) + defer close(stopCh) - err := retryutil.Retry(c.OpConfig.PatroniAPICheckInterval, c.OpConfig.PatroniAPICheckTimeout, + err := retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.PodDeletionWaitTimeout, func() (bool, error) { err2 := c.KubeClient.Pods(podName.Namespace).Delete( context.TODO(), @@ -421,7 +422,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { if err := c.waitForPodDeletion(ch); err != nil { return nil, err } - pod, err := c.waitForPodLabel(ch, stopChan, nil) + pod, err := c.waitForPodLabel(ch, stopCh, nil) if err != nil { return nil, err } @@ -446,7 +447,7 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp continue } - podName := util.NameFromMeta(pod.ObjectMeta) + podName := util.NameFromMeta(pods[i].ObjectMeta) newPod, err := c.recreatePod(podName) if err != nil { return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index de0dec69f..c2dfd0a5c 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -451,17 +451,18 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { panic("could not acquire initial list of clusters") } - wg.Add(5) + wg.Add(5 + util.Bool2Int(c.opConfig.EnablePostgresTeamCRD)) go c.runPodInformer(stopCh, wg) go c.runPostgresqlInformer(stopCh, wg) go c.clusterResync(stopCh, wg) - go c.apiserver.Run(stopCh, wg) go c.kubeNodesInformer(stopCh, wg) if c.opConfig.EnablePostgresTeamCRD { go c.runPostgresTeamInformer(stopCh, wg) } + go c.apiserver.Run(stopCh, wg) + c.logger.Info("started working in background") } diff --git a/pkg/util/util.go b/pkg/util/util.go index 688153b89..63b24ca33 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -324,12 +324,20 @@ func testNil(values ...*int32) bool { return false } -// Convert int to IntOrString type +// ToIntStr convert int to IntOrString type func ToIntStr(val int) *intstr.IntOrString { b := intstr.FromInt(val) return &b } +// Bool2Int converts bool to int +func Bool2Int(flag bool) int { + if flag { + return 1 + } + return 0 +} + // Get int from IntOrString and return max int if string func IntFromIntStr(intOrStr intstr.IntOrString) int { if intOrStr.Type == 1 {