diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 11a845caa..da866c7af 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -236,10 +236,10 @@ explanation of `ttl` and `loop_wait` parameters. * **synchronous_mode** Patroni `synchronous_mode` parameter value. The default is set to `false`. Optional. - + * **synchronous_mode_strict** Patroni `synchronous_mode_strict` parameter value. Can be used in addition to `synchronous_mode`. The default is set to `false`. Optional. - + ## Postgres container resources Those parameters define [CPU and memory requests and limits](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/) @@ -392,8 +392,10 @@ CPU and memory limits for the sidecar container. Parameters are grouped under the `connectionPooler` top-level key and specify configuration for connection pooler. If this section is not empty, a connection -pooler will be created for a database even if `enableConnectionPooler` is not -present. +pooler will be created for master service only even if `enableConnectionPooler` +is not present. But if this section is present then it defines the configuration +for both master and replica pooler services (if `enableReplicaConnectionPooler` + is enabled). * **numberOfInstances** How many instances of connection pooler to create. diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index dfaf8a01c..db0697b5d 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -55,10 +55,8 @@ type Config struct { // K8S objects that are belongs to a connection pooler type ConnectionPoolerObjects struct { - Deployment *appsv1.Deployment - ReplDeployment *appsv1.Deployment - Service *v1.Service - ReplService *v1.Service + Deployment map[PostgresRole]*appsv1.Deployment + Service map[PostgresRole]*v1.Service // It could happen that a connection pooler was enabled, but the operator // was not able to properly process a corresponding event or was restarted. @@ -348,7 +346,8 @@ func (c *Cluster) Create() error { // // Do not consider connection pooler as a strict requirement, and if // something fails, report warning - if c.needMasterConnectionPooler() || c.needReplicaConnectionPooler() { + roles := c.RolesConnectionPooler() + for _, r := range roles { if c.ConnectionPooler != nil { c.logger.Warning("Connection pooler already exists in the cluster") return nil @@ -359,7 +358,7 @@ func (c *Cluster) Create() error { return nil } c.logger.Infof("connection pooler %q has been successfully created", - util.NameFromMeta(connectionPooler.Deployment.ObjectMeta)) + util.NameFromMeta(connectionPooler.Deployment[r].ObjectMeta)) } return nil diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 44ee4b674..8b7b63e31 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -126,81 +126,40 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo msg = "could not prepare database for connection pooler: %v" return nil, fmt.Errorf(msg, err) } - if c.needMasterConnectionPooler() { - - deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Master) - if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return nil, fmt.Errorf(msg, err) - } - - // client-go does retry 10 times (with NoBackoff by default) when the API - // believe a request can be retried and returns Retry-After header. This - // should be good enough to not think about it here. - deployment, err := c.KubeClient. - Deployments(deploymentSpec.Namespace). - Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) - - if err != nil { - return nil, err - } - - serviceSpec := c.generateConnectionPoolerService(&c.Spec, Master) - service, err := c.KubeClient. - Services(serviceSpec.Namespace). - Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) - - if err != nil { - return nil, err - } - c.ConnectionPooler = &ConnectionPoolerObjects{ - Deployment: deployment, - Service: service, - } - c.logger.Debugf("created new connection pooler %q, uid: %q", - util.NameFromMeta(deployment.ObjectMeta), deployment.UID) - - } - - if c.needReplicaConnectionPooler() { - - repldeploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, Replica) - if err != nil { - msg = "could not generate deployment for connection pooler: %v" - return nil, fmt.Errorf(msg, err) - } - - // client-go does retry 10 times (with NoBackoff by default) when the API - // believe a request can be retried and returns Retry-After header. This - // should be good enough to not think about it here. - repldeployment, err := c.KubeClient. - Deployments(repldeploymentSpec.Namespace). - Create(context.TODO(), repldeploymentSpec, metav1.CreateOptions{}) - - if err != nil { - return nil, err - } - - replServiceSpec := c.generateConnectionPoolerService(&c.Spec, Replica) - replService, err := c.KubeClient. - Services(replServiceSpec.Namespace). - Create(context.TODO(), replServiceSpec, metav1.CreateOptions{}) - - if err != nil { - return nil, err - } - - if c.needMasterConnectionPooler() { - c.ConnectionPooler.ReplDeployment = repldeployment - c.ConnectionPooler.ReplService = replService - } else { - c.ConnectionPooler = &ConnectionPoolerObjects{ - ReplDeployment: repldeployment, - ReplService: replService, + if c.needConnectionPooler() { + roles := c.RolesConnectionPooler() + for _, r := range roles { + deploymentSpec, err := c.generateConnectionPoolerDeployment(&c.Spec, r) + if err != nil { + msg = "could not generate deployment for connection pooler: %v" + return nil, fmt.Errorf(msg, err) } + + // client-go does retry 10 times (with NoBackoff by default) when the API + // believe a request can be retried and returns Retry-After header. This + // should be good enough to not think about it here. + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) + + if err != nil { + return nil, err + } + + serviceSpec := c.generateConnectionPoolerService(&c.Spec, r) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). + Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) + + if err != nil { + return nil, err + } + c.ConnectionPooler.Deployment[r] = deployment + c.ConnectionPooler.Service[r] = service + + c.logger.Debugf("created new connection pooler %q, uid: %q", + util.NameFromMeta(deployment.ObjectMeta), deployment.UID) } - c.logger.Debugf("created new connection pooler for replica %q, uid: %q", - util.NameFromMeta(repldeployment.ObjectMeta), repldeployment.UID) } return c.ConnectionPooler, nil @@ -221,12 +180,8 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) { // is somehow empty, try to delete based on what would we generate var deploymentName string var deployment *appsv1.Deployment + deployment = c.ConnectionPooler.Deployment[role] - if role == Master { - deployment = c.ConnectionPooler.Deployment - } else { - deployment = c.ConnectionPooler.ReplDeployment - } if deployment != nil { deploymentName = deployment.Name } @@ -249,11 +204,8 @@ func (c *Cluster) deleteConnectionPooler(role PostgresRole) (err error) { // Repeat the same for the service object var service *v1.Service - if role == Master { - service = c.ConnectionPooler.Service - } else { - service = c.ConnectionPooler.ReplService - } + service = c.ConnectionPooler.Service[role] + serviceName := c.connectionPoolerName(role) if service != nil { @@ -909,9 +861,9 @@ func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget { // Perform actual patching of a connection pooler deployment, assuming that all // the check were already done before. -func (c *Cluster) updateConnectionPoolerDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) { +func (c *Cluster) updateConnectionPoolerDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment, role PostgresRole) (*appsv1.Deployment, error) { c.setProcessName("updating connection pooler") - if c.ConnectionPooler == nil || c.ConnectionPooler.Deployment == nil { + if c.ConnectionPooler == nil || c.ConnectionPooler.Deployment[role] == nil { return nil, fmt.Errorf("there is no connection pooler in the cluster") } @@ -924,9 +876,9 @@ func (c *Cluster) updateConnectionPoolerDeployment(oldDeploymentSpec, newDeploym // worker at one time will try to update it chances of conflicts are // minimal. deployment, err := c.KubeClient. - Deployments(c.ConnectionPooler.Deployment.Namespace).Patch( + Deployments(c.ConnectionPooler.Deployment[role].Namespace).Patch( context.TODO(), - c.ConnectionPooler.Deployment.Name, + c.ConnectionPooler.Deployment[role].Name, types.MergePatchType, patchData, metav1.PatchOptions{}, @@ -935,21 +887,21 @@ func (c *Cluster) updateConnectionPoolerDeployment(oldDeploymentSpec, newDeploym return nil, fmt.Errorf("could not patch deployment: %v", err) } - c.ConnectionPooler.Deployment = deployment + c.ConnectionPooler.Deployment[role] = deployment return deployment, nil } //updateConnectionPoolerAnnotations updates the annotations of connection pooler deployment -func (c *Cluster) updateConnectionPoolerAnnotations(annotations map[string]string) (*appsv1.Deployment, error) { +func (c *Cluster) updateConnectionPoolerAnnotations(annotations map[string]string, role PostgresRole) (*appsv1.Deployment, error) { c.logger.Debugf("updating connection pooler annotations") patchData, err := metaAnnotationsPatch(annotations) if err != nil { return nil, fmt.Errorf("could not form patch for the deployment metadata: %v", err) } - result, err := c.KubeClient.Deployments(c.ConnectionPooler.Deployment.Namespace).Patch( + result, err := c.KubeClient.Deployments(c.ConnectionPooler.Deployment[role].Namespace).Patch( context.TODO(), - c.ConnectionPooler.Deployment.Name, + c.ConnectionPooler.Deployment[role].Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}, diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 32bbaa779..90fd08dac 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -848,7 +848,7 @@ func (c *Cluster) syncConnectionPooler(oldSpec, var newNeedConnectionPooler, oldNeedConnectionPooler bool // Check and perform the sync requirements for each of the roles. - for _, role := range [2]PostgresRole{Master, Replica} { + for _, role := range c.RolesConnectionPooler() { if role == Master { newNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&newSpec.Spec) oldNeedConnectionPooler = c.needMasterConnectionPoolerWorker(&oldSpec.Spec) @@ -858,6 +858,8 @@ func (c *Cluster) syncConnectionPooler(oldSpec, } if c.ConnectionPooler == nil { c.ConnectionPooler = &ConnectionPoolerObjects{} + c.ConnectionPooler.Deployment = make(map[PostgresRole]*appsv1.Deployment) + c.ConnectionPooler.Service = make(map[PostgresRole]*v1.Service) } if newNeedConnectionPooler { @@ -909,28 +911,12 @@ func (c *Cluster) syncConnectionPooler(oldSpec, if !oldNeedConnectionPooler && !newNeedConnectionPooler { // delete and cleanup resources if not empty - if role == Master { - if c.ConnectionPooler != nil && - (c.ConnectionPooler.Deployment != nil || - c.ConnectionPooler.Service != nil) { + if c.ConnectionPooler != nil && + (c.ConnectionPooler.Deployment[role] != nil || + c.ConnectionPooler.Service[role] != nil) { - if err = c.deleteConnectionPooler(role); err != nil { - c.logger.Warningf("could not remove connection pooler: %v", err) - } - - } else if c.ConnectionPooler.ReplDeployment == nil && c.ConnectionPooler.ReplService == nil { - c.ConnectionPooler = nil - } - } else { - if c.ConnectionPooler != nil && - (c.ConnectionPooler.ReplDeployment != nil || - c.ConnectionPooler.ReplService != nil) { - - if err = c.deleteConnectionPooler(role); err != nil { - c.logger.Warningf("could not remove connection pooler: %v", err) - } - } else if c.ConnectionPooler.Deployment == nil && c.ConnectionPooler.Service == nil { - c.ConnectionPooler = nil + if err = c.deleteConnectionPooler(role); err != nil { + c.logger.Warningf("could not remove connection pooler: %v", err) } } } @@ -967,22 +953,13 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return NoSync, err } - - if role == Master { - c.ConnectionPooler.Deployment = deployment - } else { - c.ConnectionPooler.ReplDeployment = deployment - } + c.ConnectionPooler.Deployment[role] = deployment } else if err != nil { msg := "could not get connection pooler deployment to sync: %v" return NoSync, fmt.Errorf(msg, err) } else { - if role == Master { - c.ConnectionPooler.Deployment = deployment - } else { + c.ConnectionPooler.Deployment[role] = deployment - c.ConnectionPooler.ReplDeployment = deployment - } // actual synchronization oldConnectionPooler := oldSpec.Spec.ConnectionPooler newConnectionPooler := newSpec.Spec.ConnectionPooler @@ -1007,34 +984,33 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql reason := append(specReason, defaultsReason...) if specSync || defaultsSync { - + c.logger.Infof("Update connection pooler deployment %s, reason: %+v", + c.connectionPoolerName(role), reason) newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec, role) if err != nil { msg := "could not generate deployment for connection pooler: %v" return reason, fmt.Errorf(msg, err) } - oldDeploymentSpec := c.ConnectionPooler.Deployment + oldDeploymentSpec := c.ConnectionPooler.Deployment[role] deployment, err := c.updateConnectionPoolerDeployment( oldDeploymentSpec, - newDeploymentSpec) + newDeploymentSpec, + role) if err != nil { return reason, err } - if role == Master { - c.ConnectionPooler.Deployment = deployment - } else { - c.ConnectionPooler.ReplDeployment = deployment - } + c.ConnectionPooler.Deployment[role] = deployment + return reason, nil } } - newAnnotations := c.AnnotationsToPropagate(c.ConnectionPooler.Deployment.Annotations) + newAnnotations := c.AnnotationsToPropagate(c.ConnectionPooler.Deployment[role].Annotations) if newAnnotations != nil { - c.updateConnectionPoolerAnnotations(newAnnotations) + c.updateConnectionPoolerAnnotations(newAnnotations, role) } service, err := c.KubeClient. @@ -1053,22 +1029,14 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql if err != nil { return NoSync, err } - if role == Master { - c.ConnectionPooler.Service = service - } else { - c.ConnectionPooler.ReplService = service - } + c.ConnectionPooler.Service[role] = service } else if err != nil { msg := "could not get connection pooler service to sync: %v" return NoSync, fmt.Errorf(msg, err) } else { // Service updates are not supported and probably not that useful anyway - if role == Master { - c.ConnectionPooler.Service = service - } else { - c.ConnectionPooler.ReplService = service - } + c.ConnectionPooler.Service[role] = service } return NoSync, nil diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 332874fe1..da82f36af 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -521,17 +521,36 @@ func (c *Cluster) patroniKubernetesUseConfigMaps() bool { // isConnectionPoolerEnabled func (c *Cluster) needMasterConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { - return (nil != spec.EnableConnectionPooler && *spec.EnableConnectionPooler) || (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil) + return (spec.EnableConnectionPooler != nil && *spec.EnableConnectionPooler) || (spec.ConnectionPooler != nil && spec.EnableConnectionPooler == nil) } func (c *Cluster) needReplicaConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool { - return nil != spec.EnableReplicaConnectionPooler && *spec.EnableReplicaConnectionPooler + return spec.EnableReplicaConnectionPooler != nil && *spec.EnableReplicaConnectionPooler } func (c *Cluster) needMasterConnectionPooler() bool { return c.needMasterConnectionPoolerWorker(&c.Spec) } +func (c *Cluster) needConnectionPooler() bool { + return c.needMasterConnectionPoolerWorker(&c.Spec) || c.needReplicaConnectionPoolerWorker(&c.Spec) +} + +// RolesConnectionPooler gives the list of roles which need connection pooler +func (c *Cluster) RolesConnectionPooler() []PostgresRole { + roles := []PostgresRole{} + i := 0 + + if c.needMasterConnectionPoolerWorker(&c.Spec) { + roles[i] = Master + i = i + 1 + } + if c.needMasterConnectionPoolerWorker(&c.Spec) { + roles[i] = Replica + } + return roles +} + func (c *Cluster) needReplicaConnectionPooler() bool { return c.needReplicaConnectionPoolerWorker(&c.Spec) }