pass only pods to recreatePods function
This commit is contained in:
		
							parent
							
								
									f2ce6b9a22
								
							
						
					
					
						commit
						1f1fcd8657
					
				| 
						 | 
					@ -49,7 +49,7 @@ func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// markRollingUpdateFlagForPod sets the indicator for the rolling update requirement
 | 
					// markRollingUpdateFlagForPod sets the indicator for the rolling update requirement
 | 
				
			||||||
// in the Pod annotation.
 | 
					// in the Pod annotation.
 | 
				
			||||||
func (c *Cluster) markRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
 | 
					func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error {
 | 
				
			||||||
	c.logger.Debugf("mark rolling update annotation for %s: reason %s", pod.Name, msg)
 | 
						c.logger.Debugf("mark rolling update annotation for %s: reason %s", pod.Name, msg)
 | 
				
			||||||
	flag := make(map[string]string)
 | 
						flag := make(map[string]string)
 | 
				
			||||||
	flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true)
 | 
						flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true)
 | 
				
			||||||
| 
						 | 
					@ -80,18 +80,6 @@ func (c *Cluster) markRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// markRollingUpdateFlagForPods sets the indicator for the rolling update requirement
 | 
					 | 
				
			||||||
// on pods that do not have it yet
 | 
					 | 
				
			||||||
func (c *Cluster) markRollingUpdateFlagForPods(pods []v1.Pod, msg string) error {
 | 
					 | 
				
			||||||
	for _, pod := range pods {
 | 
					 | 
				
			||||||
		if err := c.markRollingUpdateFlagForPod(pod, msg); err != nil {
 | 
					 | 
				
			||||||
			return fmt.Errorf("marking rolling update flag failed for pod %q: %v", pod.Name, err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// getRollingUpdateFlagFromPod returns the value of the rollingUpdate flag from the passed
 | 
					// getRollingUpdateFlagFromPod returns the value of the rollingUpdate flag from the passed
 | 
				
			||||||
func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) {
 | 
					func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) {
 | 
				
			||||||
	anno := pod.GetAnnotations()
 | 
						anno := pod.GetAnnotations()
 | 
				
			||||||
| 
						 | 
					@ -100,6 +88,7 @@ func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) {
 | 
				
			||||||
	stringFlag, exists := anno[rollingUpdatePodAnnotationKey]
 | 
						stringFlag, exists := anno[rollingUpdatePodAnnotationKey]
 | 
				
			||||||
	if exists {
 | 
						if exists {
 | 
				
			||||||
		var err error
 | 
							var err error
 | 
				
			||||||
 | 
							c.logger.Debugf("found rolling update flag on pod %q", pod.Name)
 | 
				
			||||||
		if flag, err = strconv.ParseBool(stringFlag); err != nil {
 | 
							if flag, err = strconv.ParseBool(stringFlag); err != nil {
 | 
				
			||||||
			c.logger.Warnf("error when parsing %q annotation for the pod %q: expected boolean value, got %q\n",
 | 
								c.logger.Warnf("error when parsing %q annotation for the pod %q: expected boolean value, got %q\n",
 | 
				
			||||||
				rollingUpdatePodAnnotationKey,
 | 
									rollingUpdatePodAnnotationKey,
 | 
				
			||||||
| 
						 | 
					@ -111,31 +100,6 @@ func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) {
 | 
				
			||||||
	return flag
 | 
						return flag
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// countPodsWithRollingUpdateFlag returns the value of the rollingUpdate flag from the passed pod
 | 
					 | 
				
			||||||
func (c *Cluster) countPodsWithRollingUpdateFlag() (int, int) {
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	pods, err := c.listPods()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		c.logger.Warnf("could not count pods with rolling update flag")
 | 
					 | 
				
			||||||
		return 0, 0
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	desiredPodCount := c.Spec.NumberOfInstances
 | 
					 | 
				
			||||||
	actualPodCount := len(pods)
 | 
					 | 
				
			||||||
	podsToRollCount := int(desiredPodCount) - actualPodCount
 | 
					 | 
				
			||||||
	if podsToRollCount < 0 {
 | 
					 | 
				
			||||||
		podsToRollCount = 0
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for _, pod := range pods {
 | 
					 | 
				
			||||||
		if c.getRollingUpdateFlagFromPod(&pod) {
 | 
					 | 
				
			||||||
			podsToRollCount++
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return podsToRollCount, actualPodCount
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (c *Cluster) deletePods() error {
 | 
					func (c *Cluster) deletePods() error {
 | 
				
			||||||
	c.logger.Debugln("deleting pods")
 | 
						c.logger.Debugln("deleting pods")
 | 
				
			||||||
	pods, err := c.listPods()
 | 
						pods, err := c.listPods()
 | 
				
			||||||
| 
						 | 
					@ -372,7 +336,18 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 | 
				
			||||||
	defer c.unregisterPodSubscriber(podName)
 | 
						defer c.unregisterPodSubscriber(podName)
 | 
				
			||||||
	stopChan := make(chan struct{})
 | 
						stopChan := make(chan struct{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil {
 | 
						err := retryutil.Retry(1*time.Second, 5*time.Second,
 | 
				
			||||||
 | 
							func() (bool, error) {
 | 
				
			||||||
 | 
								err2 := c.KubeClient.Pods(podName.Namespace).Delete(
 | 
				
			||||||
 | 
									context.TODO(),
 | 
				
			||||||
 | 
									podName.Name,
 | 
				
			||||||
 | 
									c.deleteOptions)
 | 
				
			||||||
 | 
								if err2 != nil {
 | 
				
			||||||
 | 
									return false, err2
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								return true, nil
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
		return nil, fmt.Errorf("could not delete pod: %v", err)
 | 
							return nil, fmt.Errorf("could not delete pod: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -387,7 +362,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 | 
				
			||||||
	return pod, nil
 | 
						return pod, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
 | 
					func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/*
 | 
						/*
 | 
				
			||||||
	 Operator should not re-create pods if there is at least one replica being bootstrapped
 | 
						 Operator should not re-create pods if there is at least one replica being bootstrapped
 | 
				
			||||||
| 
						 | 
					@ -397,11 +372,11 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
 | 
				
			||||||
	 after this check succeeds but before a pod is re-created
 | 
						 after this check succeeds but before a pod is re-created
 | 
				
			||||||
	*/
 | 
						*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, pod := range pods.Items {
 | 
						for _, pod := range pods {
 | 
				
			||||||
		c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
 | 
							c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, pod := range pods.Items {
 | 
						for _, pod := range pods {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		var state string
 | 
							var state string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -431,41 +406,29 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
 | 
				
			||||||
	return true
 | 
						return true
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Cluster) recreatePods() error {
 | 
					func (c *Cluster) recreatePods(pods []v1.Pod) error {
 | 
				
			||||||
	c.setProcessName("starting to recreate pods")
 | 
						c.setProcessName("starting to recreate pods")
 | 
				
			||||||
	ls := c.labelsSet(false)
 | 
						c.logger.Infof("there are %d pods in the cluster to recreate", len(pods))
 | 
				
			||||||
	namespace := c.Namespace
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	listOptions := metav1.ListOptions{
 | 
					 | 
				
			||||||
		LabelSelector: ls.String(),
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("could not get the list of pods: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if !c.isSafeToRecreatePods(pods) {
 | 
						if !c.isSafeToRecreatePods(pods) {
 | 
				
			||||||
		return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized")
 | 
							return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var (
 | 
						var (
 | 
				
			||||||
		masterPod, newMasterPod, newPod *v1.Pod
 | 
							masterPod, newMasterPod *v1.Pod
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	replicas := make([]spec.NamespacedName, 0)
 | 
						replicas := make([]spec.NamespacedName, 0)
 | 
				
			||||||
	for i, pod := range pods.Items {
 | 
						for _, pod := range pods {
 | 
				
			||||||
		// only recreate pod if rolling update flag is true
 | 
					 | 
				
			||||||
		if c.getRollingUpdateFlagFromPod(&pod) {
 | 
					 | 
				
			||||||
		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 | 
							role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if role == Master {
 | 
							if role == Master {
 | 
				
			||||||
				masterPod = &pods.Items[i]
 | 
								masterPod = &pod
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			podName := util.NameFromMeta(pods.Items[i].ObjectMeta)
 | 
							podName := util.NameFromMeta(pod.ObjectMeta)
 | 
				
			||||||
			if newPod, err = c.recreatePod(podName); err != nil {
 | 
							newPod, err := c.recreatePod(podName)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
			return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
 | 
								return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica {
 | 
							if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica {
 | 
				
			||||||
| 
						 | 
					@ -474,7 +437,6 @@ func (c *Cluster) recreatePods() error {
 | 
				
			||||||
			newMasterPod = newPod
 | 
								newMasterPod = newPod
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if masterPod != nil {
 | 
						if masterPod != nil {
 | 
				
			||||||
		// failover if we have not observed a master pod when re-creating former replicas
 | 
							// failover if we have not observed a master pod when re-creating former replicas
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -279,6 +279,7 @@ func (c *Cluster) mustUpdatePodsAfterLazyUpdate(desiredSset *appsv1.StatefulSet)
 | 
				
			||||||
func (c *Cluster) syncStatefulSet() error {
 | 
					func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
	var (
 | 
						var (
 | 
				
			||||||
		podsRollingUpdateRequired bool
 | 
							podsRollingUpdateRequired bool
 | 
				
			||||||
 | 
							podsToRecreate            []v1.Pod
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pods, err := c.listPods()
 | 
						pods, err := c.listPods()
 | 
				
			||||||
| 
						 | 
					@ -307,19 +308,25 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		podsRollingUpdateRequired = (len(pods) > 0)
 | 
							podsRollingUpdateRequired = (len(pods) > 0)
 | 
				
			||||||
		if podsRollingUpdateRequired {
 | 
							if podsRollingUpdateRequired {
 | 
				
			||||||
			if err = c.markRollingUpdateFlagForPods(pods, "pods from previous statefulset"); err != nil {
 | 
								for _, pod := range pods {
 | 
				
			||||||
				c.logger.Warnf("updating rolling update flag for existing pods failed: %v", err)
 | 
									if err = c.markRollingUpdateFlagForPod(&pod, "pod from previous statefulset"); err != nil {
 | 
				
			||||||
 | 
										c.logger.Warnf("updating rolling update flag for existing pod failed: %v", err)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									podsToRecreate = append(podsToRecreate, pod)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
 | 
							c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		// check if there are still pods with a rolling update flag
 | 
							// check if there are still pods with a rolling update flag
 | 
				
			||||||
		// default value for flag depends on a potentially cached StatefulSet
 | 
							for _, pod := range pods {
 | 
				
			||||||
		podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag()
 | 
								if c.getRollingUpdateFlagFromPod(&pod) {
 | 
				
			||||||
 | 
									podsToRecreate = append(podsToRecreate, pod)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if podsToRollCount > 0 {
 | 
							if len(podsToRecreate) > 0 {
 | 
				
			||||||
			c.logger.Debugf("%d / %d pods still need to be rotated", podsToRollCount, podCount)
 | 
								c.logger.Debugf("%d / %d pod(s) still need to be rotated", len(podsToRecreate), len(pods))
 | 
				
			||||||
			podsRollingUpdateRequired = true
 | 
								podsRollingUpdateRequired = true
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -335,8 +342,11 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
		if !cmp.match {
 | 
							if !cmp.match {
 | 
				
			||||||
			if cmp.rollingUpdate && !podsRollingUpdateRequired {
 | 
								if cmp.rollingUpdate && !podsRollingUpdateRequired {
 | 
				
			||||||
				podsRollingUpdateRequired = cmp.rollingUpdate
 | 
									podsRollingUpdateRequired = cmp.rollingUpdate
 | 
				
			||||||
				if err = c.markRollingUpdateFlagForPods(pods, "pod changes"); err != nil {
 | 
									for _, pod := range pods {
 | 
				
			||||||
					return fmt.Errorf("updating rolling update flag for pods failed: %v", err)
 | 
										if err = c.markRollingUpdateFlagForPod(&pod, "pod changes"); err != nil {
 | 
				
			||||||
 | 
											return fmt.Errorf("updating rolling update flag for pod failed: %v", err)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										podsToRecreate = append(podsToRecreate, pod)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -368,9 +378,10 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				if stsImage != effectivePodImage {
 | 
									if stsImage != effectivePodImage {
 | 
				
			||||||
					podsRollingUpdateRequired = true
 | 
										podsRollingUpdateRequired = true
 | 
				
			||||||
					if err = c.markRollingUpdateFlagForPod(pod, "pod not yet restarted due to lazy update"); err != nil {
 | 
										if err = c.markRollingUpdateFlagForPod(&pod, "pod not yet restarted due to lazy update"); err != nil {
 | 
				
			||||||
						c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err)
 | 
											c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
 | 
										podsToRecreate = append(podsToRecreate, pod)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
| 
						 | 
					@ -388,7 +399,7 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
	if podsRollingUpdateRequired {
 | 
						if podsRollingUpdateRequired {
 | 
				
			||||||
		c.logger.Debugln("performing rolling update")
 | 
							c.logger.Debugln("performing rolling update")
 | 
				
			||||||
		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
 | 
							c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
 | 
				
			||||||
		if err := c.recreatePods(); err != nil {
 | 
							if err := c.recreatePods(podsToRecreate); err != nil {
 | 
				
			||||||
			return fmt.Errorf("could not recreate pods: %v", err)
 | 
								return fmt.Errorf("could not recreate pods: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 | 
							c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue