resolve conflict

This commit is contained in:
Felix Kunde 2022-05-17 18:31:31 +02:00
parent 268a86a045
commit 9d5270beae
3 changed files with 29 additions and 18 deletions

View File

@ -77,7 +77,7 @@ type Cluster struct {
pgUsers map[string]spec.PgUser
pgUsersCache map[string]spec.PgUser
systemUsers map[string]spec.PgUser
podSubscribers map[spec.NamespacedName]chan PodEvent
podSubscribers map[spec.NamespacedName]PodSubscriber
podSubscribersMu sync.RWMutex
pgDb *sql.DB
mu sync.Mutex
@ -98,6 +98,11 @@ type Cluster struct {
currentMajorVersion int
}
type PodSubscriber struct {
podEvents chan PodEvent
stopEvent chan struct{}
}
type compareStatefulsetResult struct {
match bool
replace bool
@ -127,7 +132,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
Postgresql: pgSpec,
pgUsers: make(map[string]spec.PgUser),
systemUsers: make(map[string]spec.PgUser),
podSubscribers: make(map[spec.NamespacedName]chan PodEvent),
podSubscribers: make(map[spec.NamespacedName]PodSubscriber),
kubeResources: kubeResources{
Secrets: make(map[types.UID]*v1.Secret),
Services: make(map[PostgresRole]*v1.Service),
@ -1037,10 +1042,10 @@ func (c *Cluster) processPodEvent(obj interface{}) error {
subscriber, ok := c.podSubscribers[spec.NamespacedName(event.PodName)]
if ok {
select {
case subscriber <- event:
case <-subscriber.stopEvent:
c.logger.Debugf("ignoring pod event %s for pod %q", event.EventType, event.PodName)
default:
// ending up here when there is no receiver on the channel (i.e. waitForPodLabel finished)
// avoids blocking channel: https://gobyexample.com/non-blocking-channel-operations
subscriber.podEvents <- event
}
}
// hold lock for the time of processing the event to avoid race condition

View File

@ -136,14 +136,14 @@ func (c *Cluster) deletePods() error {
func (c *Cluster) deletePod(podName spec.NamespacedName) error {
c.setProcessName("deleting pod %q", podName)
ch := c.registerPodSubscriber(podName)
subscriber := c.registerPodSubscriber(podName)
defer c.unregisterPodSubscriber(podName)
if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil {
return err
}
return c.waitForPodDeletion(ch)
return c.waitForPodDeletion(subscriber.podEvents)
}
func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
@ -151,27 +151,31 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
c.podSubscribersMu.Lock()
defer c.podSubscribersMu.Unlock()
ch, ok := c.podSubscribers[podName]
subscriber, ok := c.podSubscribers[podName]
if !ok {
panic("subscriber for pod '" + podName.String() + "' is not found")
}
delete(c.podSubscribers, podName)
close(ch)
close(subscriber.podEvents)
close(subscriber.stopEvent)
}
func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan PodEvent {
func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) PodSubscriber {
c.logger.Debugf("subscribing to pod %q", podName)
c.podSubscribersMu.Lock()
defer c.podSubscribersMu.Unlock()
ch := make(chan PodEvent)
var subscriber PodSubscriber
subscriber.podEvents = make(chan PodEvent)
subscriber.stopEvent = make(chan struct{})
if _, ok := c.podSubscribers[podName]; ok {
panic("pod '" + podName.String() + "' is already subscribed")
}
c.podSubscribers[podName] = ch
c.podSubscribers[podName] = subscriber
return ch
return subscriber
}
func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) {
@ -401,7 +405,7 @@ func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error)
func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
stopCh := make(chan struct{})
ch := c.registerPodSubscriber(podName)
subscriber := c.registerPodSubscriber(podName)
defer c.unregisterPodSubscriber(podName)
defer close(stopCh)
@ -420,10 +424,10 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
return nil, fmt.Errorf("could not delete pod: %v", err)
}
if err := c.waitForPodDeletion(ch); err != nil {
if err := c.waitForPodDeletion(subscriber.podEvents); err != nil {
return nil, err
}
pod, err := c.waitForPodLabel(ch, stopCh, nil)
pod, err := c.waitForPodLabel(subscriber, stopCh, nil)
if err != nil {
return nil, err
}

View File

@ -316,18 +316,20 @@ func (c *Cluster) annotationsSet(annotations map[string]string) map[string]strin
return nil
}
func (c *Cluster) waitForPodLabel(podEvents chan PodEvent, stopCh chan struct{}, role *PostgresRole) (*v1.Pod, error) {
func (c *Cluster) waitForPodLabel(subscriber PodSubscriber, stopCh chan struct{}, role *PostgresRole) (*v1.Pod, error) {
timeout := time.After(c.OpConfig.PodLabelWaitTimeout)
for {
select {
case podEvent := <-podEvents:
case podEvent := <-subscriber.podEvents:
podRole := PostgresRole(podEvent.CurPod.Labels[c.OpConfig.PodRoleLabel])
if role == nil {
if podRole == Master || podRole == Replica {
subscriber.stopEvent <- struct{}{}
return podEvent.CurPod, nil
}
} else if *role == podRole {
subscriber.stopEvent <- struct{}{}
return podEvent.CurPod, nil
}
case <-timeout: