From b6e6308bdcfaa2cfa86da7521bc18569a094510f Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 13 Mar 2017 17:03:29 +0100 Subject: [PATCH] wait for the pods from the previous rolling update --- pkg/cluster/pod.go | 29 ++++++++++++++++++++--------- pkg/cluster/sync.go | 22 ++++++++++++++++++++-- pkg/cluster/util.go | 2 +- 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 701b699b5..99c4bb730 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -94,6 +94,24 @@ func (c *Cluster) deletePod(pod *v1.Pod) error { return nil } +func (c *Cluster) unregisterPodSubscriber(podName spec.PodName) { + if _, ok := c.podSubscribers[podName]; !ok { + panic("Subscriber for Pod '" + podName.String() + "' is not found") + } + + close(c.podSubscribers[podName]) + delete(c.podSubscribers, podName) +} + +func (c *Cluster) registerPodSubscriber(podName spec.PodName) chan spec.PodEvent { + ch := make(chan spec.PodEvent) + if _, ok := c.podSubscribers[podName]; ok { + panic("Pod '" + podName.String() + "' is already subscribed") + } + c.podSubscribers[podName] = ch + return ch +} + func (c *Cluster) recreatePod(pod v1.Pod, spiloRole string) error { podName := spec.PodName{ Namespace: pod.Namespace, @@ -105,15 +123,8 @@ func (c *Cluster) recreatePod(pod v1.Pod, spiloRole string) error { OrphanDependents: &orphanDependents, } - ch := make(chan spec.PodEvent) - if _, ok := c.podSubscribers[podName]; ok { - panic("Pod '" + podName.String() + "' is already subscribed") - } - c.podSubscribers[podName] = ch - defer func() { - close(ch) - delete(c.podSubscribers, podName) - }() + ch := c.registerPodSubscriber(podName) + defer c.unregisterPodSubscriber(podName) if err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil { return fmt.Errorf("Can't delete Pod: %s", err) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index daba17aef..bb6450612 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -5,6 +5,7 @@ import ( "k8s.io/client-go/pkg/api/v1" + "github.bus.zalan.do/acid/postgres-operator/pkg/spec" "github.bus.zalan.do/acid/postgres-operator/pkg/util" "github.bus.zalan.do/acid/postgres-operator/pkg/util/resources" ) @@ -136,17 +137,34 @@ func (c *Cluster) syncPods() error { return fmt.Errorf("Can't get list of Pods: %s", err) } if int32(len(pods.Items)) != *curSs.Spec.Replicas { + //TODO: wait for Pods being created by StatefulSet return fmt.Errorf("Number of existing Pods does not match number of replicas of the StatefulSet") } + //First check if we have left overs from the previous rolling update + for _, pod := range pods.Items { + podRole := util.PodSpiloRole(&pod) + podName := spec.PodName{ + Namespace: pod.Namespace, + Name: pod.Name, + } + + if podMatchesTemplate(&pod, curSs) && pod.Status.Phase == v1.PodPending { + c.logger.Infof("Waiting for left over Pod '%s'", podName) + ch := c.registerPodSubscriber(podName) + c.waitForPodLabel(ch, podRole) + c.unregisterPodSubscriber(podName) + } + } + for _, pod := range pods.Items { if podMatchesTemplate(&pod, curSs) { c.logger.Infof("Pod '%s' matches StatefulSet pod template", util.NameFromMeta(pod.ObjectMeta)) continue + } else { + c.logger.Infof("Pod '%s' does not match StatefulSet pod template. Pod needs to be recreated", util.NameFromMeta(pod.ObjectMeta)) } - c.logger.Infof("Pod '%s' does not match StatefulSet pod template and needs to be deleted.", util.NameFromMeta(pod.ObjectMeta)) - if util.PodSpiloRole(&pod) == "master" { //TODO: do manual failover first } diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index f5fb0d716..ee788e670 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -136,7 +136,7 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, spiloRole string select { case podEvent := <-podEvents: role := util.PodSpiloRole(podEvent.CurPod) - if role == spiloRole { + if role == spiloRole { // TODO: newly-created Pods are always replicas => check against empty string only return nil } case <-time.After(constants.PodLabelWaitTimeout):