rename patch method and fix reading flag on sync
This commit is contained in:
		
							parent
							
								
									965d4423f1
								
							
						
					
					
						commit
						f2ce6b9a22
					
				| 
						 | 
					@ -47,10 +47,10 @@ func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) {
 | 
				
			||||||
	return pods.Items, nil
 | 
						return pods.Items, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// enableRollingUpdateFlagForPod sets the indicator for the rolling update requirement
 | 
					// markRollingUpdateFlagForPod sets the indicator for the rolling update requirement
 | 
				
			||||||
// in the Pod annotation.
 | 
					// in the Pod annotation.
 | 
				
			||||||
func (c *Cluster) enableRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
 | 
					func (c *Cluster) markRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
 | 
				
			||||||
	c.logger.Debugf("enable rolling update annotation for %s: reason %s", pod.Name, msg)
 | 
						c.logger.Debugf("mark rolling update annotation for %s: reason %s", pod.Name, msg)
 | 
				
			||||||
	flag := make(map[string]string)
 | 
						flag := make(map[string]string)
 | 
				
			||||||
	flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true)
 | 
						flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -59,13 +59,20 @@ func (c *Cluster) enableRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
 | 
				
			||||||
		return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err)
 | 
							return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	_, err = c.KubeClient.Pods(pod.Namespace).Patch(
 | 
						err = retryutil.Retry(1*time.Second, 5*time.Second,
 | 
				
			||||||
 | 
							func() (bool, error) {
 | 
				
			||||||
 | 
								_, err2 := c.KubeClient.Pods(pod.Namespace).Patch(
 | 
				
			||||||
				context.TODO(),
 | 
									context.TODO(),
 | 
				
			||||||
				pod.Name,
 | 
									pod.Name,
 | 
				
			||||||
				types.MergePatchType,
 | 
									types.MergePatchType,
 | 
				
			||||||
				[]byte(patchData),
 | 
									[]byte(patchData),
 | 
				
			||||||
				metav1.PatchOptions{},
 | 
									metav1.PatchOptions{},
 | 
				
			||||||
				"")
 | 
									"")
 | 
				
			||||||
 | 
								if err2 != nil {
 | 
				
			||||||
 | 
									return false, err2
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								return true, nil
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return fmt.Errorf("could not patch pod rolling update flag %q: %v", patchData, err)
 | 
							return fmt.Errorf("could not patch pod rolling update flag %q: %v", patchData, err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -73,12 +80,12 @@ func (c *Cluster) enableRollingUpdateFlagForPod(pod v1.Pod, msg string) error {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// enableRollingUpdateFlagForPods sets the indicator for the rolling update requirement
 | 
					// markRollingUpdateFlagForPods sets the indicator for the rolling update requirement
 | 
				
			||||||
// on pods that do not have it yet
 | 
					// on pods that do not have it yet
 | 
				
			||||||
func (c *Cluster) enableRollingUpdateFlagForPods(pods []v1.Pod, msg string) error {
 | 
					func (c *Cluster) markRollingUpdateFlagForPods(pods []v1.Pod, msg string) error {
 | 
				
			||||||
	for _, pod := range pods {
 | 
						for _, pod := range pods {
 | 
				
			||||||
		if err := c.enableRollingUpdateFlagForPod(pod, msg); err != nil {
 | 
							if err := c.markRollingUpdateFlagForPod(pod, msg); err != nil {
 | 
				
			||||||
			return fmt.Errorf("enabling rolling update flag failed for pod %q: %v", pod.Name, err)
 | 
								return fmt.Errorf("marking rolling update flag failed for pod %q: %v", pod.Name, err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -86,10 +93,9 @@ func (c *Cluster) enableRollingUpdateFlagForPods(pods []v1.Pod, msg string) erro
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// getRollingUpdateFlagFromPod returns the value of the rollingUpdate flag from the passed
 | 
					// getRollingUpdateFlagFromPod returns the value of the rollingUpdate flag from the passed
 | 
				
			||||||
// reverting to the default value in case of errors
 | 
					func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) {
 | 
				
			||||||
func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod, defaultValue bool) (flag bool) {
 | 
					 | 
				
			||||||
	anno := pod.GetAnnotations()
 | 
						anno := pod.GetAnnotations()
 | 
				
			||||||
	flag = defaultValue
 | 
						flag = false
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stringFlag, exists := anno[rollingUpdatePodAnnotationKey]
 | 
						stringFlag, exists := anno[rollingUpdatePodAnnotationKey]
 | 
				
			||||||
	if exists {
 | 
						if exists {
 | 
				
			||||||
| 
						 | 
					@ -99,7 +105,6 @@ func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod, defaultValue bool) (f
 | 
				
			||||||
				rollingUpdatePodAnnotationKey,
 | 
									rollingUpdatePodAnnotationKey,
 | 
				
			||||||
				types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name},
 | 
									types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name},
 | 
				
			||||||
				stringFlag)
 | 
									stringFlag)
 | 
				
			||||||
			flag = defaultValue
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -107,8 +112,7 @@ func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod, defaultValue bool) (f
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// countPodsWithRollingUpdateFlag returns the value of the rollingUpdate flag from the passed pod
 | 
					// countPodsWithRollingUpdateFlag returns the value of the rollingUpdate flag from the passed pod
 | 
				
			||||||
// reverting to the default value in case of errors
 | 
					func (c *Cluster) countPodsWithRollingUpdateFlag() (int, int) {
 | 
				
			||||||
func (c *Cluster) countPodsWithRollingUpdateFlag(defaultValue bool) (int, int) {
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pods, err := c.listPods()
 | 
						pods, err := c.listPods()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
| 
						 | 
					@ -124,7 +128,7 @@ func (c *Cluster) countPodsWithRollingUpdateFlag(defaultValue bool) (int, int) {
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, pod := range pods {
 | 
						for _, pod := range pods {
 | 
				
			||||||
		if c.getRollingUpdateFlagFromPod(&pod, defaultValue) {
 | 
							if c.getRollingUpdateFlagFromPod(&pod) {
 | 
				
			||||||
			podsToRollCount++
 | 
								podsToRollCount++
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -452,7 +456,7 @@ func (c *Cluster) recreatePods() error {
 | 
				
			||||||
	replicas := make([]spec.NamespacedName, 0)
 | 
						replicas := make([]spec.NamespacedName, 0)
 | 
				
			||||||
	for i, pod := range pods.Items {
 | 
						for i, pod := range pods.Items {
 | 
				
			||||||
		// only recreate pod if rolling update flag is true
 | 
							// only recreate pod if rolling update flag is true
 | 
				
			||||||
		if c.getRollingUpdateFlagFromPod(&pod, false) {
 | 
							if c.getRollingUpdateFlagFromPod(&pod) {
 | 
				
			||||||
			role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 | 
								role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if role == Master {
 | 
								if role == Master {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -147,32 +147,6 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// mergeRollingUpdateFlagUsingCache returns the value of the rollingUpdate flag from the passed
 | 
					 | 
				
			||||||
// statefulset, however, the value can be cleared if there is a cached flag in the cluster that
 | 
					 | 
				
			||||||
// is set to false (the discrepancy could be a result of a failed StatefulSet update)
 | 
					 | 
				
			||||||
func (c *Cluster) mergeRollingUpdateFlagUsingCache(runningStatefulSet *appsv1.StatefulSet) bool {
 | 
					 | 
				
			||||||
	var (
 | 
					 | 
				
			||||||
		cachedStatefulsetExists, podsRollingUpdateRequired bool
 | 
					 | 
				
			||||||
	)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if c.Statefulset != nil {
 | 
					 | 
				
			||||||
		// if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update
 | 
					 | 
				
			||||||
		// the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying
 | 
					 | 
				
			||||||
		// on the 'cached' in-memory flag.
 | 
					 | 
				
			||||||
		c.logger.Debugf("cached StatefulSet value exists")
 | 
					 | 
				
			||||||
		cachedStatefulsetExists = true
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag(cachedStatefulsetExists)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if podsToRollCount > 0 {
 | 
					 | 
				
			||||||
		c.logger.Debugf("%d / %d pods still need to be rotated", podsToRollCount, podCount)
 | 
					 | 
				
			||||||
		podsRollingUpdateRequired = true
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return podsRollingUpdateRequired
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*appsv1.StatefulSet, error) {
 | 
					func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*appsv1.StatefulSet, error) {
 | 
				
			||||||
	c.logger.Debugf("patching statefulset annotations")
 | 
						c.logger.Debugf("patching statefulset annotations")
 | 
				
			||||||
	patchData, err := metaAnnotationsPatch(annotations)
 | 
						patchData, err := metaAnnotationsPatch(annotations)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -307,7 +307,7 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		podsRollingUpdateRequired = (len(pods) > 0)
 | 
							podsRollingUpdateRequired = (len(pods) > 0)
 | 
				
			||||||
		if podsRollingUpdateRequired {
 | 
							if podsRollingUpdateRequired {
 | 
				
			||||||
			if err = c.enableRollingUpdateFlagForPods(pods, "pods from previous statefulset"); err != nil {
 | 
								if err = c.markRollingUpdateFlagForPods(pods, "pods from previous statefulset"); err != nil {
 | 
				
			||||||
				c.logger.Warnf("updating rolling update flag for existing pods failed: %v", err)
 | 
									c.logger.Warnf("updating rolling update flag for existing pods failed: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
| 
						 | 
					@ -316,7 +316,7 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		// check if there are still pods with a rolling update flag
 | 
							// check if there are still pods with a rolling update flag
 | 
				
			||||||
		// default value for flag depends on a potentially cached StatefulSet
 | 
							// default value for flag depends on a potentially cached StatefulSet
 | 
				
			||||||
		podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag(c.Statefulset != nil)
 | 
							podsToRollCount, podCount := c.countPodsWithRollingUpdateFlag()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if podsToRollCount > 0 {
 | 
							if podsToRollCount > 0 {
 | 
				
			||||||
			c.logger.Debugf("%d / %d pods still need to be rotated", podsToRollCount, podCount)
 | 
								c.logger.Debugf("%d / %d pods still need to be rotated", podsToRollCount, podCount)
 | 
				
			||||||
| 
						 | 
					@ -335,7 +335,7 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
		if !cmp.match {
 | 
							if !cmp.match {
 | 
				
			||||||
			if cmp.rollingUpdate && !podsRollingUpdateRequired {
 | 
								if cmp.rollingUpdate && !podsRollingUpdateRequired {
 | 
				
			||||||
				podsRollingUpdateRequired = cmp.rollingUpdate
 | 
									podsRollingUpdateRequired = cmp.rollingUpdate
 | 
				
			||||||
				if err = c.enableRollingUpdateFlagForPods(pods, "pod changes"); err != nil {
 | 
									if err = c.markRollingUpdateFlagForPods(pods, "pod changes"); err != nil {
 | 
				
			||||||
					return fmt.Errorf("updating rolling update flag for pods failed: %v", err)
 | 
										return fmt.Errorf("updating rolling update flag for pods failed: %v", err)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
| 
						 | 
					@ -368,7 +368,7 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				if stsImage != effectivePodImage {
 | 
									if stsImage != effectivePodImage {
 | 
				
			||||||
					podsRollingUpdateRequired = true
 | 
										podsRollingUpdateRequired = true
 | 
				
			||||||
					if err = c.enableRollingUpdateFlagForPod(pod, "pod not yet restarted due to lazy update"); err != nil {
 | 
										if err = c.markRollingUpdateFlagForPod(pod, "pod not yet restarted due to lazy update"); err != nil {
 | 
				
			||||||
						c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err)
 | 
											c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
| 
						 | 
					@ -391,7 +391,6 @@ func (c *Cluster) syncStatefulSet() error {
 | 
				
			||||||
		if err := c.recreatePods(); err != nil {
 | 
							if err := c.recreatePods(); err != nil {
 | 
				
			||||||
			return fmt.Errorf("could not recreate pods: %v", err)
 | 
								return fmt.Errorf("could not recreate pods: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		c.logger.Infof("pods have been recreated")
 | 
					 | 
				
			||||||
		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 | 
							c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue