diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index 6df2de723..afa65ebff 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -185,8 +185,10 @@ spec: # Note: usernames specified here as database owners must be declared in the users key of the spec key. dockerImage: type: string - enableConnectionPooler: + enableMasterConnectionPooler: type: boolean + enableReplicaConnectionPooler: + type: boolean enableLogicalBackup: type: boolean enableMasterLoadBalancer: diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 576031543..1fcc9ff5b 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -140,7 +140,7 @@ These parameters are grouped directly under the `spec` key in the manifest. is `false`, then no volume will be mounted no matter how operator was configured (so you can override the operator configuration). Optional. -* **enableConnectionPooler** +* **enableMasterConnectionPooler** Tells the operator to create a connection pooler with a database. If this field is true, a connection pooler deployment will be created even if `connectionPooler` section is empty. Optional, not set by default. @@ -387,7 +387,7 @@ CPU and memory limits for the sidecar container. Parameters are grouped under the `connectionPooler` top-level key and specify configuration for connection pooler. If this section is not empty, a connection -pooler will be created for a database even if `enableConnectionPooler` is not +pooler will be created for a database even if `enableMasterConnectionPooler` is not present. * **numberOfInstances** diff --git a/docs/user.md b/docs/user.md index a4b1424b8..a63017655 100644 --- a/docs/user.md +++ b/docs/user.md @@ -736,7 +736,7 @@ manifest: ```yaml spec: - enableConnectionPooler: true + enableMasterConnectionPooler: true ``` This will tell the operator to create a connection pooler with default @@ -772,7 +772,7 @@ spec: memory: 100Mi ``` -The `enableConnectionPooler` flag is not required when the `connectionPooler` +The `enableMasterConnectionPooler` flag is not required when the `connectionPooler` section is present in the manifest. But, it can be used to disable/remove the pooler while keeping its configuration. diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 49e7da10d..7b85b9dc6 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -99,7 +99,7 @@ class EndToEndTestCase(unittest.TestCase): 'postgresqls', 'acid-minimal-cluster', { 'spec': { - 'enableConnectionPooler': True, + 'enableMasterConnectionPooler': True, } }) k8s.wait_for_pod_start(pod_selector) @@ -141,7 +141,7 @@ class EndToEndTestCase(unittest.TestCase): 'postgresqls', 'acid-minimal-cluster', { 'spec': { - 'enableConnectionPooler': False, + 'enableMasterConnectionPooler': False, } }) k8s.wait_for_pods_to_stop(pod_selector) diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 24c56f565..62114d018 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -18,8 +18,8 @@ spec: - createdb enableMasterLoadBalancer: false enableReplicaLoadBalancer: false - enableConnectionPooler: true # not needed when connectionPooler section is present (see below) - #enableReplicaConnectionPooler: true # set to enable connectionPooler for replica endpoints + enableMasterConnectionPooler: true # not needed when connectionPooler section is present (see below) + enableReplicaConnectionPooler: true # set to enable connectionPooler for replica endpoints allowedSourceRanges: # load balancers' source ranges for both master and replica services - 127.0.0.1/32 databases: diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 1d42e7254..f08c20e0c 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -181,8 +181,10 @@ spec: # Note: usernames specified here as database owners must be declared in the users key of the spec key. dockerImage: type: string - enableConnectionPooler: + enableMasterConnectionPooler: type: boolean + enableReplicaConnectionPooler: + type: boolean enableLogicalBackup: type: boolean enableMasterLoadBalancer: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index 87da0bda3..cce47d4cd 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -259,7 +259,7 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ "dockerImage": { Type: "string", }, - "enableConnectionPooler": { + "enableMasterConnectionPooler": { Type: "boolean", }, "enableReplicaConnectionPooler": { diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 867157d26..5a9e45a52 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -29,7 +29,7 @@ type PostgresSpec struct { Patroni `json:"patroni,omitempty"` Resources `json:"resources,omitempty"` - EnableConnectionPooler *bool `json:"enableConnectionPooler,omitempty"` + EnableMasterConnectionPooler *bool `json:"enableMasterConnectionPooler,omitempty"` EnableReplicaConnectionPooler *bool `json:"enableReplicaConnectionPooler,omitempty"` ConnectionPooler *ConnectionPooler `json:"connectionPooler,omitempty"` diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index ee305c1b0..83d5232c2 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -55,9 +55,10 @@ type Config struct { // K8S objects that are belongs to a connection pooler type ConnectionPoolerObjects struct { - Deployment *appsv1.Deployment - Service *v1.Service - ReplService *v1.Service + Deployment *appsv1.Deployment + ReplDeployment *appsv1.Deployment + Service *v1.Service + ReplService *v1.Service // It could happen that a connection pooler was enabled, but the operator // was not able to properly process a corresponding event or was restarted. diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 1f6510e65..f7c186d0c 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -719,7 +719,7 @@ func TestInitSystemUsers(t *testing.T) { } // cluster with connection pooler - cl.Spec.EnableConnectionPooler = boolToPointer(true) + cl.Spec.EnableMasterConnectionPooler = boolToPointer(true) cl.initSystemUsers() if _, exist := cl.systemUsers[constants.ConnectionPoolerUserKeyName]; !exist { t.Errorf("%s, connection pooler user is not present", testName) diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 75e2d2097..1a38bd41d 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -462,7 +462,7 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi } // Creates a connection pool credentials lookup function in every database to -// perform remote authentification. +// perform remote authentication. func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { var stmtBytes bytes.Buffer c.logger.Info("Installing lookup function") diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index d13d1889f..f18c0802a 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -75,8 +75,12 @@ func (c *Cluster) statefulSetName() string { return c.Name } -func (c *Cluster) connectionPoolerName() string { - return c.Name + "-pooler" +func (c *Cluster) connectionPoolerName(role PostgresRole) string { + name := c.Name + "-pooler" + if role == Replica { + name = name + "-repl" + } + return name } func (c *Cluster) endpointName(role PostgresRole) string { @@ -2085,7 +2089,7 @@ func (c *Cluster) getConnectionPoolerEnvVars(spec *acidv1.PostgresSpec) []v1.Env return []v1.EnvVar{ { - Name: "CONNECTION_POOLER_MASTER_PORT", + Name: "CONNECTION_POOLER_PORT", Value: fmt.Sprint(pgPort), }, { @@ -2115,7 +2119,7 @@ func (c *Cluster) getConnectionPoolerEnvVars(spec *acidv1.PostgresSpec) []v1.Env } } -func (c *Cluster) generateConnectionPoolerPodTemplate(spec *acidv1.PostgresSpec) ( +func (c *Cluster) generateConnectionPoolerPodTemplate(spec *acidv1.PostgresSpec, role PostgresRole) ( *v1.PodTemplateSpec, error) { gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) @@ -2151,11 +2155,11 @@ func (c *Cluster) generateConnectionPoolerPodTemplate(spec *acidv1.PostgresSpec) envVars := []v1.EnvVar{ { Name: "PGHOST", - Value: c.serviceAddress(Master), + Value: c.serviceAddress(role), }, { Name: "PGPORT", - Value: c.servicePort(Master), + Value: c.servicePort(role), }, { Name: "PGUSER", @@ -2237,7 +2241,7 @@ func (c *Cluster) ownerReferences() []metav1.OwnerReference { } } -func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec) ( +func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec, role PostgresRole) ( *appsv1.Deployment, error) { // there are two ways to enable connection pooler, either to specify a @@ -2250,7 +2254,7 @@ func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec) spec.ConnectionPooler = &acidv1.ConnectionPooler{} } - podTemplate, err := c.generateConnectionPoolerPodTemplate(spec) + podTemplate, err := c.generateConnectionPoolerPodTemplate(spec, role) numberOfInstances := spec.ConnectionPooler.NumberOfInstances if numberOfInstances == nil { numberOfInstances = util.CoalesceInt32( @@ -2269,9 +2273,12 @@ func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec) return nil, err } + var name string + name = c.connectionPoolerName(role) + deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: c.connectionPoolerName(), + Name: name, Namespace: c.Namespace, Labels: c.connectionPoolerLabelsSelector().MatchLabels, Annotations: map[string]string{}, @@ -2293,7 +2300,53 @@ func (c *Cluster) generateConnectionPoolerDeployment(spec *acidv1.PostgresSpec) return deployment, nil } -func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1.Service { +/* func (c *Cluster) generateReplicaConnectionPoolerDeployment(spec *acidv1.PostgresSpec) ( + *appsv1.Deployment, error) { + + podTemplate, err := c.generateConnectionPoolerPodTemplate(spec, Replica) + numberOfInstances := spec.ConnectionPooler.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = util.CoalesceInt32( + c.OpConfig.ConnectionPooler.NumberOfInstances, + k8sutil.Int32ToPointer(1)) + } + + if *numberOfInstances < constants.ConnectionPoolerMinInstances { + msg := "Adjusted number of connection pooler instances from %d to %d" + c.logger.Warningf(msg, numberOfInstances, constants.ConnectionPoolerMinInstances) + + *numberOfInstances = constants.ConnectionPoolerMinInstances + } + + if err != nil { + return nil, err + } + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.connectionPoolerName() + "-repl", + Namespace: c.Namespace, + Labels: c.connectionPoolerLabelsSelector().MatchLabels, + Annotations: map[string]string{}, + // make StatefulSet object its owner to represent the dependency. + // By itself StatefulSet is being deleted with "Orphaned" + // propagation policy, which means that it's deletion will not + // clean up this deployment, but there is a hope that this object + // will be garbage collected if something went wrong and operator + // didn't deleted it. + OwnerReferences: c.ownerReferences(), + }, + Spec: appsv1.DeploymentSpec{ + Replicas: numberOfInstances, + Selector: c.connectionPoolerLabelsSelector(), + Template: *podTemplate, + }, + } + + return deployment, nil +} */ + +func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec, role PostgresRole) *v1.Service { // there are two ways to enable connection pooler, either to specify a // connectionPooler section or enableConnectionPooler. In the second case @@ -2304,24 +2357,26 @@ func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1 if spec.ConnectionPooler == nil { spec.ConnectionPooler = &acidv1.ConnectionPooler{} } + name := c.connectionPoolerName(role) serviceSpec := v1.ServiceSpec{ Ports: []v1.ServicePort{ { - Name: c.connectionPoolerName(), + Name: name, Port: pgPort, - TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, + TargetPort: intstr.IntOrString{StrVal: c.servicePort(role)}, }, }, Type: v1.ServiceTypeClusterIP, - Selector: map[string]string{ - "connection-pooler-repl": c.connectionPoolerName(), - }, } - + if role == Replica { + serviceSpec.Selector = c.roleLabelsSet(false, Replica) + } else { + serviceSpec.Selector = map[string]string{"connection-pooler": name} + } service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: c.connectionPoolerName(), + Name: name, Namespace: c.Namespace, Labels: c.connectionPoolerLabelsSelector().MatchLabels, Annotations: map[string]string{}, @@ -2339,7 +2394,7 @@ func (c *Cluster) generateConnectionPoolerService(spec *acidv1.PostgresSpec) *v1 return service } -func (c *Cluster) generateReplicaConnectionPoolerService(spec *acidv1.PostgresSpec) *v1.Service { +/* func (c *Cluster) generateReplicaConnectionPoolerService(spec *acidv1.PostgresSpec) *v1.Service { replicaserviceSpec := v1.ServiceSpec{ Ports: []v1.ServicePort{ @@ -2349,10 +2404,8 @@ func (c *Cluster) generateReplicaConnectionPoolerService(spec *acidv1.PostgresSp TargetPort: intstr.IntOrString{StrVal: c.servicePort(Replica)}, }, }, - Type: v1.ServiceTypeClusterIP, - Selector: map[string]string{ - "connection-pooler-repl": c.connectionPoolerName() + "-repl", - }, + Type: v1.ServiceTypeClusterIP, + Selector: c.roleLabelsSet(false, Replica), } service := &v1.Service{ @@ -2373,7 +2426,7 @@ func (c *Cluster) generateReplicaConnectionPoolerService(spec *acidv1.PostgresSp } return service -} +} */ func ensurePath(file string, defaultDir string, defaultFile string) string { if file == "" { diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 1e474fbf5..58020a138 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -1090,7 +1090,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) { }, } for _, tt := range tests { - podSpec, err := tt.cluster.generateConnectionPoolerPodTemplate(tt.spec) + podSpec, err := tt.cluster.generateConnectionPoolerPodTemplate(tt.spec, Master) if err != tt.expected && err.Error() != tt.expected.Error() { t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", @@ -1192,7 +1192,7 @@ func TestConnectionPoolerDeploymentSpec(t *testing.T) { }, } for _, tt := range tests { - deployment, err := tt.cluster.generateConnectionPoolerDeployment(tt.spec) + deployment, err := tt.cluster.generateConnectionPoolerDeployment(tt.spec, Master) if err != tt.expected && err.Error() != tt.expected.Error() { t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v", @@ -1221,9 +1221,9 @@ func testServiceOwnwerReference(cluster *Cluster, service *v1.Service) error { func testServiceSelector(cluster *Cluster, service *v1.Service) error { selector := service.Spec.Selector - if selector["connection-pooler"] != cluster.connectionPoolerName() { + if selector["connection-pooler"] != cluster.connectionPoolerName(Master) { return fmt.Errorf("Selector is incorrect, got %s, expected %s", - selector["connection-pooler"], cluster.connectionPoolerName()) + selector["connection-pooler"], cluster.connectionPoolerName(Master)) } return nil @@ -1289,7 +1289,7 @@ func TestConnectionPoolerServiceSpec(t *testing.T) { }, } for _, tt := range tests { - service := tt.cluster.generateConnectionPoolerService(tt.spec) + service := tt.cluster.generateConnectionPoolerService(tt.spec, Master) if err := tt.check(cluster, service); err != nil { t.Errorf("%s [%s]: Service spec is incorrect, %+v", diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 160e42548..84c0b14fe 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -126,56 +126,74 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo msg = "could not prepare database for connection pooler: %v" return nil, fmt.Errorf(msg, err) } + if c.Spec.EnableMasterConnectionPooler != nil || c.ConnectionPooler != nil { + deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Master) + if err != nil { + msg = "could not generate deployment for connection pooler: %v" + return nil, fmt.Errorf(msg, err) + } - deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec) - if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return nil, fmt.Errorf(msg, err) - } + // client-go does retry 10 times (with NoBackoff by default) when the API + // believe a request can be retried and returns Retry-After header. This + // should be good enough to not think about it here. + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) - // client-go does retry 10 times (with NoBackoff by default) when the API - // believe a request can be retried and returns Retry-After header. This - // should be good enough to not think about it here. - deployment, err := c.KubeClient. - Deployments(deploymentSpec.Namespace). - Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) + if err != nil { + return nil, err + } - if err != nil { - return nil, err - } - - serviceSpec := c.generateConnectionPoolerService(&c.Spec) - service, err := c.KubeClient. - Services(serviceSpec.Namespace). - Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) - - if err != nil { - return nil, err - } - if c.Spec.EnableReplicaConnectionPooler != nil && *c.Spec.EnableReplicaConnectionPooler == true { - replServiceSpec := c.generateReplicaConnectionPoolerService(&c.Spec) - replService, err := c.KubeClient. + serviceSpec := c.generateConnectionPoolerService(&c.Spec, Master) + service, err := c.KubeClient. Services(serviceSpec.Namespace). + Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) + + if err != nil { + return nil, err + } + c.ConnectionPooler = &ConnectionPoolerObjects{ + Deployment: deployment, + Service: service, + } + c.logger.Debugf("created new connection pooler %q, uid: %q", + util.NameFromMeta(deployment.ObjectMeta), deployment.UID) + } + + if c.Spec.EnableReplicaConnectionPooler != nil && *c.Spec.EnableReplicaConnectionPooler == true { + repldeploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Replica) + if err != nil { + msg = "could not generate deployment for connection pooler: %v" + return nil, fmt.Errorf(msg, err) + } + + // client-go does retry 10 times (with NoBackoff by default) when the API + // believe a request can be retried and returns Retry-After header. This + // should be good enough to not think about it here. + repldeployment, err := c.KubeClient. + Deployments(repldeploymentSpec.Namespace). + Create(context.TODO(), repldeploymentSpec, metav1.CreateOptions{}) + + if err != nil { + return nil, err + } + + replServiceSpec := c.generateConnectionPoolerService(&c.Spec, Replica) + replService, err := c.KubeClient. + Services(replServiceSpec.Namespace). Create(context.TODO(), replServiceSpec, metav1.CreateOptions{}) if err != nil { return nil, err } c.ConnectionPooler = &ConnectionPoolerObjects{ - Deployment: deployment, - Service: service, - ReplService: replService, - } - } else { - c.ConnectionPooler = &ConnectionPoolerObjects{ - Deployment: deployment, - Service: service, + ReplDeployment: repldeployment, + ReplService: replService, } + c.logger.Debugf("created new connection pooler for replica %q, uid: %q", + util.NameFromMeta(repldeployment.ObjectMeta), repldeployment.UID) } - c.logger.Debugf("created new connection pooler %q, uid: %q", - util.NameFromMeta(deployment.ObjectMeta), deployment.UID) - return c.ConnectionPooler, nil } @@ -192,7 +210,7 @@ func (c *Cluster) deleteConnectionPooler() (err error) { // Clean up the deployment object. If deployment resource we've remembered // is somehow empty, try to delete based on what would we generate - deploymentName := c.connectionPoolerName() + deploymentName := c.connectionPoolerName(Master) deployment := c.ConnectionPooler.Deployment if deployment != nil { @@ -217,7 +235,7 @@ func (c *Cluster) deleteConnectionPooler() (err error) { // Repeat the same for the service object service := c.ConnectionPooler.Service - serviceName := c.connectionPoolerName() + serviceName := c.connectionPoolerName(Master) if service != nil { serviceName = service.Name diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go index 9739cc354..ca7683160 100644 --- a/pkg/cluster/resources_test.go +++ b/pkg/cluster/resources_test.go @@ -97,7 +97,7 @@ func TestNeedConnectionPooler(t *testing.T) { } cluster.Spec = acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), + EnableMasterConnectionPooler: boolToPointer(true), } if !cluster.needConnectionPooler() { @@ -106,8 +106,8 @@ func TestNeedConnectionPooler(t *testing.T) { } cluster.Spec = acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(false), - ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableMasterConnectionPooler: boolToPointer(false), + ConnectionPooler: &acidv1.ConnectionPooler{}, } if cluster.needConnectionPooler() { @@ -116,8 +116,8 @@ func TestNeedConnectionPooler(t *testing.T) { } cluster.Spec = acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableMasterConnectionPooler: boolToPointer(true), + ConnectionPooler: &acidv1.ConnectionPooler{}, } if !cluster.needConnectionPooler() { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 056e43043..d53df68a2 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -922,34 +922,18 @@ func (c *Cluster) syncConnectionPooler(oldSpec, func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) ( SyncReason, error) { - deployment, err := c.KubeClient. - Deployments(c.Namespace). - Get(context.TODO(), c.connectionPoolerName(), metav1.GetOptions{}) - - if err != nil && k8sutil.ResourceNotFound(err) { - msg := "Deployment %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName()) - - deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec) - if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return NoSync, fmt.Errorf(msg, err) - } - - deployment, err := c.KubeClient. - Deployments(deploymentSpec.Namespace). - Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) - - if err != nil { - return NoSync, err - } - - c.ConnectionPooler.Deployment = deployment - } else if err != nil { + masterdeployment, err := c.checkAndCreateConnectionPoolerDeployment(Master, newSpec) + if err != nil { + msg := "could not get connection pooler deployment to sync: %v" + return NoSync, fmt.Errorf(msg, err) + } + replicadeployment, err := c.checkAndCreateConnectionPoolerDeployment(Replica, newSpec) + if err != nil { msg := "could not get connection pooler deployment to sync: %v" return NoSync, fmt.Errorf(msg, err) } else { - c.ConnectionPooler.Deployment = deployment + c.ConnectionPooler.Deployment = masterdeployment + c.ConnectionPooler.ReplDeployment = replicadeployment // actual synchronization oldConnectionPooler := oldSpec.Spec.ConnectionPooler @@ -968,32 +952,28 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql newConnectionPooler = &acidv1.ConnectionPooler{} } - c.logger.Infof("Old: %+v, New %+v", oldConnectionPooler, newConnectionPooler) + c.logger.Infof("Old: %+v, New: %+v", oldConnectionPooler, newConnectionPooler) specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler) - defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment) + defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, masterdeployment) reason := append(specReason, defaultsReason...) if specSync || defaultsSync { - c.logger.Infof("Update connection pooler deployment %s, reason: %+v", - c.connectionPoolerName(), reason) - - newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec) - if err != nil { - msg := "could not generate deployment for connection pooler: %v" - return reason, fmt.Errorf(msg, err) - } - - oldDeploymentSpec := c.ConnectionPooler.Deployment - - deployment, err := c.updateConnectionPoolerDeployment( - oldDeploymentSpec, - newDeploymentSpec) - + newmasterdeployment, err := c.UpdateConnectionPoolerDeploymentSub(Master, reason[:], newSpec) if err != nil { return reason, err } + c.ConnectionPooler.Deployment = newmasterdeployment + return reason, nil + } - c.ConnectionPooler.Deployment = deployment + defaultsSync, defaultsReason = c.needSyncConnectionPoolerDefaults(newConnectionPooler, replicadeployment) + reason = append(specReason, defaultsReason...) + if specSync || defaultsSync { + newreplicadeployment, err := c.UpdateConnectionPoolerDeploymentSub(Replica, reason[:], newSpec) + if err != nil { + return reason, err + } + c.ConnectionPooler.Deployment = newreplicadeployment return reason, nil } } @@ -1002,32 +982,95 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if newAnnotations != nil { c.updateConnectionPoolerAnnotations(newAnnotations) } + masterservice, err := c.checkAndCreateConnectionPoolerService(Master, newSpec) + if err != nil { + msg := "could not get connection pooler service to sync: %v" + return NoSync, fmt.Errorf(msg, err) + } + replicaservice, err := c.checkAndCreateConnectionPoolerService(Replica, newSpec) + if err != nil { + msg := "could not get connection pooler service to sync: %v" + return NoSync, fmt.Errorf(msg, err) + } else { + // Service updates are not supported and probably not that useful anyway + c.ConnectionPooler.Service = masterservice + c.ConnectionPooler.ReplService = replicaservice + } + + return NoSync, nil +} + +func (c *Cluster) UpdateConnectionPoolerDeploymentSub(role PostgresRole, reason []string, newSpec *acidv1.Postgresql) (*appsv1.Deployment, error) { + + c.logger.Infof("Update connection pooler deployment %s, reason: %+v", + c.connectionPoolerName(role), reason) + + newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role) + if err != nil { + msg := "could not generate deployment for connection pooler: %v" + return nil, fmt.Errorf(msg, err) + } + + oldDeploymentSpec := c.ConnectionPooler.Deployment + + deployment, err := c.updateConnectionPoolerDeployment( + oldDeploymentSpec, + newDeploymentSpec) + + if err != nil { + return nil, err + } + return deployment, nil +} + +func (c *Cluster) checkAndCreateConnectionPoolerDeployment(role PostgresRole, newSpec *acidv1.Postgresql) (*appsv1.Deployment, error) { + + deployment, err := c.KubeClient. + Deployments(c.Namespace). + Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "Deployment %s for connection pooler synchronization is not found, create it" + c.logger.Warningf(msg, c.connectionPoolerName(role)) + + deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role) + if err != nil { + msg = "could not generate deployment for connection pooler: %v" + return nil, fmt.Errorf(msg, err) + } + + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) + + if err != nil { + return nil, err + } + return deployment, nil + } + return deployment, nil +} + +func (c *Cluster) checkAndCreateConnectionPoolerService(role PostgresRole, newSpec *acidv1.Postgresql) (*v1.Service, error) { service, err := c.KubeClient. Services(c.Namespace). - Get(context.TODO(), c.connectionPoolerName(), metav1.GetOptions{}) + Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) if err != nil && k8sutil.ResourceNotFound(err) { msg := "Service %s for connection pooler synchronization is not found, create it" - c.logger.Warningf(msg, c.connectionPoolerName()) + c.logger.Warningf(msg, c.connectionPoolerName(role)) - serviceSpec := c.generateConnectionPoolerService(&newSpec.Spec) + serviceSpec := c.generateConnectionPoolerService(&newSpec.Spec, role) service, err := c.KubeClient. Services(serviceSpec.Namespace). Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) if err != nil { - return NoSync, err + return nil, err } - c.ConnectionPooler.Service = service - } else if err != nil { - msg := "could not get connection pooler service to sync: %v" - return NoSync, fmt.Errorf(msg, err) - } else { - // Service updates are not supported and probably not that useful anyway - c.ConnectionPooler.Service = service + return service, nil } - - return NoSync, nil + return service, nil } diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index d9248ae33..eaffbf475 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -139,7 +139,7 @@ func TestConnectionPoolerSynchronization(t *testing.T) { }, newSpec: &acidv1.Postgresql{ Spec: acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), + EnableMasterConnectionPooler: boolToPointer(true), }, }, cluster: clusterMissingObjects, @@ -232,14 +232,14 @@ func TestConnectionPoolerSynchronization(t *testing.T) { subTest: "there is no sync from nil to an empty spec", oldSpec: &acidv1.Postgresql{ Spec: acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - ConnectionPooler: nil, + EnableMasterConnectionPooler: boolToPointer(true), + ConnectionPooler: nil, }, }, newSpec: &acidv1.Postgresql{ Spec: acidv1.PostgresSpec{ - EnableConnectionPooler: boolToPointer(true), - ConnectionPooler: &acidv1.ConnectionPooler{}, + EnableMasterConnectionPooler: boolToPointer(true), + ConnectionPooler: &acidv1.ConnectionPooler{}, }, }, cluster: clusterMock, diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 7559ce3d4..578eaf40e 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -424,7 +424,7 @@ func (c *Cluster) connectionPoolerLabelsSelector() *metav1.LabelSelector { connectionPoolerLabels := labels.Set(map[string]string{}) extraLabels := labels.Set(map[string]string{ - "connection-pooler": c.connectionPoolerName(), + "connection-pooler": c.connectionPoolerName(Master), "application": "db-connection-pooler", }) @@ -520,11 +520,16 @@ func (c *Cluster) patroniKubernetesUseConfigMaps() bool { } func (c *Cluster) needConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { - if spec.EnableConnectionPooler == nil { + if spec.EnableMasterConnectionPooler != nil { + return *spec.EnableMasterConnectionPooler + } else if spec.EnableReplicaConnectionPooler != nil { + return *spec.EnableReplicaConnectionPooler + } else if spec.ConnectionPooler == nil { return spec.ConnectionPooler != nil - } else { - return *spec.EnableConnectionPooler } + // if the connectionPooler section is there, then we enable even though the + // flags are not there + return true } func (c *Cluster) needConnectionPooler() bool { diff --git a/ui/operator_ui/main.py b/ui/operator_ui/main.py index dc2450b9f..634c9f8d4 100644 --- a/ui/operator_ui/main.py +++ b/ui/operator_ui/main.py @@ -607,16 +607,27 @@ def update_postgresql(namespace: str, cluster: str): spec['volume'] = {'size': size} - if 'enableConnectionPooler' in postgresql['spec']: - cp = postgresql['spec']['enableConnectionPooler'] + if 'enableMasterConnectionPooler' in postgresql['spec']: + cp = postgresql['spec']['enableMasterConnectionPooler'] if not cp: - if 'enableConnectionPooler' in o['spec']: - del o['spec']['enableConnectionPooler'] + if 'enableMasterConnectionPooler' in o['spec']: + del o['spec']['enableMasterConnectionPooler'] else: - spec['enableConnectionPooler'] = True + spec['enableMasterConnectionPooler'] = True else: - if 'enableConnectionPooler' in o['spec']: - del o['spec']['enableConnectionPooler'] + if 'enableMasterConnectionPooler' in o['spec']: + del o['spec']['enableMasterConnectionPooler'] + + if 'enableReplicaConnectionPooler' in postgresql['spec']: + cp = postgresql['spec']['enableReplicaConnectionPooler'] + if not cp: + if 'enableReplicaConnectionPooler' in o['spec']: + del o['spec']['enableReplicaConnectionPooler'] + else: + spec['enableReplicaConnectionPooler'] = True + else: + if 'enableReplicaConnectionPooler' in o['spec']: + del o['spec']['enableReplicaConnectionPooler'] if 'enableReplicaLoadBalancer' in postgresql['spec']: rlb = postgresql['spec']['enableReplicaLoadBalancer']