From d032e4783eb84fcc9c3056d374f4b6340f18141b Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 4 Mar 2022 13:36:17 +0100 Subject: [PATCH] LoadBalancer toggles for master and replica pooler pods (#1799) * Add support for pooler load balancer Signed-off-by: Sergey Shatunov * Rename to enable_master_pooler_load_balancer Signed-off-by: Sergey Shatunov * target port should be intval * enhance pooler e2e test * add new options to crds.go Co-authored-by: Sergey Shatunov --- .../crds/operatorconfigurations.yaml | 6 + .../postgres-operator/crds/postgresqls.yaml | 4 + charts/postgres-operator/values.yaml | 4 + docs/administrator.md | 5 + docs/reference/cluster_manifest.md | 12 ++ docs/reference/operator_parameters.md | 16 +- e2e/tests/test_e2e.py | 75 +++---- manifests/configmap.yaml | 2 + manifests/operatorconfiguration.crd.yaml | 6 + ...gresql-operator-default-configuration.yaml | 2 + manifests/postgresql.crd.yaml | 4 + pkg/apis/acid.zalan.do/v1/crds.go | 12 ++ .../v1/operator_configuration_type.go | 16 +- pkg/apis/acid.zalan.do/v1/postgresql_type.go | 6 +- .../acid.zalan.do/v1/zz_generated.deepcopy.go | 10 + pkg/cluster/cluster_test.go | 2 +- pkg/cluster/connection_pooler.go | 90 ++++++--- pkg/cluster/connection_pooler_test.go | 12 +- pkg/cluster/k8sres.go | 41 ++-- pkg/cluster/k8sres_test.go | 183 ++++++++++++++++++ pkg/cluster/resources.go | 27 ++- pkg/cluster/sync.go | 4 +- pkg/cluster/util_test.go | 2 +- pkg/controller/operator_config.go | 13 +- pkg/util/config/config.go | 2 + 25 files changed, 429 insertions(+), 127 deletions(-) diff --git a/charts/postgres-operator/crds/operatorconfigurations.yaml b/charts/postgres-operator/crds/operatorconfigurations.yaml index f67c04fc5..469aa5588 100644 --- a/charts/postgres-operator/crds/operatorconfigurations.yaml +++ b/charts/postgres-operator/crds/operatorconfigurations.yaml @@ -380,9 +380,15 @@ spec: enable_master_load_balancer: type: boolean default: true + enable_master_pooler_load_balancer: + type: boolean + default: false enable_replica_load_balancer: type: boolean default: false + enable_replica_pooler_load_balancer: + type: boolean + default: false external_traffic_policy: type: string enum: diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index a7072b8cc..5ec40bdb1 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -197,8 +197,12 @@ spec: type: boolean enableMasterLoadBalancer: type: boolean + enableMasterPoolerLoadBalancer: + type: boolean enableReplicaLoadBalancer: type: boolean + enableReplicaPoolerLoadBalancer: + type: boolean enableShmVolume: type: boolean init_containers: diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml index 5d38b5233..9f144ba5e 100644 --- a/charts/postgres-operator/values.yaml +++ b/charts/postgres-operator/values.yaml @@ -228,8 +228,12 @@ configLoadBalancer: # toggles service type load balancer pointing to the master pod of the cluster enable_master_load_balancer: false + # toggles service type load balancer pointing to the master pooler pod of the cluster + enable_master_pooler_load_balancer: false # toggles service type load balancer pointing to the replica pod of the cluster enable_replica_load_balancer: false + # toggles service type load balancer pointing to the replica pooler pod of the cluster + enable_replica_pooler_load_balancer: false # define external traffic policy for the load balancer external_traffic_policy: "Cluster" # defines the DNS name string template for the master load balancer cluster diff --git a/docs/administrator.md b/docs/administrator.md index e68427658..dd6bf59b0 100644 --- a/docs/administrator.md +++ b/docs/administrator.md @@ -750,6 +750,11 @@ lead to K8s removing this field from the manifest due to its Then the resultant manifest will not contain the necessary change, and the operator will respectively do nothing with the existing source ranges. +Load balancer services can also be enabled for the [connection pooler](user.md#connection-pooler) +pods with manifest flags `enableMasterPoolerLoadBalancer` and/or +`enableReplicaPoolerLoadBalancer` or in the operator configuration with +`enable_master_pooler_load_balancer` and/or `enable_replica_pooler_load_balancer`. + ## Running periodic 'autorepair' scans of K8s objects The Postgres Operator periodically scans all K8s objects belonging to each diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index cff86333e..bf77bd8b8 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -91,11 +91,23 @@ These parameters are grouped directly under the `spec` key in the manifest. `enable_master_load_balancer` parameter) to define whether to enable the load balancer pointing to the Postgres primary. Optional. +* **enableMasterPoolerLoadBalancer** + boolean flag to override the operator defaults (set by the + `enable_master_pooler_load_balancer` parameter) to define whether to enable + the load balancer for master pooler pods pointing to the Postgres primary. + Optional. + * **enableReplicaLoadBalancer** boolean flag to override the operator defaults (set by the `enable_replica_load_balancer` parameter) to define whether to enable the load balancer pointing to the Postgres standby instances. Optional. +* **enableReplicaPoolerLoadBalancer** + boolean flag to override the operator defaults (set by the + `enable_replica_pooler_load_balancer` parameter) to define whether to enable + the load balancer for replica pooler pods pointing to the Postgres standby + instances. Optional. + * **allowedSourceRanges** when one or more load balancers are enabled for the cluster, this parameter defines the comma-separated range of IP networks (in CIDR-notation). The diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md index cbe31ba57..5988271c5 100644 --- a/docs/reference/operator_parameters.md +++ b/docs/reference/operator_parameters.md @@ -546,11 +546,21 @@ In the CRD-based configuration they are grouped under the `load_balancer` key. toggles service type load balancer pointing to the master pod of the cluster. Can be overridden by individual cluster settings. The default is `true`. -* **enable_replica_load_balancer** - toggles service type load balancer pointing to the replica pod of the - cluster. Can be overridden by individual cluster settings. The default is +* **enable_master_pooler_load_balancer** + toggles service type load balancer pointing to the master pooler pod of the + cluster. Can be overridden by individual cluster settings. The default is `false`. +* **enable_replica_load_balancer** + toggles service type load balancer pointing to the replica pod(s) of the + cluster. Can be overridden by individual cluster settings. The default is + `false`. + +* **enable_replica_pooler_load_balancer** + toggles service type load balancer pointing to the replica pooler pod(s) of + the cluster. Can be overridden by individual cluster settings. The default + is `false`. + * **external_traffic_policy** defines external traffic policy for load balancers. Allowed values are `Cluster` (default) and `Local`. diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index c4d104069..09b2c9c60 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -467,6 +467,9 @@ class EndToEndTestCase(unittest.TestCase): the end turn connection pooler off to not interfere with other tests. ''' k8s = self.k8s + pooler_label = 'application=db-connection-pooler,cluster-name=acid-minimal-cluster' + master_pooler_label = 'connection-pooler=acid-minimal-cluster-pooler' + replica_pooler_label = master_pooler_label + '-repl' self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") k8s.api.custom_objects_api.patch_namespaced_custom_object( @@ -476,22 +479,23 @@ class EndToEndTestCase(unittest.TestCase): 'spec': { 'enableConnectionPooler': True, 'enableReplicaConnectionPooler': True, + 'enableMasterPoolerLoadBalancer': True, + 'enableReplicaPoolerLoadBalancer': True, } }) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2, - "Deployment replicas is 2 default") - self.eventuallyEqual(lambda: k8s.count_running_pods( - "connection-pooler=acid-minimal-cluster-pooler"), - 2, "No pooler pods found") - self.eventuallyEqual(lambda: k8s.count_running_pods( - "connection-pooler=acid-minimal-cluster-pooler-repl"), - 2, "No pooler replica pods found") - self.eventuallyEqual(lambda: k8s.count_services_with_label( - 'application=db-connection-pooler,cluster-name=acid-minimal-cluster'), - 2, "No pooler service found") - self.eventuallyEqual(lambda: k8s.count_secrets_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), - 1, "Pooler secret not created") + self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2, "Deployment replicas is 2 default") + self.eventuallyEqual(lambda: k8s.count_running_pods(master_pooler_label), 2, "No pooler pods found") + self.eventuallyEqual(lambda: k8s.count_running_pods(replica_pooler_label), 2, "No pooler replica pods found") + self.eventuallyEqual(lambda: k8s.count_services_with_label(pooler_label), 2, "No pooler service found") + self.eventuallyEqual(lambda: k8s.count_secrets_with_label(pooler_label), 1, "Pooler secret not created") + self.eventuallyEqual(lambda: k8s.get_service_type(master_pooler_label+","+pooler_label), + 'LoadBalancer', + "Expected LoadBalancer service type for master pooler pod, found {}") + self.eventuallyEqual(lambda: k8s.get_service_type(replica_pooler_label+","+pooler_label), + 'LoadBalancer', + "Expected LoadBalancer service type for replica pooler pod, found {}") # Turn off only master connection pooler k8s.api.custom_objects_api.patch_namespaced_custom_object( @@ -504,20 +508,17 @@ class EndToEndTestCase(unittest.TestCase): } }) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, - "Operator does not get in sync") + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(name="acid-minimal-cluster-pooler-repl"), 2, "Deployment replicas is 2 default") - self.eventuallyEqual(lambda: k8s.count_running_pods( - "connection-pooler=acid-minimal-cluster-pooler"), + self.eventuallyEqual(lambda: k8s.count_running_pods(master_pooler_label), 0, "Master pooler pods not deleted") - self.eventuallyEqual(lambda: k8s.count_running_pods( - "connection-pooler=acid-minimal-cluster-pooler-repl"), + self.eventuallyEqual(lambda: k8s.count_running_pods(replica_pooler_label), 2, "Pooler replica pods not found") - self.eventuallyEqual(lambda: k8s.count_services_with_label( - 'application=db-connection-pooler,cluster-name=acid-minimal-cluster'), + self.eventuallyEqual(lambda: k8s.count_services_with_label(pooler_label), 1, "No pooler service found") - self.eventuallyEqual(lambda: k8s.count_secrets_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), + self.eventuallyEqual(lambda: k8s.count_secrets_with_label(pooler_label), 1, "Secret not created") # Turn off only replica connection pooler @@ -528,20 +529,24 @@ class EndToEndTestCase(unittest.TestCase): 'spec': { 'enableConnectionPooler': True, 'enableReplicaConnectionPooler': False, + 'enableMasterPoolerLoadBalancer': False, } }) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, - "Operator does not get in sync") + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 2, "Deployment replicas is 2 default") - self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), + self.eventuallyEqual(lambda: k8s.count_running_pods(master_pooler_label), 2, "Master pooler pods not found") - self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler-repl"), + self.eventuallyEqual(lambda: k8s.count_running_pods(replica_pooler_label), 0, "Pooler replica pods not deleted") - self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), + self.eventuallyEqual(lambda: k8s.count_services_with_label(pooler_label), 1, "No pooler service found") - self.eventuallyEqual(lambda: k8s.count_secrets_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), + self.eventuallyEqual(lambda: k8s.get_service_type(master_pooler_label+","+pooler_label), + 'ClusterIP', + "Expected LoadBalancer service type for master, found {}") + self.eventuallyEqual(lambda: k8s.count_secrets_with_label(pooler_label), 1, "Secret not created") # scale up connection pooler deployment @@ -558,7 +563,7 @@ class EndToEndTestCase(unittest.TestCase): self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(), 3, "Deployment replicas is scaled to 3") - self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), + self.eventuallyEqual(lambda: k8s.count_running_pods(master_pooler_label), 3, "Scale up of pooler pods does not work") # turn it off, keeping config should be overwritten by false @@ -569,12 +574,13 @@ class EndToEndTestCase(unittest.TestCase): 'spec': { 'enableConnectionPooler': False, 'enableReplicaConnectionPooler': False, + 'enableReplicaPoolerLoadBalancer': False, } }) - self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), + self.eventuallyEqual(lambda: k8s.count_running_pods(master_pooler_label), 0, "Pooler pods not scaled down") - self.eventuallyEqual(lambda: k8s.count_services_with_label('application=db-connection-pooler,cluster-name=acid-minimal-cluster'), + self.eventuallyEqual(lambda: k8s.count_services_with_label(pooler_label), 0, "Pooler service not removed") self.eventuallyEqual(lambda: k8s.count_secrets_with_label('application=spilo,cluster-name=acid-minimal-cluster'), 4, "Secrets not deleted") @@ -1177,10 +1183,11 @@ class EndToEndTestCase(unittest.TestCase): @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_overwrite_pooler_deployment(self): + pooler_name = 'acid-minimal-cluster-pooler' k8s = self.k8s k8s.create_with_kubectl("manifests/minimal-fake-pooler-deployment.yaml") self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(name="acid-minimal-cluster-pooler"), 1, + self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(name=pooler_name), 1, "Initial broken deployment not rolled out") k8s.api.custom_objects_api.patch_namespaced_custom_object( @@ -1193,7 +1200,7 @@ class EndToEndTestCase(unittest.TestCase): }) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(name="acid-minimal-cluster-pooler"), 2, + self.eventuallyEqual(lambda: k8s.get_deployment_replica_count(name=pooler_name), 2, "Operator did not succeed in overwriting labels") k8s.api.custom_objects_api.patch_namespaced_custom_object( @@ -1206,7 +1213,7 @@ class EndToEndTestCase(unittest.TestCase): }) self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), + self.eventuallyEqual(lambda: k8s.count_running_pods("connection-pooler="+pooler_name), 0, "Pooler pods not scaled down") @timeout_decorator.timeout(TEST_TIMEOUT_SEC) diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index b9605441e..d127299c4 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -44,6 +44,7 @@ data: # enable_init_containers: "true" # enable_lazy_spilo_upgrade: "false" enable_master_load_balancer: "false" + enable_master_pooler_load_balancer: "false" enable_password_rotation: "false" enable_pgversion_env_var: "true" # enable_pod_antiaffinity: "false" @@ -51,6 +52,7 @@ data: # enable_postgres_team_crd: "false" # enable_postgres_team_crd_superusers: "false" enable_replica_load_balancer: "false" + enable_replica_pooler_load_balancer: "false" # enable_shm_volume: "true" # enable_sidecars: "true" enable_spilo_wal_path_compat: "true" diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml index e6c2a2d7c..d6241abea 100644 --- a/manifests/operatorconfiguration.crd.yaml +++ b/manifests/operatorconfiguration.crd.yaml @@ -378,9 +378,15 @@ spec: enable_master_load_balancer: type: boolean default: true + enable_master_pooler_load_balancer: + type: boolean + default: false enable_replica_load_balancer: type: boolean default: false + enable_replica_pooler_load_balancer: + type: boolean + default: false external_traffic_policy: type: string enum: diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml index 0641bd1b1..7ab8565a9 100644 --- a/manifests/postgresql-operator-default-configuration.yaml +++ b/manifests/postgresql-operator-default-configuration.yaml @@ -119,7 +119,9 @@ configuration: # keyy: valuey # db_hosted_zone: "" enable_master_load_balancer: false + enable_master_pooler_load_balancer: false enable_replica_load_balancer: false + enable_replica_pooler_load_balancer: false external_traffic_policy: "Cluster" master_dns_name_format: "{cluster}.{team}.{hostedzone}" replica_dns_name_format: "{cluster}-repl.{team}.{hostedzone}" diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 7cf220eb7..183b97fc6 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -195,8 +195,12 @@ spec: type: boolean enableMasterLoadBalancer: type: boolean + enableMasterPoolerLoadBalancer: + type: boolean enableReplicaLoadBalancer: type: boolean + enableReplicaPoolerLoadBalancer: + type: boolean enableShmVolume: type: boolean init_containers: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index d8d46e248..84c57b7cf 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -302,9 +302,15 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ "enableMasterLoadBalancer": { Type: "boolean", }, + "enableMasterPoolerLoadBalancer": { + Type: "boolean", + }, "enableReplicaLoadBalancer": { Type: "boolean", }, + "enableReplicaPoolerLoadBalancer": { + Type: "boolean", + }, "enableShmVolume": { Type: "boolean", }, @@ -1469,9 +1475,15 @@ var OperatorConfigCRDResourceValidation = apiextv1.CustomResourceValidation{ "enable_master_load_balancer": { Type: "boolean", }, + "enable_master_pooler_load_balancer": { + Type: "boolean", + }, "enable_replica_load_balancer": { Type: "boolean", }, + "enable_replica_pooler_load_balancer": { + Type: "boolean", + }, "external_traffic_policy": { Type: "string", Enum: []apiextv1.JSON{ diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go index 00fce42b5..f5c57aaa4 100644 --- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go +++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go @@ -121,13 +121,15 @@ type OperatorTimeouts struct { // LoadBalancerConfiguration defines the LB configuration type LoadBalancerConfiguration struct { - DbHostedZone string `json:"db_hosted_zone,omitempty"` - EnableMasterLoadBalancer bool `json:"enable_master_load_balancer,omitempty"` - EnableReplicaLoadBalancer bool `json:"enable_replica_load_balancer,omitempty"` - CustomServiceAnnotations map[string]string `json:"custom_service_annotations,omitempty"` - MasterDNSNameFormat config.StringTemplate `json:"master_dns_name_format,omitempty"` - ReplicaDNSNameFormat config.StringTemplate `json:"replica_dns_name_format,omitempty"` - ExternalTrafficPolicy string `json:"external_traffic_policy" default:"Cluster"` + DbHostedZone string `json:"db_hosted_zone,omitempty"` + EnableMasterLoadBalancer bool `json:"enable_master_load_balancer,omitempty"` + EnableMasterPoolerLoadBalancer bool `json:"enable_master_pooler_load_balancer,omitempty"` + EnableReplicaLoadBalancer bool `json:"enable_replica_load_balancer,omitempty"` + EnableReplicaPoolerLoadBalancer bool `json:"enable_replica_pooler_load_balancer,omitempty"` + CustomServiceAnnotations map[string]string `json:"custom_service_annotations,omitempty"` + MasterDNSNameFormat config.StringTemplate `json:"master_dns_name_format,omitempty"` + ReplicaDNSNameFormat config.StringTemplate `json:"replica_dns_name_format,omitempty"` + ExternalTrafficPolicy string `json:"external_traffic_policy" default:"Cluster"` } // AWSGCPConfiguration defines the configuration for AWS diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index a1fc4fcf7..50be9e551 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -42,8 +42,10 @@ type PostgresSpec struct { // vars that enable load balancers are pointers because it is important to know if any of them is omitted from the Postgres manifest // in that case the var evaluates to nil and the value is taken from the operator config - EnableMasterLoadBalancer *bool `json:"enableMasterLoadBalancer,omitempty"` - EnableReplicaLoadBalancer *bool `json:"enableReplicaLoadBalancer,omitempty"` + EnableMasterLoadBalancer *bool `json:"enableMasterLoadBalancer,omitempty"` + EnableMasterPoolerLoadBalancer *bool `json:"enableMasterPoolerLoadBalancer,omitempty"` + EnableReplicaLoadBalancer *bool `json:"enableReplicaLoadBalancer,omitempty"` + EnableReplicaPoolerLoadBalancer *bool `json:"enableReplicaPoolerLoadBalancer,omitempty"` // deprecated load balancer settings maintained for backward compatibility // see "Load balancers" operator docs diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index c2298fada..91f6be563 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -611,11 +611,21 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { *out = new(bool) **out = **in } + if in.EnableMasterPoolerLoadBalancer != nil { + in, out := &in.EnableMasterPoolerLoadBalancer, &out.EnableMasterPoolerLoadBalancer + *out = new(bool) + **out = **in + } if in.EnableReplicaLoadBalancer != nil { in, out := &in.EnableReplicaLoadBalancer, &out.EnableReplicaLoadBalancer *out = new(bool) **out = **in } + if in.EnableReplicaPoolerLoadBalancer != nil { + in, out := &in.EnableReplicaPoolerLoadBalancer, &out.EnableReplicaPoolerLoadBalancer + *out = new(bool) + **out = **in + } if in.UseLoadBalancer != nil { in, out := &in.UseLoadBalancer, &out.UseLoadBalancer *out = new(bool) diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index d06cc21e1..fba11a6dd 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -1041,7 +1041,7 @@ func TestCrossNamespacedSecrets(t *testing.T) { ConnectionPoolerDefaultCPULimit: "100m", ConnectionPoolerDefaultMemoryRequest: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi", - NumberOfInstances: int32ToPointer(1), + NumberOfInstances: k8sutil.Int32ToPointer(1), }, PodManagementPolicy: "ordered_ready", Resources: config.Resources{ diff --git a/pkg/cluster/connection_pooler.go b/pkg/cluster/connection_pooler.go index ec9fe291d..12c0be6ae 100644 --- a/pkg/cluster/connection_pooler.go +++ b/pkg/cluster/connection_pooler.go @@ -247,7 +247,7 @@ func (c *Cluster) generateConnectionPoolerPodTemplate(role PostgresRole) ( }, { Name: "PGPORT", - Value: c.servicePort(role), + Value: fmt.Sprint(c.servicePort(role)), }, { Name: "PGUSER", @@ -386,7 +386,7 @@ func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPo { Name: connectionPooler.Name, Port: pgPort, - TargetPort: intstr.IntOrString{StrVal: c.servicePort(connectionPooler.Role)}, + TargetPort: intstr.IntOrString{IntVal: c.servicePort(connectionPooler.Role)}, }, }, Type: v1.ServiceTypeClusterIP, @@ -395,6 +395,10 @@ func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPo }, } + if c.shouldCreateLoadBalancerForPoolerService(connectionPooler.Role, spec) { + c.configureLoadBalanceService(&serviceSpec, spec.AllowedSourceRanges) + } + service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: connectionPooler.Name, @@ -415,6 +419,29 @@ func (c *Cluster) generateConnectionPoolerService(connectionPooler *ConnectionPo return service } +func (c *Cluster) shouldCreateLoadBalancerForPoolerService(role PostgresRole, spec *acidv1.PostgresSpec) bool { + + switch role { + + case Replica: + // if the value is explicitly set in a Postgresql manifest, follow this setting + if spec.EnableReplicaPoolerLoadBalancer != nil { + return *spec.EnableReplicaPoolerLoadBalancer + } + // otherwise, follow the operator configuration + return c.OpConfig.EnableReplicaPoolerLoadBalancer + + case Master: + if spec.EnableMasterPoolerLoadBalancer != nil { + return *spec.EnableMasterPoolerLoadBalancer + } + return c.OpConfig.EnableMasterPoolerLoadBalancer + + default: + panic(fmt.Sprintf("Unknown role %v", role)) + } +} + //delete connection pooler func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) { c.logger.Infof("deleting connection pooler spilo-role=%s", role) @@ -811,6 +838,7 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, Look func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql, role PostgresRole) ( SyncReason, error) { + syncReason := make([]string, 0) deployment, err := c.KubeClient. Deployments(c.Namespace). Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) @@ -865,25 +893,26 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if oldSpec != nil { specSync, specReason = needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler, c.logger) + syncReason = append(syncReason, specReason...) } defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(&c.Config, newConnectionPooler, deployment) - reason := append(specReason, defaultsReason...) + syncReason = append(syncReason, defaultsReason...) if specSync || defaultsSync { c.logger.Infof("Update connection pooler deployment %s, reason: %+v", - c.connectionPoolerName(role), reason) + c.connectionPoolerName(role), syncReason) newDeploymentSpec, err := c.generateConnectionPoolerDeployment(c.ConnectionPooler[role]) if err != nil { msg := "could not generate deployment for connection pooler: %v" - return reason, fmt.Errorf(msg, err) + return syncReason, fmt.Errorf(msg, err) } deployment, err := updateConnectionPoolerDeployment(c.KubeClient, newDeploymentSpec) if err != nil { - return reason, err + return syncReason, err } c.ConnectionPooler[role].Deployment = deployment } @@ -898,31 +927,40 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql c.ConnectionPooler[role].Deployment = deployment } - service, err := c.KubeClient. - Services(c.Namespace). - Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) - - if err != nil && k8sutil.ResourceNotFound(err) { - msg := "Service %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName(role)) - - serviceSpec := c.generateConnectionPoolerService(c.ConnectionPooler[role]) - service, err := c.KubeClient. - Services(serviceSpec.Namespace). - Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) - - if err != nil { - return NoSync, err + if svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil { + c.ConnectionPooler[role].Service = svc + desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role]) + if match, reason := k8sutil.SameService(svc, desiredSvc); !match { + syncReason = append(syncReason, reason) + c.logServiceChanges(role, svc, desiredSvc, false, reason) + updatedService, err := c.updateService(role, svc, desiredSvc) + if err != nil { + return syncReason, fmt.Errorf("could not update %s service to match desired state: %v", role, err) + } + c.ConnectionPooler[role].Service = updatedService + c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) } - c.ConnectionPooler[role].Service = service + return NoSync, nil + } - } else if err != nil { + if !k8sutil.ResourceNotFound(err) { msg := "could not get connection pooler service to sync: %v" return NoSync, fmt.Errorf(msg, err) - } else { - // Service updates are not supported and probably not that useful anyway - c.ConnectionPooler[role].Service = service } + c.ConnectionPooler[role].Service = nil + msg := "Service %s for connection pooler synchronization is not found, create it" + c.logger.Warningf(msg, c.connectionPoolerName(role)) + + serviceSpec := c.generateConnectionPoolerService(c.ConnectionPooler[role]) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). + Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) + + if err != nil { + return NoSync, err + } + c.ConnectionPooler[role].Service = service + return NoSync, nil } diff --git a/pkg/cluster/connection_pooler_test.go b/pkg/cluster/connection_pooler_test.go index 9b983c7b0..da45899b4 100644 --- a/pkg/cluster/connection_pooler_test.go +++ b/pkg/cluster/connection_pooler_test.go @@ -27,10 +27,6 @@ func boolToPointer(value bool) *bool { return &value } -func int32ToPointer(value int32) *int32 { - return &value -} - func deploymentUpdated(cluster *Cluster, err error, reason SyncReason) error { for _, role := range [2]PostgresRole{Master, Replica} { @@ -294,7 +290,7 @@ func TestConnectionPoolerCreateDeletion(t *testing.T) { ConnectionPoolerDefaultCPULimit: "100m", ConnectionPoolerDefaultMemoryRequest: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi", - NumberOfInstances: int32ToPointer(1), + NumberOfInstances: k8sutil.Int32ToPointer(1), }, PodManagementPolicy: "ordered_ready", Resources: config.Resources{ @@ -401,7 +397,7 @@ func TestConnectionPoolerSync(t *testing.T) { ConnectionPoolerDefaultCPULimit: "100m", ConnectionPoolerDefaultMemoryRequest: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi", - NumberOfInstances: int32ToPointer(1), + NumberOfInstances: k8sutil.Int32ToPointer(1), }, PodManagementPolicy: "ordered_ready", Resources: config.Resources{ @@ -639,7 +635,7 @@ func TestConnectionPoolerSync(t *testing.T) { for _, tt := range tests { tt.cluster.OpConfig.ConnectionPooler.Image = tt.defaultImage tt.cluster.OpConfig.ConnectionPooler.NumberOfInstances = - int32ToPointer(tt.defaultInstances) + k8sutil.Int32ToPointer(tt.defaultInstances) t.Logf("running test for %s [%s]", testName, tt.subTest) @@ -664,7 +660,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) { ReplicationUsername: replicationUserName, }, ConnectionPooler: config.ConnectionPooler{ - MaxDBConnections: int32ToPointer(60), + MaxDBConnections: k8sutil.Int32ToPointer(60), ConnectionPoolerDefaultCPURequest: "100m", ConnectionPoolerDefaultCPULimit: "100m", ConnectionPoolerDefaultMemoryRequest: "100Mi", diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 91dd20765..6fa983c80 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -102,15 +102,15 @@ func (c *Cluster) serviceAddress(role PostgresRole) string { return "" } -func (c *Cluster) servicePort(role PostgresRole) string { +func (c *Cluster) servicePort(role PostgresRole) int32 { service, exist := c.Services[role] if exist { - return fmt.Sprint(service.Spec.Ports[0].Port) + return service.Spec.Ports[0].Port } - c.logger.Warningf("No service for role %s", role) - return "" + c.logger.Warningf("No service for role %s - defaulting to port 5432", role) + return pgPort } func (c *Cluster) podDisruptionBudgetName() string { @@ -1699,23 +1699,7 @@ func (c *Cluster) generateService(role PostgresRole, spec *acidv1.PostgresSpec) } if c.shouldCreateLoadBalancerForService(role, spec) { - - // spec.AllowedSourceRanges evaluates to the empty slice of zero length - // when omitted or set to 'null'/empty sequence in the PG manifest - if len(spec.AllowedSourceRanges) > 0 { - serviceSpec.LoadBalancerSourceRanges = spec.AllowedSourceRanges - } else { - // safe default value: lock a load balancer only to the local address unless overridden explicitly - serviceSpec.LoadBalancerSourceRanges = []string{localHost} - } - - c.logger.Debugf("final load balancer source ranges as seen in a service spec (not necessarily applied): %q", serviceSpec.LoadBalancerSourceRanges) - serviceSpec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyType(c.OpConfig.ExternalTrafficPolicy) - serviceSpec.Type = v1.ServiceTypeLoadBalancer - } else if role == Replica { - // before PR #258, the replica service was only created if allocated a LB - // now we always create the service but warn if the LB is absent - c.logger.Debugf("No load balancer created for the replica service") + c.configureLoadBalanceService(&serviceSpec, spec.AllowedSourceRanges) } service := &v1.Service{ @@ -1731,6 +1715,21 @@ func (c *Cluster) generateService(role PostgresRole, spec *acidv1.PostgresSpec) return service } +func (c *Cluster) configureLoadBalanceService(serviceSpec *v1.ServiceSpec, sourceRanges []string) { + // spec.AllowedSourceRanges evaluates to the empty slice of zero length + // when omitted or set to 'null'/empty sequence in the PG manifest + if len(sourceRanges) > 0 { + serviceSpec.LoadBalancerSourceRanges = sourceRanges + } else { + // safe default value: lock a load balancer only to the local address unless overridden explicitly + serviceSpec.LoadBalancerSourceRanges = []string{localHost} + } + + c.logger.Debugf("final load balancer source ranges as seen in a service spec (not necessarily applied): %q", serviceSpec.LoadBalancerSourceRanges) + serviceSpec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyType(c.OpConfig.ExternalTrafficPolicy) + serviceSpec.Type = v1.ServiceTypeLoadBalancer +} + func (c *Cluster) generateServiceAnnotations(role PostgresRole, spec *acidv1.PostgresSpec) map[string]string { annotations := make(map[string]string) diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 6875c9dba..cb2fd83c0 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/fake" v1core "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -1484,6 +1485,188 @@ func TestGenerateService(t *testing.T) { } +func newLBFakeClient() (k8sutil.KubernetesClient, *fake.Clientset) { + clientSet := fake.NewSimpleClientset() + + return k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + }, clientSet +} + +func getServices(serviceType v1.ServiceType, sourceRanges []string, extTrafficPolicy, clusterName string) []v1.ServiceSpec { + return []v1.ServiceSpec{ + v1.ServiceSpec{ + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyType(extTrafficPolicy), + LoadBalancerSourceRanges: sourceRanges, + Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, + Type: serviceType, + }, + v1.ServiceSpec{ + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyType(extTrafficPolicy), + LoadBalancerSourceRanges: sourceRanges, + Ports: []v1.ServicePort{{Name: clusterName + "-pooler", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, + Selector: map[string]string{"connection-pooler": clusterName + "-pooler"}, + Type: serviceType, + }, + v1.ServiceSpec{ + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyType(extTrafficPolicy), + LoadBalancerSourceRanges: sourceRanges, + Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, + Selector: map[string]string{"spilo-role": "replica", "application": "spilo", "cluster-name": clusterName}, + Type: serviceType, + }, + v1.ServiceSpec{ + ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyType(extTrafficPolicy), + LoadBalancerSourceRanges: sourceRanges, + Ports: []v1.ServicePort{{Name: clusterName + "-pooler-repl", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, + Selector: map[string]string{"connection-pooler": clusterName + "-pooler-repl"}, + Type: serviceType, + }, + } +} + +func TestEnableLoadBalancers(t *testing.T) { + testName := "Test enabling LoadBalancers" + client, _ := newLBFakeClient() + clusterName := "acid-test-cluster" + namespace := "default" + clusterNameLabel := "cluster-name" + roleLabel := "spilo-role" + roles := []PostgresRole{Master, Replica} + sourceRanges := []string{"192.186.1.2/22"} + extTrafficPolicy := "Cluster" + + tests := []struct { + subTest string + config config.Config + pgSpec acidv1.Postgresql + expectedServices []v1.ServiceSpec + }{ + { + subTest: "LBs enabled in config, disabled in manifest", + config: config.Config{ + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + NumberOfInstances: k8sutil.Int32ToPointer(1), + }, + EnableMasterLoadBalancer: true, + EnableMasterPoolerLoadBalancer: true, + EnableReplicaLoadBalancer: true, + EnableReplicaPoolerLoadBalancer: true, + ExternalTrafficPolicy: extTrafficPolicy, + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: clusterNameLabel, + PodRoleLabel: roleLabel, + }, + }, + pgSpec: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + Spec: acidv1.PostgresSpec{ + AllowedSourceRanges: sourceRanges, + EnableConnectionPooler: util.True(), + EnableReplicaConnectionPooler: util.True(), + EnableMasterLoadBalancer: util.False(), + EnableMasterPoolerLoadBalancer: util.False(), + EnableReplicaLoadBalancer: util.False(), + EnableReplicaPoolerLoadBalancer: util.False(), + NumberOfInstances: 1, + Resources: acidv1.Resources{ + ResourceRequests: acidv1.ResourceDescription{CPU: "1", Memory: "10"}, + ResourceLimits: acidv1.ResourceDescription{CPU: "1", Memory: "10"}, + }, + TeamID: "acid", + Volume: acidv1.Volume{ + Size: "1G", + }, + }, + }, + expectedServices: getServices(v1.ServiceTypeClusterIP, nil, "", clusterName), + }, + { + subTest: "LBs enabled in manifest, disabled in config", + config: config.Config{ + ConnectionPooler: config.ConnectionPooler{ + ConnectionPoolerDefaultCPURequest: "100m", + ConnectionPoolerDefaultCPULimit: "100m", + ConnectionPoolerDefaultMemoryRequest: "100Mi", + ConnectionPoolerDefaultMemoryLimit: "100Mi", + NumberOfInstances: k8sutil.Int32ToPointer(1), + }, + EnableMasterLoadBalancer: false, + EnableMasterPoolerLoadBalancer: false, + EnableReplicaLoadBalancer: false, + EnableReplicaPoolerLoadBalancer: false, + ExternalTrafficPolicy: extTrafficPolicy, + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: clusterNameLabel, + PodRoleLabel: roleLabel, + }, + }, + pgSpec: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + Spec: acidv1.PostgresSpec{ + AllowedSourceRanges: sourceRanges, + EnableConnectionPooler: util.True(), + EnableReplicaConnectionPooler: util.True(), + EnableMasterLoadBalancer: util.True(), + EnableMasterPoolerLoadBalancer: util.True(), + EnableReplicaLoadBalancer: util.True(), + EnableReplicaPoolerLoadBalancer: util.True(), + NumberOfInstances: 1, + Resources: acidv1.Resources{ + ResourceRequests: acidv1.ResourceDescription{CPU: "1", Memory: "10"}, + ResourceLimits: acidv1.ResourceDescription{CPU: "1", Memory: "10"}, + }, + TeamID: "acid", + Volume: acidv1.Volume{ + Size: "1G", + }, + }, + }, + expectedServices: getServices(v1.ServiceTypeLoadBalancer, sourceRanges, extTrafficPolicy, clusterName), + }, + } + + for _, tt := range tests { + var cluster = New( + Config{ + OpConfig: tt.config, + }, client, tt.pgSpec, logger, eventRecorder) + + cluster.Name = clusterName + cluster.Namespace = namespace + cluster.ConnectionPooler = map[PostgresRole]*ConnectionPoolerObjects{} + generatedServices := make([]v1.ServiceSpec, 0) + for _, role := range roles { + cluster.syncService(role) + cluster.ConnectionPooler[role] = &ConnectionPoolerObjects{ + Name: cluster.connectionPoolerName(role), + ClusterName: cluster.ClusterName, + Namespace: cluster.Namespace, + Role: role, + } + cluster.syncConnectionPoolerWorker(&tt.pgSpec, &tt.pgSpec, role) + generatedServices = append(generatedServices, cluster.Services[role].Spec) + generatedServices = append(generatedServices, cluster.ConnectionPooler[role].Service.Spec) + } + if !reflect.DeepEqual(tt.expectedServices, generatedServices) { + t.Errorf("%s %s: expected %#v but got %#v", testName, tt.subTest, tt.expectedServices, generatedServices) + } + } +} + func TestGenerateCapabilities(t *testing.T) { testName := "TestGenerateCapabilities" diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 48a82ee04..5e5c6156e 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -275,7 +275,7 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { return service, nil } -func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error { +func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newService *v1.Service) (*v1.Service, error) { var ( svc *v1.Service err error @@ -283,11 +283,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error c.setProcessName("updating %v service", role) - if c.Services[role] == nil { - return fmt.Errorf("there is no service in the cluster") - } - - serviceName := util.NameFromMeta(c.Services[role].ObjectMeta) + serviceName := util.NameFromMeta(oldService.ObjectMeta) // update the service annotation in order to propagate ELB notation. if len(newService.ObjectMeta.Annotations) > 0 { @@ -301,39 +297,38 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error "") if err != nil { - return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) + return nil, fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) } } else { - return fmt.Errorf("could not form patch for the service metadata: %v", err) + return nil, fmt.Errorf("could not form patch for the service metadata: %v", err) } } // now, patch the service spec, but when disabling LoadBalancers do update instead // patch does not work because of LoadBalancerSourceRanges field (even if set to nil) - oldServiceType := c.Services[role].Spec.Type + oldServiceType := oldService.Spec.Type newServiceType := newService.Spec.Type if newServiceType == "ClusterIP" && newServiceType != oldServiceType { - newService.ResourceVersion = c.Services[role].ResourceVersion - newService.Spec.ClusterIP = c.Services[role].Spec.ClusterIP + newService.ResourceVersion = oldService.ResourceVersion + newService.Spec.ClusterIP = oldService.Spec.ClusterIP svc, err = c.KubeClient.Services(serviceName.Namespace).Update(context.TODO(), newService, metav1.UpdateOptions{}) if err != nil { - return fmt.Errorf("could not update service %q: %v", serviceName, err) + return nil, fmt.Errorf("could not update service %q: %v", serviceName, err) } } else { patchData, err := specPatch(newService.Spec) if err != nil { - return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) + return nil, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) } svc, err = c.KubeClient.Services(serviceName.Namespace).Patch( context.TODO(), serviceName.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "") if err != nil { - return fmt.Errorf("could not patch service %q: %v", serviceName, err) + return nil, fmt.Errorf("could not patch service %q: %v", serviceName, err) } } - c.Services[role] = svc - return nil + return svc, nil } func (c *Cluster) deleteService(role PostgresRole) error { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index a897ff318..4548a5b14 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -174,9 +174,11 @@ func (c *Cluster) syncService(role PostgresRole) error { desiredSvc := c.generateService(role, &c.Spec) if match, reason := k8sutil.SameService(svc, desiredSvc); !match { c.logServiceChanges(role, svc, desiredSvc, false, reason) - if err = c.updateService(role, desiredSvc); err != nil { + updatedSvc, err := c.updateService(role, svc, desiredSvc) + if err != nil { return fmt.Errorf("could not update %s service to match desired state: %v", role, err) } + c.Services[role] = updatedSvc c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) } return nil diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 1463a5609..9ed644ea2 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -57,7 +57,7 @@ func TestInheritedAnnotations(t *testing.T) { ConnectionPoolerDefaultCPULimit: "100m", ConnectionPoolerDefaultMemoryRequest: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi", - NumberOfInstances: int32ToPointer(1), + NumberOfInstances: k8sutil.Int32ToPointer(1), }, PodManagementPolicy: "ordered_ready", Resources: config.Resources{ diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index 543254242..e470fb59d 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -10,6 +10,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/constants" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -24,10 +25,6 @@ func (c *Controller) readOperatorConfigurationFromCRD(configObjectNamespace, con return config, nil } -func int32ToPointer(value int32) *int32 { - return &value -} - // importConfigurationFromCRD is a transitional function that converts CRD configuration to the one based on the configmap func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigurationData) *config.Config { result := &config.Config{} @@ -141,7 +138,9 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur // load balancer config result.DbHostedZone = util.Coalesce(fromCRD.LoadBalancer.DbHostedZone, "db.example.com") result.EnableMasterLoadBalancer = fromCRD.LoadBalancer.EnableMasterLoadBalancer + result.EnableMasterPoolerLoadBalancer = fromCRD.LoadBalancer.EnableMasterPoolerLoadBalancer result.EnableReplicaLoadBalancer = fromCRD.LoadBalancer.EnableReplicaLoadBalancer + result.EnableReplicaPoolerLoadBalancer = fromCRD.LoadBalancer.EnableReplicaPoolerLoadBalancer result.CustomServiceAnnotations = fromCRD.LoadBalancer.CustomServiceAnnotations result.MasterDNSNameFormat = fromCRD.LoadBalancer.MasterDNSNameFormat result.ReplicaDNSNameFormat = fromCRD.LoadBalancer.ReplicaDNSNameFormat @@ -212,11 +211,11 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur // so ensure default values here. result.ConnectionPooler.NumberOfInstances = util.CoalesceInt32( fromCRD.ConnectionPooler.NumberOfInstances, - int32ToPointer(2)) + k8sutil.Int32ToPointer(2)) result.ConnectionPooler.NumberOfInstances = util.MaxInt32( result.ConnectionPooler.NumberOfInstances, - int32ToPointer(2)) + k8sutil.Int32ToPointer(2)) result.ConnectionPooler.Schema = util.Coalesce( fromCRD.ConnectionPooler.Schema, @@ -257,7 +256,7 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur result.ConnectionPooler.MaxDBConnections = util.CoalesceInt32( fromCRD.ConnectionPooler.MaxDBConnections, - int32ToPointer(constants.ConnectionPoolerMaxDBConnections)) + k8sutil.Int32ToPointer(constants.ConnectionPoolerMaxDBConnections)) return result } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index f32762e46..fc4b73074 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -190,7 +190,9 @@ type Config struct { EnablePostgresTeamCRD bool `name:"enable_postgres_team_crd" default:"false"` EnablePostgresTeamCRDSuperusers bool `name:"enable_postgres_team_crd_superusers" default:"false"` EnableMasterLoadBalancer bool `name:"enable_master_load_balancer" default:"true"` + EnableMasterPoolerLoadBalancer bool `name:"enable_master_pooler_load_balancer" default:"false"` EnableReplicaLoadBalancer bool `name:"enable_replica_load_balancer" default:"false"` + EnableReplicaPoolerLoadBalancer bool `name:"enable_replica_pooler_load_balancer" default:"false"` CustomServiceAnnotations map[string]string `name:"custom_service_annotations"` CustomPodAnnotations map[string]string `name:"custom_pod_annotations"` EnablePodAntiAffinity bool `name:"enable_pod_antiaffinity" default:"false"`