From 9d5270beaefc26c3df1c2b4eb849aea239b5afa2 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 17 May 2022 18:31:31 +0200 Subject: [PATCH] resolve conflict --- pkg/cluster/cluster.go | 15 ++++++++++----- pkg/cluster/pod.go | 26 +++++++++++++++----------- pkg/cluster/util.go | 6 ++++-- 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index a51c9871e..f80cf6942 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -77,7 +77,7 @@ type Cluster struct { pgUsers map[string]spec.PgUser pgUsersCache map[string]spec.PgUser systemUsers map[string]spec.PgUser - podSubscribers map[spec.NamespacedName]chan PodEvent + podSubscribers map[spec.NamespacedName]PodSubscriber podSubscribersMu sync.RWMutex pgDb *sql.DB mu sync.Mutex @@ -98,6 +98,11 @@ type Cluster struct { currentMajorVersion int } +type PodSubscriber struct { + podEvents chan PodEvent + stopEvent chan struct{} +} + type compareStatefulsetResult struct { match bool replace bool @@ -127,7 +132,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres Postgresql: pgSpec, pgUsers: make(map[string]spec.PgUser), systemUsers: make(map[string]spec.PgUser), - podSubscribers: make(map[spec.NamespacedName]chan PodEvent), + podSubscribers: make(map[spec.NamespacedName]PodSubscriber), kubeResources: kubeResources{ Secrets: make(map[types.UID]*v1.Secret), Services: make(map[PostgresRole]*v1.Service), @@ -1037,10 +1042,10 @@ func (c *Cluster) processPodEvent(obj interface{}) error { subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)] if ok { select { - case subscriber <- event: + case <-subscriber.stopEvent: + c.logger.Debugf("ignoring pod event %s for pod %q", event.EventType, event.PodName) default: - // ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished) - // avoids blocking channel: https://gobyexample.com/non-blocking-channel-operations + subscriber.podEvents <- event } } // hold lock for the time of processing the event to avoid race condition diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 1e24565d8..c4752cc85 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -136,14 +136,14 @@ func (c *Cluster) deletePods() error { func (c *Cluster) deletePod(podName spec.NamespacedName) error { c.setProcessName("deleting pod %q", podName) - ch := c.registerPodSubscriber(podName) + subscriber := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil { return err } - return c.waitForPodDeletion(ch) + return c.waitForPodDeletion(subscriber.podEvents) } func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) { @@ -151,27 +151,31 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) { c.podSubscribersMu.Lock() defer c.podSubscribersMu.Unlock() - ch, ok := c.podSubscribers[podName] + subscriber, ok := c.podSubscribers[podName] if !ok { panic("subscriber for pod '" + podName.String() + "' is not found") } delete(c.podSubscribers, podName) - close(ch) + close(subscriber.podEvents) + close(subscriber.stopEvent) } -func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan PodEvent { +func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) PodSubscriber { c.logger.Debugf("subscribing to pod %q", podName) c.podSubscribersMu.Lock() defer c.podSubscribersMu.Unlock() - ch := make(chan PodEvent) + var subscriber PodSubscriber + subscriber.podEvents = make(chan PodEvent) + subscriber.stopEvent = make(chan struct{}) + if _, ok := c.podSubscribers[podName]; ok { panic("pod '" + podName.String() + "' is already subscribed") } - c.podSubscribers[podName] = ch + c.podSubscribers[podName] = subscriber - return ch + return subscriber } func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) { @@ -401,7 +405,7 @@ func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error) func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { stopCh := make(chan struct{}) - ch := c.registerPodSubscriber(podName) + subscriber := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) defer close(stopCh) @@ -420,10 +424,10 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { return nil, fmt.Errorf("could not delete pod: %v", err) } - if err := c.waitForPodDeletion(ch); err != nil { + if err := c.waitForPodDeletion(subscriber.podEvents); err != nil { return nil, err } - pod, err := c.waitForPodLabel(ch, stopCh, nil) + pod, err := c.waitForPodLabel(subscriber, stopCh, nil) if err != nil { return nil, err } diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 0bfda78bf..d609d5163 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -316,18 +316,20 @@ func (c *Cluster) annotationsSet(annotations map[string]string) map[string]strin return nil } -func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopCh chan struct{}, role *PostgresRole) (*v1.Pod, error) { +func (c *Cluster) waitForPodLabel(subscriber PodSubscriber, stopCh chan struct{}, role *PostgresRole) (*v1.Pod, error) { timeout := time.After(c.OpConfig.PodLabelWaitTimeout) for { select { - case podEvent := <-podEvents: + case podEvent := <-subscriber.podEvents: podRole := PostgresRole(podEvent.CurPod.Labels[c.OpConfig.PodRoleLabel]) if role == nil { if podRole == Master || podRole == Replica { + subscriber.stopEvent <- struct{}{} return podEvent.CurPod, nil } } else if *role == podRole { + subscriber.stopEvent <- struct{}{} return podEvent.CurPod, nil } case <-timeout: