Merge branch 'master' of github.com:zalando/postgres-operator
Commit 7307925eca
@@ -4,7 +4,7 @@ watch -c "
 kubectl get postgresql --all-namespaces
 echo
 echo -n 'Rolling upgrade pending: '
-kubectl get statefulset -o jsonpath='{.items..metadata.annotations.zalando-postgres-operator-rolling-update-required}'
+kubectl get pods -o jsonpath='{.items[].metadata.annotations.zalando-postgres-operator-rolling-update-required}'
 echo
 echo
 echo 'Pods'
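Note: the watch line above now reads the flag from the pods rather than from the statefulset. A minimal sketch of the same check via the Kubernetes Python client (kubeconfig, namespace and label selector are assumptions, not part of this commit):

# Read the per-pod rolling-update flag that the jsonpath query above targets.
from kubernetes import client, config

config.load_kube_config()  # assumes a standard kubeconfig
core_v1 = client.CoreV1Api()
for pod in core_v1.list_namespaced_pod(
        'default', label_selector='application=spilo').items:
    flag = (pod.metadata.annotations or {}).get(
        'zalando-postgres-operator-rolling-update-required')
    print(pod.metadata.name, flag)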
@@ -211,7 +211,7 @@ class K8s:
         self.wait_for_logical_backup_job(expected_num_of_jobs=1)
 
     def delete_operator_pod(self, step="Delete operator pod"):
-             # patching the pod template in the deployment restarts the operator pod
+        # patching the pod template in the deployment restarts the operator pod
         self.api.apps_v1.patch_namespaced_deployment("postgres-operator", "default", {"spec": {"template": {"metadata": {"annotations": {"step": "{}-{}".format(step, time.time())}}}}})
         self.wait_for_operator_pod_start()
 
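delete_operator_pod() relies on the fact that any change to a deployment's pod template triggers a new rollout, so patching a throwaway annotation replaces the operator pod. A standalone sketch of the same trick (client setup and the annotation value are assumptions):

# Bounce the operator by touching the deployment's pod template.
import time
from kubernetes import client, config

config.load_kube_config()  # assumes a standard kubeconfig
apps_v1 = client.AppsV1Api()
patch = {"spec": {"template": {"metadata": {"annotations": {
    "step": "restart-{}".format(time.time())}}}}}
apps_v1.patch_namespaced_deployment("postgres-operator", "default", patch)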
@@ -219,8 +219,8 @@ class K8s:
         self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
         self.delete_operator_pod(step=step)
 
-    def patch_statefulset(self, data, name="acid-minimal-cluster", namespace="default"):
-        self.api.apps_v1.patch_namespaced_stateful_set(name, namespace, data)
+    def patch_pod(self, data, pod_name, namespace="default"):
+        self.api.core_v1.patch_namespaced_pod(pod_name, namespace, data)
 
     def create_with_kubectl(self, path):
         return subprocess.run(
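The new patch_pod() helper is a thin wrapper around CoreV1Api.patch_namespaced_pod and is what the rolling-update tests below use to plant the flag on a single pod. A usage sketch (the k8s instance and the pod name are hypothetical):

# Merge-patch body that sets the rolling-update annotation on one pod.
flag = {
    "metadata": {
        "annotations": {
            "zalando-postgres-operator-rolling-update-required": "true",
        }
    }
}
k8s.patch_pod(flag, "acid-minimal-cluster-0", "default")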
@@ -280,19 +280,21 @@ class K8s:
             return None
         return pod.items[0].spec.containers[0].image
 
-    def get_cluster_leader_pod(self, pg_cluster_name, namespace='default'):
-        labels = {
-            'application': 'spilo',
-            'cluster-name': pg_cluster_name,
-            'spilo-role': 'master',
-        }
+    def get_cluster_pod(self, role, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
+        labels = labels + ',spilo-role=' + role
 
         pods = self.api.core_v1.list_namespaced_pod(
-                namespace, label_selector=to_selector(labels)).items
+                namespace, label_selector=labels).items
 
         if pods:
             return pods[0]
+
+    def get_cluster_leader_pod(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
+        return self.get_cluster_pod('master', labels, namespace)
+
+    def get_cluster_replica_pod(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
+        return self.get_cluster_pod('replica', labels, namespace)
 
 
 class K8sBase:
     '''
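With this refactoring both role-specific lookups share one code path: get_cluster_pod() appends the spilo-role label to the selector and returns the first match. A usage sketch, assuming a k8s instance constructed as in the e2e suite:

leader = k8s.get_cluster_leader_pod()    # selector ends in spilo-role=master
replica = k8s.get_cluster_replica_pod()  # selector ends in spilo-role=replica
print(leader.metadata.name, replica.metadata.name)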
@@ -168,12 +168,25 @@ class EndToEndTestCase(unittest.TestCase):
                 "additional_pod_capabilities": ','.join(capabilities),
             },
         }
-        self.k8s.update_config(patch_capabilities)
-        self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"},
-                             "Operator does not get in sync")
 
-        self.eventuallyEqual(lambda: self.k8s.count_pods_with_container_capabilities(capabilities, cluster_label),
-                             2, "Container capabilities not updated")
+        # get node and replica (expected target of new master)
+        _, replica_nodes = self.k8s.get_pg_nodes(cluster_label)
+
+        try:
+            self.k8s.update_config(patch_capabilities)
+            self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"},
+                                "Operator does not get in sync")
+
+            # changed security context of postgres container should trigger a rolling update
+            self.k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
+            self.k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+
+            self.eventuallyEqual(lambda: self.k8s.count_pods_with_container_capabilities(capabilities, cluster_label),
+                                2, "Container capabilities not updated")
+
+        except timeout_decorator.TimeoutError:
+            print('Operator log: {}'.format(k8s.get_operator_log()))
+            raise
 
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_additional_teams_and_members(self):
@@ -212,7 +225,7 @@ class EndToEndTestCase(unittest.TestCase):
         # make sure we let one sync pass and the new user being added
         time.sleep(15)
 
-        leader = self.k8s.get_cluster_leader_pod('acid-minimal-cluster')
+        leader = self.k8s.get_cluster_leader_pod()
         user_query = """
             SELECT usename
               FROM pg_catalog.pg_user
@@ -392,7 +405,7 @@ class EndToEndTestCase(unittest.TestCase):
         # credentials.
         db_list = []
 
-        leader = k8s.get_cluster_leader_pod('acid-minimal-cluster')
+        leader = k8s.get_cluster_leader_pod()
         schemas_query = """
             select schema_name
             from information_schema.schemata
@@ -611,7 +624,7 @@ class EndToEndTestCase(unittest.TestCase):
             k8s.update_config(unpatch_lazy_spilo_upgrade, step="patch lazy upgrade")
 
             # at this point operator will complete the normal rolling upgrade
-            # so we additonally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works
+            # so we additionally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works
             self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0),
                                  conf_image, "Rolling upgrade was not executed",
                                  50, 3)
@@ -750,12 +763,6 @@ class EndToEndTestCase(unittest.TestCase):
 
         self.eventuallyTrue(verify_pod_limits, "Pod limits were not adjusted")
 
-    @classmethod
-    def setUp(cls):
-        # cls.k8s.update_config({}, step="Setup")
-        cls.k8s.patch_statefulset({"meta": {"annotations": {"zalando-postgres-operator-rolling-update-required": False}}})
-        pass
-
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_multi_namespace_support(self):
         '''
@@ -784,6 +791,139 @@ class EndToEndTestCase(unittest.TestCase):
                 "acid.zalan.do", "v1", self.test_namespace, "postgresqls", "acid-test-cluster")
             time.sleep(5)
 
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_rolling_update_flag(self):
+        '''
+            Add rolling update flag to only the master and see it failing over
+        '''
+        k8s = self.k8s
+        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
+
+        # verify we are in good state from potential previous tests
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
+
+        # get node and replica (expected target of new master)
+        _, replica_nodes = k8s.get_pg_nodes(cluster_label)
+
+        # rolling update annotation
+        flag = {
+            "metadata": {
+                "annotations": {
+                    "zalando-postgres-operator-rolling-update-required": "true",
+                }
+            }
+        }
+
+        try:
+            podsList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label)
+            for pod in podsList.items:
+                # add flag only to the master to make it appear to the operator as a leftover from a rolling update
+                if pod.metadata.labels.get('spilo-role') == 'master':
+                    old_creation_timestamp = pod.metadata.creation_timestamp
+                    k8s.patch_pod(flag, pod.metadata.name, pod.metadata.namespace)
+                else:
+                    # remember replica name to check if operator does a switchover
+                    switchover_target = pod.metadata.name
+
+            # do not wait until the next sync
+            k8s.delete_operator_pod()
+
+            # operator should now recreate the master pod and do a switchover before
+            k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
+
+            # check if the former replica is now the new master
+            leader = k8s.get_cluster_leader_pod()
+            self.eventuallyEqual(lambda: leader.metadata.name, switchover_target, "Rolling update flag did not trigger switchover")
+
+            # check that the old master has been recreated
+            k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+            replica = k8s.get_cluster_replica_pod()
+            self.assertTrue(replica.metadata.creation_timestamp > old_creation_timestamp, "Old master pod was not recreated")
+
+        except timeout_decorator.TimeoutError:
+            print('Operator log: {}'.format(k8s.get_operator_log()))
+            raise
+
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_rolling_update_label_timeout(self):
+        '''
+            Simulate case when replica does not receive label in time and rolling update does not finish
+        '''
+        k8s = self.k8s
+        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
+        flag = "zalando-postgres-operator-rolling-update-required"
+
+        # verify we are in good state from potential previous tests
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
+
+        # get node and replica (expected target of new master)
+        _, replica_nodes = k8s.get_pg_nodes(cluster_label)
+
+        # rolling update annotation
+        rolling_update_patch = {
+            "metadata": {
+                "annotations": {
+                    flag: "true",
+                }
+            }
+        }
+
+        # make pod_label_wait_timeout so short that rolling update fails on first try
+        # temporarily lower resync interval to reduce waiting for further tests
+        # pods should get healthy in the meantime
+        patch_resync_config = {
+            "data": {
+                "pod_label_wait_timeout": "2s",
+                "resync_period": "20s",
+            }
+        }
+
+        try:
+            # patch both pods for rolling update
+            podList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label)
+            for pod in podList.items:
+                k8s.patch_pod(rolling_update_patch, pod.metadata.name, pod.metadata.namespace)
+                if pod.metadata.labels.get('spilo-role') == 'replica':
+                    switchover_target = pod.metadata.name
+
+            # update config and restart operator
+            k8s.update_config(patch_resync_config, "update resync interval and pod_label_wait_timeout")
+
+            # operator should now recreate the replica pod first and do a switchover after
+            k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+
+            # pod_label_wait_timeout should have been exceeded hence the rolling update is continued on next sync
+            # check if the cluster state is "SyncFailed"
+            self.eventuallyEqual(lambda: k8s.pg_get_status(), "SyncFailed", "Expected SYNC event to fail")
+
+            # wait for next sync, replica should be running normally by now and be ready for switchover
+            k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
+
+            # check if the former replica is now the new master
+            leader = k8s.get_cluster_leader_pod()
+            self.eventuallyEqual(lambda: leader.metadata.name, switchover_target, "Rolling update flag did not trigger switchover")
+
+            # wait for the old master to get restarted
+            k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+
+            # status should again be "SyncFailed" but turn into "Running" on the next sync
+            time.sleep(10)
+            self.eventuallyEqual(lambda: k8s.pg_get_status(), "Running", "Expected running cluster after two syncs")
+
+            # revert config changes
+            patch_resync_config = {
+                "data": {
+                    "pod_label_wait_timeout": "10m",
+                    "resync_period": "30m",
+                }
+            }
+            k8s.update_config(patch_resync_config, "revert resync interval and pod_label_wait_timeout")
+
+        except timeout_decorator.TimeoutError:
+            print('Operator log: {}'.format(k8s.get_operator_log()))
+            raise
+
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_zz_node_readiness_label(self):
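Both new tests lean on the suite's eventuallyEqual polling assertion. A simplified sketch of the idea (the real helper lives in the e2e base class; the retry count and interval here are assumptions):

import time

def eventually_equal(fn, expected, message, retries=30, interval=2):
    # poll fn() until it returns the expected value or the retries run out
    for _ in range(retries):
        if fn() == expected:
            return
        time.sleep(interval)
    raise AssertionError(message)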
@@ -1309,7 +1309,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
 			err = fmt.Errorf("could not get master pod label: %v", err)
 		}
 	} else {
-		err = fmt.Errorf("could not switch over: %v", err)
+		err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err)
 	}
 
 	// signal the role label waiting goroutine to close the shop and go home
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"path"
 	"sort"
-	"strconv"
 	"strings"
 
 	"github.com/sirupsen/logrus"
@@ -1279,7 +1278,6 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
 	}
 
 	stsAnnotations := make(map[string]string)
-	stsAnnotations[rollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(false)
 	stsAnnotations = c.AnnotationsToPropagate(c.annotationsSet(nil))
 
 	statefulSet := &appsv1.StatefulSet{
@@ -86,10 +86,12 @@ func (c *Cluster) majorVersionUpgrade() error {
 			podName := &spec.NamespacedName{Namespace: masterPod.Namespace, Name: masterPod.Name}
 			c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
 			upgradeCommand := fmt.Sprintf("/usr/bin/python3 /scripts/inplace_upgrade.py %d 2>&1 | tee last_upgrade.log", numberOfPods)
-			_, err := c.ExecCommand(podName, "/bin/su", "postgres", "-c", upgradeCommand)
+			result, err := c.ExecCommand(podName, "/bin/su", "postgres", "-c", upgradeCommand)
 			if err != nil {
 				return err
 			}
+
+			c.logger.Infof("upgrade action triggered and command completed: %s", result[:50])
 		}
 	}
 
@@ -4,11 +4,13 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"strconv"
+	"time"
 
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
@@ -46,6 +48,64 @@ func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) {
 	return pods.Items, nil
 }
 
+// markRollingUpdateFlagForPod sets the indicator for the rolling update requirement
+// in the Pod annotation.
+func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error {
+	// no need to patch pod if annotation is already there
+	if c.getRollingUpdateFlagFromPod(pod) {
+		return nil
+	}
+
+	c.logger.Debugf("mark rolling update annotation for %s: reason %s", pod.Name, msg)
+	flag := make(map[string]string)
+	flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true)
+
+	patchData, err := metaAnnotationsPatch(flag)
+	if err != nil {
+		return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err)
+	}
+
+	err = retryutil.Retry(1*time.Second, 5*time.Second,
+		func() (bool, error) {
+			_, err2 := c.KubeClient.Pods(pod.Namespace).Patch(
+				context.TODO(),
+				pod.Name,
+				types.MergePatchType,
+				[]byte(patchData),
+				metav1.PatchOptions{},
+				"")
+			if err2 != nil {
+				return false, err2
+			}
+			return true, nil
+		})
+	if err != nil {
+		return fmt.Errorf("could not patch pod rolling update flag %q: %v", patchData, err)
+	}
+
+	return nil
+}
+
+// getRollingUpdateFlagFromPod returns the value of the rollingUpdate flag from the given pod
+func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) {
+	anno := pod.GetAnnotations()
+	flag = false
+
+	stringFlag, exists := anno[rollingUpdatePodAnnotationKey]
+	if exists {
+		var err error
+		c.logger.Debugf("found rolling update flag on pod %q", pod.Name)
+		if flag, err = strconv.ParseBool(stringFlag); err != nil {
+			c.logger.Warnf("error when parsing %q annotation for the pod %q: expected boolean value, got %q\n",
+				rollingUpdatePodAnnotationKey,
+				types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name},
+				stringFlag)
+		}
+	}
+
+	return flag
+}
+
 func (c *Cluster) deletePods() error {
 	c.logger.Debugln("deleting pods")
 	pods, err := c.listPods()
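markRollingUpdateFlagForPod boils down to a JSON merge patch of the pod's metadata plus a tolerant read-back. A Python sketch of the same round trip (the annotation key is taken from the diff; the client wiring is an assumption):

FLAG = "zalando-postgres-operator-rolling-update-required"

def get_rolling_update_flag(pod):
    # tolerant read: a missing or malformed value counts as "no flag"
    value = (pod.metadata.annotations or {}).get(FLAG, "false")
    return value.lower() == "true"

def mark_rolling_update_flag(core_v1, pod):
    # skip the patch if the annotation is already present
    if get_rolling_update_flag(pod):
        return
    body = {"metadata": {"annotations": {FLAG: "true"}}}
    core_v1.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)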
@@ -282,7 +342,18 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 	defer c.unregisterPodSubscriber(podName)
 	stopChan := make(chan struct{})
 
-	if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil {
+	err := retryutil.Retry(1*time.Second, 5*time.Second,
+		func() (bool, error) {
+			err2 := c.KubeClient.Pods(podName.Namespace).Delete(
+				context.TODO(),
+				podName.Name,
+				c.deleteOptions)
+			if err2 != nil {
+				return false, err2
+			}
+			return true, nil
+		})
+	if err != nil {
 		return nil, fmt.Errorf("could not delete pod: %v", err)
 	}
 
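The pod deletion is now wrapped in retryutil.Retry with a 1s tick and a 5s budget. A generic sketch of that retry shape (illustrative helper, not part of the codebase):

import time

def retry(fn, tick=1.0, timeout=5.0):
    # re-run fn until it succeeds or the time budget is exhausted
    deadline = time.monotonic() + timeout
    while True:
        try:
            return fn()
        except Exception:
            if time.monotonic() >= deadline:
                raise
            time.sleep(tick)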
@@ -297,7 +368,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 	return pod, nil
 }
 
-func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
+func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
 
 	/*
 	 Operator should not re-create pods if there is at least one replica being bootstrapped
@@ -306,20 +377,17 @@ func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
 	 XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
 	 after this check succeeds but before a pod is re-created
 	*/
 
-	for _, pod := range pods.Items {
+	for _, pod := range pods {
 		c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
 	}
 
-	for _, pod := range pods.Items {
+	for _, pod := range pods {
 
 		var data patroni.MemberData
 
 		err := retryutil.Retry(1*time.Second, 5*time.Second,
 			func() (bool, error) {
-
 				var err error
-
 				data, err = c.patroni.GetMemberData(&pod)
-
 				if err != nil {
@@ -336,47 +404,39 @@ func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
 			c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
 			return false
 		}
 
 	}
 	return true
 }
 
-func (c *Cluster) recreatePods() error {
+func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.NamespacedName) error {
 	c.setProcessName("starting to recreate pods")
-	ls := c.labelsSet(false)
-	namespace := c.Namespace
-
-	listOptions := metav1.ListOptions{
-		LabelSelector: ls.String(),
-	}
-
-	pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions)
-	if err != nil {
-		return fmt.Errorf("could not get the list of pods: %v", err)
-	}
-	c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))
+	c.logger.Infof("there are %d pods in the cluster to recreate", len(pods))
 
 	if !c.isSafeToRecreatePods(pods) {
 		return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized")
 	}
 
 	var (
-		masterPod, newMasterPod, newPod *v1.Pod
+		masterPod, newMasterPod *v1.Pod
 	)
-	replicas := make([]spec.NamespacedName, 0)
-	for i, pod := range pods.Items {
+	replicas := switchoverCandidates
+
+	for i, pod := range pods {
 		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
 
 		if role == Master {
-			masterPod = &pods.Items[i]
+			masterPod = &pods[i]
 			continue
 		}
 
-		podName := util.NameFromMeta(pods.Items[i].ObjectMeta)
-		if newPod, err = c.recreatePod(podName); err != nil {
+		podName := util.NameFromMeta(pod.ObjectMeta)
+		newPod, err := c.recreatePod(podName)
+		if err != nil {
 			return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
 		}
-		if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica {
+
+		newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel])
+		if newRole == Replica {
 			replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta))
 		} else if newRole == Master {
 			newMasterPod = newPod
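The loop above keeps an invariant worth spelling out: replicas are recreated first and the master is deferred, so a switchover target exists before the master pod is touched. A pure-illustration sketch of that ordering (all names hypothetical):

def recreate_pods(pods, switchover_candidates, recreate, switchover):
    master = None
    replicas = list(switchover_candidates)
    for pod in pods:
        if pod["role"] == "master":
            master = pod  # defer the master until replicas are back
            continue
        new_pod = recreate(pod)
        if new_pod["role"] == "replica":
            replicas.append(new_pod["name"])
    if master is not None and replicas:
        switchover(master, replicas[0])  # move the leader before recreating it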
@@ -384,7 +444,9 @@ func (c *Cluster) recreatePods() error {
 	}
 
 	if masterPod != nil {
-		// failover if we have not observed a master pod when re-creating former replicas.
+		// switchover if
+		// 1. we have not observed a new master pod when re-creating former replicas
+		// 2. we know possible switchover targets even when no replicas were recreated
 		if newMasterPod == nil && len(replicas) > 0 {
 			if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil {
 				c.logger.Warningf("could not perform switch over: %v", err)
@@ -19,7 +19,7 @@ import (
 )
 
 const (
-	rollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update-required"
+	rollingUpdatePodAnnotationKey = "zalando-postgres-operator-rolling-update-required"
 )
 
 func (c *Cluster) listResources() error {
@@ -147,79 +147,6 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
 	return nil
 }
 
-// setRollingUpdateFlagForStatefulSet sets the indicator or the rolling update requirement
-// in the StatefulSet annotation.
-func (c *Cluster) setRollingUpdateFlagForStatefulSet(sset *appsv1.StatefulSet, val bool, msg string) {
-	anno := sset.GetAnnotations()
-	if anno == nil {
-		anno = make(map[string]string)
-	}
-
-	anno[rollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val)
-	sset.SetAnnotations(anno)
-	c.logger.Debugf("set statefulset's rolling update annotation to %t: caller/reason %s", val, msg)
-}
-
-// applyRollingUpdateFlagforStatefulSet sets the rolling update flag for the cluster's StatefulSet
-// and applies that setting to the actual running cluster.
-func (c *Cluster) applyRollingUpdateFlagforStatefulSet(val bool) error {
-	c.setRollingUpdateFlagForStatefulSet(c.Statefulset, val, "applyRollingUpdateFlag")
-	sset, err := c.updateStatefulSetAnnotations(c.Statefulset.GetAnnotations())
-	if err != nil {
-		return err
-	}
-	c.Statefulset = sset
-	return nil
-}
-
-// getRollingUpdateFlagFromStatefulSet returns the value of the rollingUpdate flag from the passed
-// StatefulSet, reverting to the default value in case of errors
-func (c *Cluster) getRollingUpdateFlagFromStatefulSet(sset *appsv1.StatefulSet, defaultValue bool) (flag bool) {
-	anno := sset.GetAnnotations()
-	flag = defaultValue
-
-	stringFlag, exists := anno[rollingUpdateStatefulsetAnnotationKey]
-	if exists {
-		var err error
-		if flag, err = strconv.ParseBool(stringFlag); err != nil {
-			c.logger.Warnf("error when parsing %q annotation for the statefulset %q: expected boolean value, got %q\n",
-				rollingUpdateStatefulsetAnnotationKey,
-				types.NamespacedName{Namespace: sset.Namespace, Name: sset.Name},
-				stringFlag)
-			flag = defaultValue
-		}
-	}
-	return flag
-}
-
-// mergeRollingUpdateFlagUsingCache returns the value of the rollingUpdate flag from the passed
-// statefulset, however, the value can be cleared if there is a cached flag in the cluster that
-// is set to false (the discrepancy could be a result of a failed StatefulSet update)
-func (c *Cluster) mergeRollingUpdateFlagUsingCache(runningStatefulSet *appsv1.StatefulSet) bool {
-	var (
-		cachedStatefulsetExists, clearRollingUpdateFromCache, podsRollingUpdateRequired bool
-	)
-
-	if c.Statefulset != nil {
-		// if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update
-		// the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying
-		// on the 'cached' in-memory flag.
-		cachedStatefulsetExists = true
-		clearRollingUpdateFromCache = !c.getRollingUpdateFlagFromStatefulSet(c.Statefulset, true)
-		c.logger.Debugf("cached StatefulSet value exists, rollingUpdate flag is %t", clearRollingUpdateFromCache)
-	}
-
-	if podsRollingUpdateRequired = c.getRollingUpdateFlagFromStatefulSet(runningStatefulSet, false); podsRollingUpdateRequired {
-		if cachedStatefulsetExists && clearRollingUpdateFromCache {
-			c.logger.Infof("clearing the rolling update flag based on the cached information")
-			podsRollingUpdateRequired = false
-		} else {
-			c.logger.Infof("found a statefulset with an unfinished rolling update of the pods")
-		}
-	}
-	return podsRollingUpdateRequired
-}
-
 func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*appsv1.StatefulSet, error) {
 	c.logger.Debugf("patching statefulset annotations")
 	patchData, err := metaAnnotationsPatch(annotations)
@@ -237,8 +164,8 @@ func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*
 		return nil, fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err)
 	}
 	return result, nil
-
 }
 
 func (c *Cluster) updateStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
 	c.setProcessName("updating statefulset")
 	if c.Statefulset == nil {
@@ -283,22 +283,24 @@ func (c *Cluster) mustUpdatePodsAfterLazyUpdate(desiredSset *appsv1.StatefulSet)
 }
 
 func (c *Cluster) syncStatefulSet() error {
-	var (
-		podsRollingUpdateRequired bool
-	)
-	// NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
+	podsToRecreate := make([]v1.Pod, 0)
+	switchoverCandidates := make([]spec.NamespacedName, 0)
+
+	pods, err := c.listPods()
+	if err != nil {
+		c.logger.Infof("could not list pods of the statefulset: %v", err)
+	}
+
 	sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{})
 	if err != nil {
 		if !k8sutil.ResourceNotFound(err) {
-			return fmt.Errorf("could not get statefulset: %v", err)
+			return fmt.Errorf("error during reading of statefulset: %v", err)
 		}
 		// statefulset does not exist, try to re-create it
 		c.Statefulset = nil
-		c.logger.Infof("could not find the cluster's statefulset")
-		pods, err := c.listPods()
-		if err != nil {
-			return fmt.Errorf("could not list pods of the statefulset: %v", err)
-		}
+		c.logger.Infof("cluster's statefulset does not exist")
 
 		sset, err = c.createStatefulSet()
 		if err != nil {
@@ -309,41 +311,63 @@ func (c *Cluster) syncStatefulSet() error {
 			return fmt.Errorf("cluster is not ready: %v", err)
 		}
 
-		podsRollingUpdateRequired = (len(pods) > 0)
-		if podsRollingUpdateRequired {
-			c.logger.Warningf("found pods from the previous statefulset: trigger rolling update")
-			if err := c.applyRollingUpdateFlagforStatefulSet(podsRollingUpdateRequired); err != nil {
-				return fmt.Errorf("could not set rolling update flag for the statefulset: %v", err)
+		if len(pods) > 0 {
+			for _, pod := range pods {
+				if err = c.markRollingUpdateFlagForPod(&pod, "pod from previous statefulset"); err != nil {
+					c.logger.Warnf("marking old pod for rolling update failed: %v", err)
+				}
+				podsToRecreate = append(podsToRecreate, pod)
 			}
 		}
 		c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
 
 	} else {
-		podsRollingUpdateRequired = c.mergeRollingUpdateFlagUsingCache(sset)
+		// check if there are still pods with a rolling update flag
+		for _, pod := range pods {
+			if c.getRollingUpdateFlagFromPod(&pod) {
+				podsToRecreate = append(podsToRecreate, pod)
+			} else {
+				role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
+				if role == Master {
+					continue
+				}
+				switchoverCandidates = append(switchoverCandidates, util.NameFromMeta(pod.ObjectMeta))
+			}
+		}
+
+		if len(podsToRecreate) > 0 {
+			c.logger.Debugf("%d / %d pod(s) still need to be rotated", len(podsToRecreate), len(pods))
+		}
+
 		// statefulset is already there, make sure we use its definition in order to compare with the spec.
 		c.Statefulset = sset
 
-		desiredSS, err := c.generateStatefulSet(&c.Spec)
+		desiredSts, err := c.generateStatefulSet(&c.Spec)
 		if err != nil {
 			return fmt.Errorf("could not generate statefulset: %v", err)
 		}
-		c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "from cache")
 
-		cmp := c.compareStatefulSetWith(desiredSS)
+		cmp := c.compareStatefulSetWith(desiredSts)
 		if !cmp.match {
-			if cmp.rollingUpdate && !podsRollingUpdateRequired {
-				podsRollingUpdateRequired = true
-				c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "statefulset changes")
+			if cmp.rollingUpdate {
+				podsToRecreate = make([]v1.Pod, 0)
+				switchoverCandidates = make([]spec.NamespacedName, 0)
+				for _, pod := range pods {
+					if err = c.markRollingUpdateFlagForPod(&pod, "pod changes"); err != nil {
+						return fmt.Errorf("updating rolling update flag for pod failed: %v", err)
+					}
+					podsToRecreate = append(podsToRecreate, pod)
+				}
 			}
 
-			c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
+			c.logStatefulSetChanges(c.Statefulset, desiredSts, false, cmp.reasons)
 
 			if !cmp.replace {
-				if err := c.updateStatefulSet(desiredSS); err != nil {
+				if err := c.updateStatefulSet(desiredSts); err != nil {
 					return fmt.Errorf("could not update statefulset: %v", err)
 				}
 			} else {
-				if err := c.replaceStatefulSet(desiredSS); err != nil {
+				if err := c.replaceStatefulSet(desiredSts); err != nil {
 					return fmt.Errorf("could not replace statefulset: %v", err)
 				}
 			}
@@ -351,18 +375,30 @@ func (c *Cluster) syncStatefulSet() error {
 
 		c.updateStatefulSetAnnotations(c.AnnotationsToPropagate(c.annotationsSet(c.Statefulset.Annotations)))
 
-		if !podsRollingUpdateRequired && !c.OpConfig.EnableLazySpiloUpgrade {
-			// even if desired and actual statefulsets match
+		if len(podsToRecreate) == 0 && !c.OpConfig.EnableLazySpiloUpgrade {
+			// even if the desired and the running statefulsets match
 			// there still may be not up-to-date pods on condition
 			//  (a) the lazy update was just disabled
 			// and
 			//  (b) some of the pods were not restarted when the lazy update was still in place
-			podsRollingUpdateRequired, err = c.mustUpdatePodsAfterLazyUpdate(desiredSS)
-			if err != nil {
-				return fmt.Errorf("could not list pods of the statefulset: %v", err)
+			for _, pod := range pods {
+				effectivePodImage := pod.Spec.Containers[0].Image
+				stsImage := desiredSts.Spec.Template.Spec.Containers[0].Image
+
+				if stsImage != effectivePodImage {
+					if err = c.markRollingUpdateFlagForPod(&pod, "pod not yet restarted due to lazy update"); err != nil {
+						c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err)
+					}
+					podsToRecreate = append(podsToRecreate, pod)
+				} else {
+					role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
+					if role == Master {
+						continue
+					}
+					switchoverCandidates = append(switchoverCandidates, util.NameFromMeta(pod.ObjectMeta))
+				}
 			}
 		}
 
 	}
 
 	// Apply special PostgreSQL parameters that can only be set via the Patroni API.
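The lazy-upgrade branch replaces mustUpdatePodsAfterLazyUpdate with an inline per-pod image comparison. The decision rule as a Python sketch (attribute access mirrors the Kubernetes client's object model):

def needs_recreate(pod, desired_sts):
    # a pod must be rotated iff its running image differs from the template
    pod_image = pod.spec.containers[0].image
    sts_image = desired_sts.spec.template.spec.containers[0].image
    return sts_image != pod_image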
@@ -374,17 +410,13 @@ func (c *Cluster) syncStatefulSet() error {
 
 	// if we get here we also need to re-create the pods (either leftovers from the old
 	// statefulset or those that got their configuration from the outdated statefulset)
-	if podsRollingUpdateRequired {
+	if len(podsToRecreate) > 0 {
 		c.logger.Debugln("performing rolling update")
 		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
-		if err := c.recreatePods(); err != nil {
+		if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
 			return fmt.Errorf("could not recreate pods: %v", err)
 		}
-		c.logger.Infof("pods have been recreated")
 		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
-		if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
-			c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
-		}
 	}
 	return nil
 }