Remove inner goroutine in cluster.Switchover and resolve race between processPodEvent and unregisterPodSubscriber
This commit is contained in:
		
							parent
							
								
									a77d5df158
								
							
						
					
					
						commit
						3641911786
					
				|  | @ -1034,10 +1034,10 @@ func (c *Cluster) processPodEvent(obj interface{}) error { | |||
| 
 | ||||
| 	c.podSubscribersMu.RLock() | ||||
| 	subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)] | ||||
| 	c.podSubscribersMu.RUnlock() | ||||
| 	if ok { | ||||
| 		subscriber <- event | ||||
| 	} | ||||
| 	c.podSubscribersMu.RUnlock() | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
|  | @ -1501,34 +1501,16 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e | |||
| 	var err error | ||||
| 	c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) | ||||
| 	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate) | ||||
| 
 | ||||
| 	var wg sync.WaitGroup | ||||
| 
 | ||||
| 	podLabelErr := make(chan error) | ||||
| 	stopCh := make(chan struct{}) | ||||
| 
 | ||||
| 	wg.Add(1) | ||||
| 
 | ||||
| 	go func() { | ||||
| 		defer wg.Done() | ||||
| 		ch := c.registerPodSubscriber(candidate) | ||||
| 		defer c.unregisterPodSubscriber(candidate) | ||||
| 
 | ||||
| 		role := Master | ||||
| 
 | ||||
| 		select { | ||||
| 		case <-stopCh: | ||||
| 		case podLabelErr <- func() (err2 error) { | ||||
| 			_, err2 = c.waitForPodLabel(ch, stopCh, &role) | ||||
| 			return | ||||
| 		}(): | ||||
| 		} | ||||
| 	}() | ||||
| 	ch := c.registerPodSubscriber(candidate) | ||||
| 	defer c.unregisterPodSubscriber(candidate) | ||||
| 	defer close(stopCh) | ||||
| 
 | ||||
| 	if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil { | ||||
| 		c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate) | ||||
| 		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate) | ||||
| 		if err = <-podLabelErr; err != nil { | ||||
| 		_, err = c.waitForPodLabel(ch, stopCh, nil) | ||||
| 		if err != nil { | ||||
| 			err = fmt.Errorf("could not get master pod label: %v", err) | ||||
| 		} | ||||
| 	} else { | ||||
|  | @ -1536,14 +1518,6 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e | |||
| 		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err) | ||||
| 	} | ||||
| 
 | ||||
| 	// signal the role label waiting goroutine to close the shop and go home
 | ||||
| 	close(stopCh) | ||||
| 	// wait until the goroutine terminates, since unregisterPodSubscriber
 | ||||
| 	// must be called before the outer return; otherwise we risk subscribing to the same pod twice.
 | ||||
| 	wg.Wait() | ||||
| 	// close the label waiting channel no sooner than the waiting goroutine terminates.
 | ||||
| 	close(podLabelErr) | ||||
| 
 | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -67,7 +67,7 @@ func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error { | |||
| 		return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	err = retryutil.Retry(c.OpConfig.PatroniAPICheckInterval, c.OpConfig.PatroniAPICheckTimeout, | ||||
| 	err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, | ||||
| 		func() (bool, error) { | ||||
| 			_, err2 := c.KubeClient.Pods(pod.Namespace).Patch( | ||||
| 				context.TODO(), | ||||
|  | @ -149,14 +149,14 @@ func (c *Cluster) deletePod(podName spec.NamespacedName) error { | |||
| func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) { | ||||
| 	c.logger.Debugf("unsubscribing from pod %q events", podName) | ||||
| 	c.podSubscribersMu.Lock() | ||||
| 	defer c.podSubscribersMu.Unlock() | ||||
| 
 | ||||
| 	if _, ok := c.podSubscribers[podName]; !ok { | ||||
| 	ch, ok := c.podSubscribers[podName] | ||||
| 	if !ok { | ||||
| 		panic("subscriber for pod '" + podName.String() + "' is not found") | ||||
| 	} | ||||
| 
 | ||||
| 	close(c.podSubscribers[podName]) | ||||
| 	delete(c.podSubscribers, podName) | ||||
| 	c.podSubscribersMu.Unlock() | ||||
| 	close(ch) | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan PodEvent { | ||||
|  | @ -399,11 +399,12 @@ func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error) | |||
| } | ||||
| 
 | ||||
| func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { | ||||
| 	stopCh := make(chan struct{}) | ||||
| 	ch := c.registerPodSubscriber(podName) | ||||
| 	defer c.unregisterPodSubscriber(podName) | ||||
| 	stopChan := make(chan struct{}) | ||||
| 	defer close(stopCh) | ||||
| 
 | ||||
| 	err := retryutil.Retry(c.OpConfig.PatroniAPICheckInterval, c.OpConfig.PatroniAPICheckTimeout, | ||||
| 	err := retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.PodDeletionWaitTimeout, | ||||
| 		func() (bool, error) { | ||||
| 			err2 := c.KubeClient.Pods(podName.Namespace).Delete( | ||||
| 				context.TODO(), | ||||
|  | @ -421,7 +422,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { | |||
| 	if err := c.waitForPodDeletion(ch); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	pod, err := c.waitForPodLabel(ch, stopChan, nil) | ||||
| 	pod, err := c.waitForPodLabel(ch, stopCh, nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | @ -446,7 +447,7 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp | |||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		podName := util.NameFromMeta(pod.ObjectMeta) | ||||
| 		podName := util.NameFromMeta(pods[i].ObjectMeta) | ||||
| 		newPod, err := c.recreatePod(podName) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) | ||||
|  |  | |||
|  | @ -451,17 +451,18 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { | |||
| 		panic("could not acquire initial list of clusters") | ||||
| 	} | ||||
| 
 | ||||
| 	wg.Add(5) | ||||
| 	wg.Add(5 + util.Bool2Int(c.opConfig.EnablePostgresTeamCRD)) | ||||
| 	go c.runPodInformer(stopCh, wg) | ||||
| 	go c.runPostgresqlInformer(stopCh, wg) | ||||
| 	go c.clusterResync(stopCh, wg) | ||||
| 	go c.apiserver.Run(stopCh, wg) | ||||
| 	go c.kubeNodesInformer(stopCh, wg) | ||||
| 
 | ||||
| 	if c.opConfig.EnablePostgresTeamCRD { | ||||
| 		go c.runPostgresTeamInformer(stopCh, wg) | ||||
| 	} | ||||
| 
 | ||||
| 	go c.apiserver.Run(stopCh, wg) | ||||
| 
 | ||||
| 	c.logger.Info("started working in background") | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -324,12 +324,20 @@ func testNil(values ...*int32) bool { | |||
| 	return false | ||||
| } | ||||
| 
 | ||||
| // Convert int to IntOrString type
 | ||||
| // ToIntStr converts int to IntOrString type
 | ||||
| func ToIntStr(val int) *intstr.IntOrString { | ||||
| 	b := intstr.FromInt(val) | ||||
| 	return &b | ||||
| } | ||||
| 
 | ||||
| // Bool2Int converts bool to int: 1 for true, 0 for false
 | ||||
| func Bool2Int(flag bool) int { | ||||
| 	if flag { | ||||
| 		return 1 | ||||
| 	} | ||||
| 	return 0 | ||||
| } | ||||
| 
 | ||||
| // Get int from IntOrString and return max int if string
 | ||||
| func IntFromIntStr(intOrStr intstr.IntOrString) int { | ||||
| 	if intOrStr.Type == 1 { | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue