wait for the pods from the previous rolling update
This commit is contained in:
		
							parent
							
								
									bbdc2f52a9
								
							
						
					
					
						commit
						b6e6308bdc
					
				|  | @ -94,6 +94,24 @@ func (c *Cluster) deletePod(pod *v1.Pod) error { | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *Cluster) unregisterPodSubscriber(podName spec.PodName) { | ||||||
|  | 	if _, ok := c.podSubscribers[podName]; !ok { | ||||||
|  | 		panic("Subscriber for Pod '" + podName.String() + "' is not found") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	close(c.podSubscribers[podName]) | ||||||
|  | 	delete(c.podSubscribers, podName) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *Cluster) registerPodSubscriber(podName spec.PodName) chan spec.PodEvent { | ||||||
|  | 	ch := make(chan spec.PodEvent) | ||||||
|  | 	if _, ok := c.podSubscribers[podName]; ok { | ||||||
|  | 		panic("Pod '" + podName.String() + "' is already subscribed") | ||||||
|  | 	} | ||||||
|  | 	c.podSubscribers[podName] = ch | ||||||
|  | 	return ch | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (c *Cluster) recreatePod(pod v1.Pod, spiloRole string) error { | func (c *Cluster) recreatePod(pod v1.Pod, spiloRole string) error { | ||||||
| 	podName := spec.PodName{ | 	podName := spec.PodName{ | ||||||
| 		Namespace: pod.Namespace, | 		Namespace: pod.Namespace, | ||||||
|  | @ -105,15 +123,8 @@ func (c *Cluster) recreatePod(pod v1.Pod, spiloRole string) error { | ||||||
| 		OrphanDependents: &orphanDependents, | 		OrphanDependents: &orphanDependents, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	ch := make(chan spec.PodEvent) | 	ch := c.registerPodSubscriber(podName) | ||||||
| 	if _, ok := c.podSubscribers[podName]; ok { | 	defer c.unregisterPodSubscriber(podName) | ||||||
| 		panic("Pod '" + podName.String() + "' is already subscribed") |  | ||||||
| 	} |  | ||||||
| 	c.podSubscribers[podName] = ch |  | ||||||
| 	defer func() { |  | ||||||
| 		close(ch) |  | ||||||
| 		delete(c.podSubscribers, podName) |  | ||||||
| 	}() |  | ||||||
| 
 | 
 | ||||||
| 	if err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil { | 	if err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil { | ||||||
| 		return fmt.Errorf("Can't delete Pod: %s", err) | 		return fmt.Errorf("Can't delete Pod: %s", err) | ||||||
|  |  | ||||||
|  | @ -5,6 +5,7 @@ import ( | ||||||
| 
 | 
 | ||||||
| 	"k8s.io/client-go/pkg/api/v1" | 	"k8s.io/client-go/pkg/api/v1" | ||||||
| 
 | 
 | ||||||
|  | 	"github.bus.zalan.do/acid/postgres-operator/pkg/spec" | ||||||
| 	"github.bus.zalan.do/acid/postgres-operator/pkg/util" | 	"github.bus.zalan.do/acid/postgres-operator/pkg/util" | ||||||
| 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/resources" | 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/resources" | ||||||
| ) | ) | ||||||
|  | @ -136,17 +137,34 @@ func (c *Cluster) syncPods() error { | ||||||
| 		return fmt.Errorf("Can't get list of Pods: %s", err) | 		return fmt.Errorf("Can't get list of Pods: %s", err) | ||||||
| 	} | 	} | ||||||
| 	if int32(len(pods.Items)) != *curSs.Spec.Replicas { | 	if int32(len(pods.Items)) != *curSs.Spec.Replicas { | ||||||
|  | 		//TODO: wait for Pods being created by StatefulSet
 | ||||||
| 		return fmt.Errorf("Number of existing Pods does not match number of replicas of the StatefulSet") | 		return fmt.Errorf("Number of existing Pods does not match number of replicas of the StatefulSet") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	//First check if we have left overs from the previous rolling update
 | ||||||
|  | 	for _, pod := range pods.Items { | ||||||
|  | 		podRole := util.PodSpiloRole(&pod) | ||||||
|  | 		podName := spec.PodName{ | ||||||
|  | 			Namespace: pod.Namespace, | ||||||
|  | 			Name:      pod.Name, | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if podMatchesTemplate(&pod, curSs) && pod.Status.Phase == v1.PodPending { | ||||||
|  | 			c.logger.Infof("Waiting for left over Pod '%s'", podName) | ||||||
|  | 			ch := c.registerPodSubscriber(podName) | ||||||
|  | 			c.waitForPodLabel(ch, podRole) | ||||||
|  | 			c.unregisterPodSubscriber(podName) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	for _, pod := range pods.Items { | 	for _, pod := range pods.Items { | ||||||
| 		if podMatchesTemplate(&pod, curSs) { | 		if podMatchesTemplate(&pod, curSs) { | ||||||
| 			c.logger.Infof("Pod '%s' matches StatefulSet pod template", util.NameFromMeta(pod.ObjectMeta)) | 			c.logger.Infof("Pod '%s' matches StatefulSet pod template", util.NameFromMeta(pod.ObjectMeta)) | ||||||
| 			continue | 			continue | ||||||
|  | 		} else { | ||||||
|  | 			c.logger.Infof("Pod '%s' does not match StatefulSet pod template. Pod needs to be recreated", util.NameFromMeta(pod.ObjectMeta)) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		c.logger.Infof("Pod '%s' does not match StatefulSet pod template and needs to be deleted.", util.NameFromMeta(pod.ObjectMeta)) |  | ||||||
| 
 |  | ||||||
| 		if util.PodSpiloRole(&pod) == "master" { | 		if util.PodSpiloRole(&pod) == "master" { | ||||||
| 			//TODO: do manual failover first
 | 			//TODO: do manual failover first
 | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | @ -136,7 +136,7 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, spiloRole string | ||||||
| 		select { | 		select { | ||||||
| 		case podEvent := <-podEvents: | 		case podEvent := <-podEvents: | ||||||
| 			role := util.PodSpiloRole(podEvent.CurPod) | 			role := util.PodSpiloRole(podEvent.CurPod) | ||||||
| 			if role == spiloRole { | 			if role == spiloRole { // TODO: newly-created Pods are always replicas => check against empty string only
 | ||||||
| 				return nil | 				return nil | ||||||
| 			} | 			} | ||||||
| 		case <-time.After(constants.PodLabelWaitTimeout): | 		case <-time.After(constants.PodLabelWaitTimeout): | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue