From eb4a6668dfc81420d0239fe23928735136e28a68 Mon Sep 17 00:00:00 2001
From: Mikkel Oscar Lyderik Larsen
Date: Mon, 1 Dec 2025 20:42:12 +0100
Subject: [PATCH 1/2] Use UpdateStatus instead of patch

Signed-off-by: Mikkel Oscar Lyderik Larsen
---
 pkg/cluster/cluster.go       | 62 ++++++++++++++++++++----------------
 pkg/cluster/sync.go          | 21 ++++++------
 pkg/cluster/util_test.go     |  8 ++++-
 pkg/controller/postgresql.go | 21 +++++++++---
 pkg/util/k8sutil/k8sutil.go  | 20 ++----------
 5 files changed, 71 insertions(+), 61 deletions(-)

diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 7204a92bd..ecb794993 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -32,6 +32,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
+	"k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/rest"
@@ -271,26 +272,29 @@ func (c *Cluster) Create() (err error) {
 	)
 
 	defer func() {
-		var (
-			pgUpdatedStatus *acidv1.Postgresql
-			errStatus       error
-		)
-		if err == nil {
-			pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
-		} else {
+		currentStatus := c.Status.DeepCopy()
+		pg := c.Postgresql.DeepCopy()
+		pg.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
+
+		if err != nil {
 			c.logger.Warningf("cluster created failed: %v", err)
-			pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
+			pg.Status.PostgresClusterStatus = acidv1.ClusterStatusAddFailed
 		}
-		if errStatus != nil {
-			c.logger.Warningf("could not set cluster status: %v", errStatus)
-			return
-		}
-		if pgUpdatedStatus != nil {
+
+		if !equality.Semantic.DeepEqual(currentStatus, pg.Status) {
+			pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pg)
+			if err != nil {
+				c.logger.Warningf("could not set cluster status: %v", err)
+				return
+			}
 			c.setSpec(pgUpdatedStatus)
 		}
 	}()
 
-	pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
+	pg := c.Postgresql.DeepCopy()
+	pg.Status.PostgresClusterStatus = acidv1.ClusterStatusCreating
+
+	pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pg)
 	if err != nil {
 		return fmt.Errorf("could not set cluster status: %v", err)
 	}
@@ -978,7 +982,12 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)
+	newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdating
+
+	newSpec, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
+	if err != nil {
+		return fmt.Errorf("could not set cluster status to updating: %w", err)
+	}
 
 	if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
 		// do not apply any major version related changes yet
 	c.setSpec(newSpec)
 
 	defer func() {
-		var (
-			pgUpdatedStatus *acidv1.Postgresql
-			err             error
-		)
+		currentStatus := newSpec.Status.DeepCopy()
+		newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
+
 		if updateFailed {
-			pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdateFailed)
-		} else {
-			pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
+			newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdateFailed
 		}
-		if err != nil {
-			c.logger.Warningf("could not set cluster status: %v", err)
-			return
-		}
-		if pgUpdatedStatus != nil {
+
+		if !equality.Semantic.DeepEqual(currentStatus, newSpec.Status) {
+			pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
+			if err != nil {
+				c.logger.Warningf("could not set cluster status: %v", err)
+				return
+			}
 			c.setSpec(pgUpdatedStatus)
 		}
 	}()
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 9142f33bb..1a8506155 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -20,6 +20,7 @@ import (
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
+	"k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 )
@@ -43,21 +44,19 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 	c.setSpec(newSpec)
 
 	defer func() {
-		var (
-			pgUpdatedStatus *acidv1.Postgresql
-			errStatus       error
-		)
 		if err != nil {
 			c.logger.Warningf("error while syncing cluster state: %v", err)
-			pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusSyncFailed)
+			newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed
 		} else if !c.Status.Running() {
-			pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
+			newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
 		}
-		if errStatus != nil {
-			c.logger.Warningf("could not set cluster status: %v", errStatus)
-			return
-		}
-		if pgUpdatedStatus != nil {
+
+		if !equality.Semantic.DeepEqual(oldSpec.Status, newSpec.Status) {
+			pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
+			if err != nil {
+				c.logger.Warningf("could not set cluster status: %v", err)
+				return
+			}
 			c.setSpec(pgUpdatedStatus)
 		}
 	}()
diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go
index 9cd7dc7e9..2a8b0b90a 100644
--- a/pkg/cluster/util_test.go
+++ b/pkg/cluster/util_test.go
@@ -288,6 +288,12 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
 		},
 	}
 
+	// add postgresql cluster to fake client
+	_, err := client.PostgresqlsGetter.Postgresqls(namespace).Create(context.TODO(), &pg, metav1.CreateOptions{})
+	if err != nil {
+		return nil, err
+	}
+
 	cluster := New(
 		Config{
 			OpConfig: config.Config{
@@ -321,7 +327,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
 		},
 	}, client, pg, logger, eventRecorder)
 	cluster.Name = clusterName
 	cluster.Namespace = namespace
-	_, err := cluster.createStatefulSet()
+	_, err = cluster.createStatefulSet()
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go
index 824a030f4..0725ffc1d 100644
--- a/pkg/controller/postgresql.go
+++ b/pkg/controller/postgresql.go
@@ -161,7 +161,8 @@ func (c *Controller) acquireInitialListOfClusters() error {
 func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
 	if c.opConfig.EnableTeamIdClusternamePrefix {
 		if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
-			c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusInvalid)
+			pgSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusInvalid
+			c.KubeClient.SetPostgresCRDStatus(clusterName, pgSpec)
 			return nil, err
 		}
 	}
@@ -470,13 +471,25 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.Postgresql
 	switch eventType {
 	case EventAdd:
-		c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed)
+		informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusAddFailed
+		_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
+		if err != nil {
+			c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
+		}
 		c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
 	case EventUpdate:
-		c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed)
+		informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdateFailed
+		_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
+		if err != nil {
+			c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
+		}
 		c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
 	default:
-		c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed)
+		informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed
+		_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
+		if err != nil {
+			c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
+		}
 		c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
 	}
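With SetPostgresCRDStatus now taking a full Postgresql object instead of a status string, all of the call sites above converge on the same shape: set the status on a copy of the object, skip the API call when the status has not changed, and adopt the object returned by the server. The following is only a condensed sketch of that pattern for reference; the helper name setClusterStatus is illustrative, it assumes the pkg/cluster package context shown above, and it is not code from this patch.

func (c *Cluster) setClusterStatus(status string) {
	// remember the current status so we can tell whether anything changed
	currentStatus := c.Status
	pg := c.Postgresql.DeepCopy()
	pg.Status.PostgresClusterStatus = status

	// no API call when the status is already up to date
	if equality.Semantic.DeepEqual(currentStatus, pg.Status) {
		return
	}

	pgUpdated, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pg)
	if err != nil {
		c.logger.Warningf("could not set cluster status: %v", err)
		return
	}
	// keep the in-memory object in sync with the server-side resourceVersion
	c.setSpec(pgUpdated)
}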
diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go
index de1fb605a..c34faddd4 100644
--- a/pkg/util/k8sutil/k8sutil.go
+++ b/pkg/util/k8sutil/k8sutil.go
@@ -191,24 +191,8 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
 }
 
 // SetPostgresCRDStatus of Postgres cluster
-func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, status string) (*apiacidv1.Postgresql, error) {
-	var pg *apiacidv1.Postgresql
-	var pgStatus apiacidv1.PostgresStatus
-	pgStatus.PostgresClusterStatus = status
-
-	patch, err := json.Marshal(struct {
-		PgStatus interface{} `json:"status"`
-	}{&pgStatus})
-
-	if err != nil {
-		return pg, fmt.Errorf("could not marshal status: %v", err)
-	}
-
-	// we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ),
-	// however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11)
-	// we should take advantage of it.
-	pg, err = client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Patch(
-		context.TODO(), clusterName.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
+func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, pg *apiacidv1.Postgresql) (*apiacidv1.Postgresql, error) {
+	pg, err := client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).UpdateStatus(context.TODO(), pg, metav1.UpdateOptions{})
 	if err != nil {
 		return pg, fmt.Errorf("could not update status: %v", err)
 	}
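Unlike the old merge patch, UpdateStatus is a full update of the /status subresource and is therefore subject to optimistic-concurrency checks: the object passed in must carry the current resourceVersion, and a concurrent writer can cause a conflict when the operator works from a cached copy. The sketch below shows one way a caller could guard against that with RetryOnConflict from k8s.io/client-go/util/retry; this is a hypothetical helper, not part of this patch, and it assumes the operator's usual acidv1, k8sutil, spec, metav1, and context imports.

// setStatusWithRetry is hypothetical: it retries the status update on
// resourceVersion conflicts by re-reading the object before each attempt.
func setStatusWithRetry(client k8sutil.KubernetesClient, clusterName spec.NamespacedName, status string) (*acidv1.Postgresql, error) {
	var updated *acidv1.Postgresql
	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// fetch the latest object so its resourceVersion is current
		pg, err := client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get(context.TODO(), clusterName.Name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		pg.Status.PostgresClusterStatus = status
		updated, err = client.SetPostgresCRDStatus(clusterName, pg)
		return err
	})
	return updated, err
}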
+ "Expected generation and status.observedGeneration to match", + ) + k8s.api.custom_objects_api.patch_namespaced_custom_object( 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', @@ -1683,6 +1703,13 @@ class EndToEndTestCase(unittest.TestCase): self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler="+pooler_name), 0, "Pooler pods not scaled down") + # status observedGeneration should match metadata.generation + self.eventuallyTrueFunc( + lambda: k8s.pg_get(), + lambda pg: pg.get("metadata", {}).get("generation", 0) == pg.get("status", {}).get("observedGeneration", -1), + "Expected generation and status.observedGeneration to match", + ) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_owner_references(self): ''' @@ -2022,7 +2049,7 @@ class EndToEndTestCase(unittest.TestCase): # pod_label_wait_timeout should have been exceeded hence the rolling update is continued on next sync # check if the cluster state is "SyncFailed" - self.eventuallyEqual(lambda: k8s.pg_get_status(), "SyncFailed", "Expected SYNC event to fail") + self.eventuallyEqual(lambda: k8s.pg_get_status(), {"PostgresClusterStatus": "SyncFailed"}, "Expected SYNC event to fail") # wait for next sync, replica should be running normally by now and be ready for switchover k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label) @@ -2037,7 +2064,13 @@ class EndToEndTestCase(unittest.TestCase): # status should again be "SyncFailed" but turn into "Running" on the next sync time.sleep(30) - self.eventuallyEqual(lambda: k8s.pg_get_status(), "Running", "Expected running cluster after two syncs") + self.eventuallyEqual(lambda: k8s.pg_get_status(), {"PostgresClusterStatus": "Running"}, "Expected running cluster after two syncs") + # status observedGeneration should match metadata.generation + self.eventuallyTrueFunc( + lambda: k8s.pg_get(), + lambda pg: pg.get("metadata", {}).get("generation", 0) == pg.get("status", {}).get("observedGeneration", -1), + "Expected generation and status.observedGeneration to match", + ) # revert config changes patch_resync_config = {