resolve conflict

This commit is contained in:
Felix Kunde 2022-05-18 17:19:27 +02:00
commit 0d0e4081f3
5 changed files with 36 additions and 44 deletions

View File

@ -1032,12 +1032,20 @@ func (c *Cluster) processPodEvent(obj interface{}) error {
return fmt.Errorf("could not cast to PodEvent")
}
// can only take lock when (un)registerPodSubscriber is finshed
c.podSubscribersMu.RLock()
subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)]
c.podSubscribersMu.RUnlock()
if ok {
subscriber <- event
select {
case subscriber <- event:
default:
// ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished)
// avoids blocking channel: https://gobyexample.com/non-blocking-channel-operations
}
}
// hold lock for the time of processing the event to avoid race condition
// with unregisterPodSubscriber closing the channel (see #1876)
c.podSubscribersMu.RUnlock()
return nil
}
@ -1501,34 +1509,16 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
var err error
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
var wg sync.WaitGroup
podLabelErr := make(chan error)
stopCh := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
ch := c.registerPodSubscriber(candidate)
defer c.unregisterPodSubscriber(candidate)
role := Master
select {
case <-stopCh:
case podLabelErr <- func() (err2 error) {
_, err2 = c.waitForPodLabel(ch, stopCh, &role)
return
}():
}
}()
ch := c.registerPodSubscriber(candidate)
defer c.unregisterPodSubscriber(candidate)
defer close(stopCh)
if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
if err = <-podLabelErr; err != nil {
_, err = c.waitForPodLabel(ch, stopCh, nil)
if err != nil {
err = fmt.Errorf("could not get master pod label: %v", err)
}
} else {
@ -1536,14 +1526,6 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
}
// signal the role label waiting goroutine to close the shop and go home
close(stopCh)
// wait until the goroutine terminates, since unregisterPodSubscriber
// must be called before the outer return; otherwise we risk subscribing to the same pod twice.
wg.Wait()
// close the label waiting channel no sooner than the waiting goroutine terminates.
close(podLabelErr)
return err
}

View File

@ -67,7 +67,7 @@ func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error {
return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err)
}
err = retryutil.Retry(c.OpConfig.PatroniAPICheckInterval, c.OpConfig.PatroniAPICheckTimeout,
err = retryutil.Retry(1*time.Second, 5*time.Second,
func() (bool, error) {
_, err2 := c.KubeClient.Pods(pod.Namespace).Patch(
context.TODO(),
@ -151,12 +151,13 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
c.podSubscribersMu.Lock()
defer c.podSubscribersMu.Unlock()
if _, ok := c.podSubscribers[podName]; !ok {
ch, ok := c.podSubscribers[podName]
if !ok {
panic("subscriber for pod '" + podName.String() + "' is not found")
}
close(c.podSubscribers[podName])
delete(c.podSubscribers, podName)
close(ch)
}
func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan PodEvent {
@ -399,11 +400,12 @@ func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error)
}
func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
stopCh := make(chan struct{})
ch := c.registerPodSubscriber(podName)
defer c.unregisterPodSubscriber(podName)
stopChan := make(chan struct{})
defer close(stopCh)
err := retryutil.Retry(c.OpConfig.PatroniAPICheckInterval, c.OpConfig.PatroniAPICheckTimeout,
err := retryutil.Retry(1*time.Second, 5*time.Second,
func() (bool, error) {
err2 := c.KubeClient.Pods(podName.Namespace).Delete(
context.TODO(),
@ -421,7 +423,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
if err := c.waitForPodDeletion(ch); err != nil {
return nil, err
}
pod, err := c.waitForPodLabel(ch, stopChan, nil)
pod, err := c.waitForPodLabel(ch, stopCh, nil)
if err != nil {
return nil, err
}
@ -446,7 +448,7 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
continue
}
podName := util.NameFromMeta(pod.ObjectMeta)
podName := util.NameFromMeta(pods[i].ObjectMeta)
newPod, err := c.recreatePod(podName)
if err != nil {
return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)

View File

@ -316,7 +316,7 @@ func (c *Cluster) annotationsSet(annotations map[string]string) map[string]strin
return nil
}
func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopChan chan struct{}, role *PostgresRole) (*v1.Pod, error) {
func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopCh chan struct{}, role *PostgresRole) (*v1.Pod, error) {
timeout := time.After(c.OpConfig.PodLabelWaitTimeout)
for {
select {
@ -332,7 +332,7 @@ func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopChan chan struct{
}
case <-timeout:
return nil, fmt.Errorf("pod label wait timeout")
case <-stopChan:
case <-stopCh:
return nil, fmt.Errorf("pod label wait cancelled")
}
}

View File

@ -451,7 +451,7 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
panic("could not acquire initial list of clusters")
}
wg.Add(5)
wg.Add(5 + util.Bool2Int(c.opConfig.EnablePostgresTeamCRD))
go c.runPodInformer(stopCh, wg)
go c.runPostgresqlInformer(stopCh, wg)
go c.clusterResync(stopCh, wg)

View File

@ -323,12 +323,20 @@ func testNil(values ...*int32) bool {
return false
}
// Convert int to IntOrString type
// ToIntStr converts int to IntOrString type
func ToIntStr(val int) *intstr.IntOrString {
b := intstr.FromInt(val)
return &b
}
// Bool2Int converts bool to int
func Bool2Int(flag bool) int {
if flag {
return 1
}
return 0
}
// MaxInt32 : Return maximum of two integers provided via pointers. If one value
// is not defined, return the other one. If both are not defined, result is also
// undefined, caller needs to check for that.