From 6c3752068bc7d5c13e5127b2b474584761eadffb Mon Sep 17 00:00:00 2001 From: Dmitrii Dolgov <9erthalion6@gmail.com> Date: Wed, 12 Feb 2020 17:28:48 +0100 Subject: [PATCH] Various improvements Add synchronization logic. For now get rid of podTemplate, type fields. Add crd validation & configuration part, put retry on top of lookup function installation. --- .../templates/clusterrole.yaml | 1 + manifests/operatorconfiguration.crd.yaml | 37 ++++ pkg/apis/acid.zalan.do/v1/postgresql_type.go | 18 +- pkg/cluster/cluster.go | 44 ++++- pkg/cluster/database.go | 22 ++- pkg/cluster/k8sres.go | 175 +++++++++--------- pkg/cluster/k8sres_test.go | 15 -- pkg/cluster/resources.go | 75 +++++++- pkg/cluster/sync.go | 111 +++++++++++ pkg/cluster/sync_test.go | 125 +++++++++++++ pkg/cluster/util.go | 2 +- pkg/controller/operator_config.go | 57 +++++- pkg/util/constants/pooler.go | 13 ++ pkg/util/k8sutil/k8sutil.go | 93 ++++++++++ 14 files changed, 653 insertions(+), 135 deletions(-) create mode 100644 pkg/cluster/sync_test.go create mode 100644 pkg/util/constants/pooler.go diff --git a/charts/postgres-operator/templates/clusterrole.yaml b/charts/postgres-operator/templates/clusterrole.yaml index f8550a539..316f7de15 100644 --- a/charts/postgres-operator/templates/clusterrole.yaml +++ b/charts/postgres-operator/templates/clusterrole.yaml @@ -106,6 +106,7 @@ rules: - apps resources: - statefulsets + - deployments verbs: - create - delete diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml index 7bd5c529c..c44955771 100644 --- a/manifests/operatorconfiguration.crd.yaml +++ b/manifests/operatorconfiguration.crd.yaml @@ -294,6 +294,43 @@ spec: pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' scalyr_server_url: type: string + connection_pool: + type: object + properties: + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" + connection_pool_instances_number: + type: integer + #default: 1 + connection_pool_image: + type: string + #default: "pierone.stups.zalan.do/acid/pgbouncer:0.0.1" + connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" + connection_pool_default_cpu_limit: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_cpu_request: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_memory_limit: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100m" + connection_pool_default_memory_request: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100m" status: type: object additionalProperties: diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index e4d56c6e8..c56d70626 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -27,7 +27,8 @@ type PostgresSpec struct { Patroni `json:"patroni,omitempty"` Resources `json:"resources,omitempty"` - ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"` + EnableConnectionPool bool `json:"enable_connection_pool,omitempty"` + ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"` TeamID string `json:"teamId"` DockerImage string `json:"dockerImage,omitempty"` @@ -159,12 +160,15 @@ type PostgresStatus struct { // Options for connection pooler type ConnectionPool struct { - NumberOfInstances *int32 `json:"instancesNumber,omitempty"` - Schema *string `json:"schema,omitempty"` - User *string `json:"user,omitempty"` - Type *string `json:"type,omitempty"` - Mode *string `json:"mode,omitempty"` - PodTemplate *v1.PodTemplateSpec `json:"podTemplate,omitempty"` + NumberOfInstances *int32 `json:"instancesNumber,omitempty"` + Schema string `json:"schema,omitempty"` + User string `json:"user,omitempty"` + Mode string `json:"mode,omitempty"` + DockerImage string `json:"dockerImage,omitempty"` + // TODO: prepared snippets of configuration, one can choose via type, e.g. + // pgbouncer-large (with higher resources) or odyssey-small (with smaller + // resources) + // Type string `json:"type,omitempty"` Resources `json:"resources,omitempty"` } diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index afcb0df82..8e65b12ea 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/r3labs/diff" "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -723,6 +724,17 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } + // connection pool + if !reflect.DeepEqual(oldSpec.Spec.ConnectionPool, + newSpec.Spec.ConnectionPool) { + c.logger.Debug("syncing connection pool") + + if err := c.syncConnectionPool(oldSpec, newSpec); err != nil { + c.logger.Errorf("could not sync connection pool: %v", err) + updateFailed = true + } + } + return nil } @@ -852,13 +864,13 @@ func (c *Cluster) initSystemUsers() { if c.needConnectionPool() { username := c.Spec.ConnectionPool.User - if username == nil { - username = &c.OpConfig.ConnectionPool.User + if username == "" { + username = c.OpConfig.ConnectionPool.User } c.systemUsers[constants.ConnectionPoolUserKeyName] = spec.PgUser{ Origin: spec.RoleConnectionPool, - Name: *username, + Name: username, Password: util.RandomPassword(constants.PasswordLength), } } @@ -1188,3 +1200,29 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error { return c.deleteClusterObject(get, deleteConfigMapFn, "configmap") } + +// Test if two connection pool configuration needs to be synced. For simplicity +// compare not the actual K8S objects, but the configuration itself and request +// sync if there is any difference. +func (c *Cluster) needSyncConnPoolDeployments(oldSpec, newSpec *acidv1.ConnectionPool) (sync bool, reasons []string) { + reasons = []string{} + sync = false + + changelog, err := diff.Diff(oldSpec, newSpec) + if err != nil { + c.logger.Infof("Cannot get diff, do not do anything, %+v", err) + return false, reasons + } else { + if len(changelog) > 0 { + sync = true + } + + for _, change := range changelog { + msg := fmt.Sprintf("%s %+v from %s to %s", + change.Type, change.Path, change.From, change.To) + reasons = append(reasons, msg) + } + } + + return sync, reasons +} diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 1b74bd6b6..0c1e07a11 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -300,8 +300,26 @@ func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error { params, err) } - if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil { - return fmt.Errorf("could not execute sql statement %s: %v", + // golang sql will do retries couple of times if pq driver reports + // connections issues (driver.ErrBadConn), but since our query is + // idempotent, we can retry in a view of other errors (e.g. due to + // failover a db is temporary in a read-only mode or so) to make sure + // it was applied. + execErr := retryutil.Retry( + constants.PostgresConnectTimeout, + constants.PostgresConnectRetryTimeout, + func() (bool, error) { + if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil { + msg := fmt.Errorf("could not execute sql statement %s: %v", + stmtBytes.String(), err) + return false, msg + } + + return true, nil + }) + + if execErr != nil { + return fmt.Errorf("could not execute after retries %s: %v", stmtBytes.String(), err) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 8b7860886..3121691d9 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1734,100 +1734,99 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) { func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( *v1.PodTemplateSpec, error) { - podTemplate := spec.ConnectionPool.PodTemplate + gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) + resources, err := generateResourceRequirements( + spec.ConnectionPool.Resources, + c.makeDefaultConnPoolResources()) - if podTemplate == nil { - gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) - resources, err := generateResourceRequirements( - spec.ConnectionPool.Resources, - c.makeDefaultConnPoolResources()) + effectiveMode := util.Coalesce( + spec.ConnectionPool.Mode, + c.OpConfig.ConnectionPool.Mode) - effectiveMode := spec.ConnectionPool.Mode - if effectiveMode == nil { - effectiveMode = &c.OpConfig.ConnectionPool.Mode + effectiveDockerImage := util.Coalesce( + spec.ConnectionPool.DockerImage, + c.OpConfig.ConnectionPool.Image) + + if err != nil { + return nil, fmt.Errorf("could not generate resource requirements: %v", err) + } + + secretSelector := func(key string) *v1.SecretKeySelector { + return &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: c.credentialSecretName(c.OpConfig.SuperUsername), + }, + Key: key, } + } - if err != nil { - return nil, fmt.Errorf("could not generate resource requirements: %v", err) - } + envVars := []v1.EnvVar{ + { + Name: "PGHOST", + Value: c.serviceAddress(Master), + }, + { + Name: "PGPORT", + Value: c.servicePort(Master), + }, + { + Name: "PGUSER", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("username"), + }, + }, + // the convention is to use the same schema name as + // connection pool username + { + Name: "PGSCHEMA", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("username"), + }, + }, + { + Name: "PGPASSWORD", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("password"), + }, + }, + { + Name: "CONNECTION_POOL_MODE", + Value: effectiveMode, + }, + { + Name: "CONNECTION_POOL_PORT", + Value: fmt.Sprint(pgPort), + }, + } - secretSelector := func(key string) *v1.SecretKeySelector { - return &v1.SecretKeySelector{ - LocalObjectReference: v1.LocalObjectReference{ - Name: c.credentialSecretName(c.OpConfig.SuperUsername), - }, - Key: key, - } - } + poolerContainer := v1.Container{ + Name: connectionPoolContainer, + Image: effectiveDockerImage, + ImagePullPolicy: v1.PullIfNotPresent, + Resources: *resources, + Ports: []v1.ContainerPort{ + { + ContainerPort: pgPort, + Protocol: v1.ProtocolTCP, + }, + }, + Env: envVars, + } - envVars := []v1.EnvVar{ - { - Name: "PGHOST", - Value: c.serviceAddress(Master), - }, - { - Name: "PGPORT", - Value: c.servicePort(Master), - }, - { - Name: "PGUSER", - ValueFrom: &v1.EnvVarSource{ - SecretKeyRef: secretSelector("username"), - }, - }, - // the convention is to use the same schema name as - // connection pool username - { - Name: "PGSCHEMA", - ValueFrom: &v1.EnvVarSource{ - SecretKeyRef: secretSelector("username"), - }, - }, - { - Name: "PGPASSWORD", - ValueFrom: &v1.EnvVarSource{ - SecretKeyRef: secretSelector("password"), - }, - }, - { - Name: "CONNECTION_POOL_MODE", - Value: *effectiveMode, - }, - { - Name: "CONNECTION_POOL_PORT", - Value: fmt.Sprint(pgPort), - }, - } - - poolerContainer := v1.Container{ - Name: connectionPoolContainer, - Image: c.OpConfig.ConnectionPool.Image, - ImagePullPolicy: v1.PullIfNotPresent, - Resources: *resources, - Ports: []v1.ContainerPort{ - { - ContainerPort: pgPort, - Protocol: v1.ProtocolTCP, - }, - }, - Env: envVars, - } - - podTemplate = &v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: c.connPoolLabelsSelector().MatchLabels, - Namespace: c.Namespace, - Annotations: c.generatePodAnnotations(spec), - }, - Spec: v1.PodSpec{ - ServiceAccountName: c.OpConfig.PodServiceAccountName, - TerminationGracePeriodSeconds: &gracePeriod, - Containers: []v1.Container{poolerContainer}, - // TODO: add tolerations to scheduler pooler on the same node - // as database - //Tolerations: *tolerationsSpec, - }, - } + podTemplate := &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: c.connPoolLabelsSelector().MatchLabels, + Namespace: c.Namespace, + Annotations: c.generatePodAnnotations(spec), + }, + Spec: v1.PodSpec{ + ServiceAccountName: c.OpConfig.PodServiceAccountName, + TerminationGracePeriodSeconds: &gracePeriod, + Containers: []v1.Container{poolerContainer}, + // TODO: add tolerations to scheduler pooler on the same node + // as database + //Tolerations: *tolerationsSpec, + }, } return podTemplate, nil diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 012df4072..c26e04b96 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -613,21 +613,6 @@ func TestConnPoolPodSpec(t *testing.T) { cluster: cluster, check: testEnvs, }, - { - subTest: "custom pod template", - spec: &acidv1.PostgresSpec{ - ConnectionPool: &acidv1.ConnectionPool{ - PodTemplate: &v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod-template", - }, - }, - }, - }, - expected: nil, - cluster: cluster, - check: testCustomPodTemplate, - }, } for _, tt := range tests { podSpec, err := tt.cluster.generateConnPoolPodTemplate(tt.spec) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 4f9d72e19..e44d50800 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -101,9 +101,17 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolR var msg string c.setProcessName("creating connection pool") - err := lookup( - c.OpConfig.ConnectionPool.Schema, - c.OpConfig.ConnectionPool.User) + schema := c.Spec.ConnectionPool.Schema + if schema == "" { + schema = c.OpConfig.ConnectionPool.Schema + } + + user := c.Spec.ConnectionPool.User + if user == "" { + user = c.OpConfig.ConnectionPool.User + } + + err := lookup(schema, user) if err != nil { msg = "could not prepare database for connection pool: %v" @@ -116,6 +124,9 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolR return nil, fmt.Errorf(msg, err) } + // client-go does retry 10 times (with NoBackoff by default) when the API + // believe a request can be retried and returns Retry-After header. This + // should be good enough to not think about it here. deployment, err := c.KubeClient. Deployments(deploymentSpec.Namespace). Create(deploymentSpec) @@ -154,14 +165,22 @@ func (c *Cluster) deleteConnectionPool() (err error) { return nil } + // Clean up the deployment object. If deployment resource we've remembered + // is somehow empty, try to delete based on what would we generate + deploymentName := c.connPoolName() + deployment := c.ConnectionPool.Deployment + + if deployment != nil { + deploymentName = deployment.Name + } + // set delete propagation policy to foreground, so that replica set will be // also deleted. policy := metav1.DeletePropagationForeground options := metav1.DeleteOptions{PropagationPolicy: &policy} - deployment := c.ConnectionPool.Deployment err = c.KubeClient. - Deployments(deployment.Namespace). - Delete(deployment.Name, &options) + Deployments(c.Namespace). + Delete(deploymentName, &options) if !k8sutil.ResourceNotFound(err) { c.logger.Debugf("Connection pool deployment was already deleted") @@ -172,12 +191,19 @@ func (c *Cluster) deleteConnectionPool() (err error) { c.logger.Infof("Connection pool deployment %q has been deleted", util.NameFromMeta(deployment.ObjectMeta)) + // Repeat the same for the service object + service := c.ConnectionPool.Service + serviceName := c.connPoolName() + + if service != nil { + serviceName = service.Name + } + // set delete propagation policy to foreground, so that all the dependant // will be deleted. - service := c.ConnectionPool.Service err = c.KubeClient. - Services(service.Namespace). - Delete(service.Name, &options) + Services(c.Namespace). + Delete(serviceName, &options) if !k8sutil.ResourceNotFound(err) { c.logger.Debugf("Connection pool service was already deleted") @@ -823,3 +849,34 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget { return c.PodDisruptionBudget } + +// Perform actual patching of a connection pool deployment, assuming that all +// the check were already done before. +func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) { + c.setProcessName("updating connection pool") + if c.ConnectionPool == nil || c.ConnectionPool.Deployment == nil { + return nil, fmt.Errorf("there is no connection pool in the cluster") + } + + patchData, err := specPatch(newDeployment.Spec) + if err != nil { + return nil, fmt.Errorf("could not form patch for the deployment: %v", err) + } + + // An update probably requires RetryOnConflict, but since only one operator + // worker at one time will try to update it changes of conflicts are + // minimal. + deployment, err := c.KubeClient. + Deployments(c.ConnectionPool.Deployment.Namespace). + Patch( + c.ConnectionPool.Deployment.Name, + types.MergePatchType, + patchData, "") + if err != nil { + return nil, fmt.Errorf("could not patch deployment: %v", err) + } + + c.ConnectionPool.Deployment = deployment + + return deployment, nil +} diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 5ee827d4b..b59bf5533 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -2,6 +2,7 @@ package cluster import ( "fmt" + "reflect" batchv1beta1 "k8s.io/api/batch/v1beta1" v1 "k8s.io/api/core/v1" @@ -23,6 +24,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { c.mu.Lock() defer c.mu.Unlock() + oldSpec := c.Postgresql c.setSpec(newSpec) defer func() { @@ -108,6 +110,20 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } } + // connection pool + oldPool := oldSpec.Spec.ConnectionPool + newPool := newSpec.Spec.ConnectionPool + if c.needConnectionPool() && + (c.ConnectionPool == nil || !reflect.DeepEqual(oldPool, newPool)) { + + c.logger.Debug("syncing connection pool") + + if err := c.syncConnectionPool(&oldSpec, newSpec); err != nil { + c.logger.Errorf("could not sync connection pool: %v", err) + return err + } + } + return err } @@ -594,3 +610,98 @@ func (c *Cluster) syncLogicalBackupJob() error { return nil } + +// Synchronize connection pool resources. Effectively we're interested only in +// synchronizing the corresponding deployment, but in case of deployment or +// service is missing, create it. After checking, also remember an object for +// the future references. +func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql) error { + if c.ConnectionPool == nil { + c.logger.Warning("Connection pool resources are empty") + c.ConnectionPool = &ConnectionPoolResources{} + } + + deployment, err := c.KubeClient. + Deployments(c.Namespace). + Get(c.connPoolName(), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "Deployment %s for connection pool synchronization is not found, create it" + c.logger.Warningf(msg, c.connPoolName()) + + deploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec) + if err != nil { + msg = "could not generate deployment for connection pool: %v" + return fmt.Errorf(msg, err) + } + + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(deploymentSpec) + + if err != nil { + return err + } + + c.ConnectionPool.Deployment = deployment + } else if err != nil { + return fmt.Errorf("could not get connection pool deployment to sync: %v", err) + } else { + c.ConnectionPool.Deployment = deployment + + // actual synchronization + oldConnPool := oldSpec.Spec.ConnectionPool + newConnPool := newSpec.Spec.ConnectionPool + sync, reason := c.needSyncConnPoolDeployments(oldConnPool, newConnPool) + if sync { + c.logger.Infof("Update connection pool deployment %s, reason: %s", + c.connPoolName(), reason) + + newDeploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec) + if err != nil { + msg := "could not generate deployment for connection pool: %v" + return fmt.Errorf(msg, err) + } + + oldDeploymentSpec := c.ConnectionPool.Deployment + + deployment, err := c.updateConnPoolDeployment( + oldDeploymentSpec, + newDeploymentSpec) + + if err != nil { + return err + } + + c.ConnectionPool.Deployment = deployment + return nil + } + } + + service, err := c.KubeClient. + Services(c.Namespace). + Get(c.connPoolName(), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "Service %s for connection pool synchronization is not found, create it" + c.logger.Warningf(msg, c.connPoolName()) + + serviceSpec := c.generateConnPoolService(&newSpec.Spec) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). + Create(serviceSpec) + + if err != nil { + return err + } + + c.ConnectionPool.Service = service + } else if err != nil { + return fmt.Errorf("could not get connection pool service to sync: %v", err) + } else { + // Service updates are not supported and probably not that useful anyway + c.ConnectionPool.Service = service + } + + return nil +} diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go new file mode 100644 index 000000000..c6928a64e --- /dev/null +++ b/pkg/cluster/sync_test.go @@ -0,0 +1,125 @@ +package cluster + +import ( + "fmt" + "testing" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func int32ToPointer(value int32) *int32 { + return &value +} + +func deploymentUpdated(cluster *Cluster, err error) error { + if cluster.ConnectionPool.Deployment.Spec.Replicas == nil || + *cluster.ConnectionPool.Deployment.Spec.Replicas != 2 { + return fmt.Errorf("Wrong nubmer of instances") + } + + return nil +} + +func objectsAreSaved(cluster *Cluster, err error) error { + if cluster.ConnectionPool == nil { + return fmt.Errorf("Connection pool resources are empty") + } + + if cluster.ConnectionPool.Deployment == nil { + return fmt.Errorf("Deployment was not saved") + } + + if cluster.ConnectionPool.Service == nil { + return fmt.Errorf("Service was not saved") + } + + return nil +} + +func TestConnPoolSynchronization(t *testing.T) { + testName := "Test connection pool synchronization" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100M", + ConnPoolDefaultMemoryLimit: "100M", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + clusterMissingObjects := *cluster + clusterMissingObjects.KubeClient = k8sutil.ClientMissingObjects() + + clusterMock := *cluster + clusterMock.KubeClient = k8sutil.NewMockKubernetesClient() + + tests := []struct { + subTest string + oldSpec *acidv1.Postgresql + newSpec *acidv1.Postgresql + cluster *Cluster + check func(cluster *Cluster, err error) error + }{ + { + subTest: "create if doesn't exist", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + cluster: &clusterMissingObjects, + check: objectsAreSaved, + }, + { + subTest: "update deployment", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{ + NumberOfInstances: int32ToPointer(1), + }, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{ + NumberOfInstances: int32ToPointer(2), + }, + }, + }, + cluster: &clusterMock, + check: deploymentUpdated, + }, + } + for _, tt := range tests { + err := tt.cluster.syncConnectionPool(tt.oldSpec, tt.newSpec) + + if err := tt.check(tt.cluster, err); err != nil { + t.Errorf("%s [%s]: Could not synchronize, %+v", + testName, tt.subTest, err) + } + } +} diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index d5f9c744f..d2dd11586 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -497,5 +497,5 @@ func (c *Cluster) patroniUsesKubernetes() bool { } func (c *Cluster) needConnectionPool() bool { - return c.Spec.ConnectionPool != nil + return c.Spec.ConnectionPool != nil || c.Spec.EnableConnectionPool == true } diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index f5d280363..1748fbd1f 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -6,7 +6,9 @@ import ( "time" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -142,17 +144,52 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit - // connection pool + // Connection pool. Looks like we can't use defaulting in CRD before 1.17, + // so ensure default values here. result.ConnectionPool.NumberOfInstances = fromCRD.ConnectionPool.NumberOfInstances - result.ConnectionPool.Schema = fromCRD.ConnectionPool.Schema - result.ConnectionPool.User = fromCRD.ConnectionPool.User - result.ConnectionPool.Type = fromCRD.ConnectionPool.Type - result.ConnectionPool.Image = fromCRD.ConnectionPool.Image - result.ConnectionPool.Mode = fromCRD.ConnectionPool.Mode - result.ConnectionPool.ConnPoolDefaultCPURequest = fromCRD.ConnectionPool.DefaultCPURequest - result.ConnectionPool.ConnPoolDefaultMemoryRequest = fromCRD.ConnectionPool.DefaultMemoryRequest - result.ConnectionPool.ConnPoolDefaultCPULimit = fromCRD.ConnectionPool.DefaultCPULimit - result.ConnectionPool.ConnPoolDefaultMemoryLimit = fromCRD.ConnectionPool.DefaultMemoryLimit + if result.ConnectionPool.NumberOfInstances == nil || + *result.ConnectionPool.NumberOfInstances < 1 { + var value int32 + + value = 1 + result.ConnectionPool.NumberOfInstances = &value + } + + result.ConnectionPool.Schema = util.Coalesce( + fromCRD.ConnectionPool.Schema, + constants.ConnectionPoolSchemaName) + + result.ConnectionPool.User = util.Coalesce( + fromCRD.ConnectionPool.User, + constants.ConnectionPoolUserName) + + result.ConnectionPool.Type = util.Coalesce( + fromCRD.ConnectionPool.Type, + constants.ConnectionPoolDefaultType) + + result.ConnectionPool.Image = util.Coalesce( + fromCRD.ConnectionPool.Image, + "pgbouncer:0.0.1") + + result.ConnectionPool.Mode = util.Coalesce( + fromCRD.ConnectionPool.Mode, + constants.ConnectionPoolDefaultMode) + + result.ConnectionPool.ConnPoolDefaultCPURequest = util.Coalesce( + fromCRD.ConnectionPool.DefaultCPURequest, + constants.ConnectionPoolDefaultCpuRequest) + + result.ConnectionPool.ConnPoolDefaultMemoryRequest = util.Coalesce( + fromCRD.ConnectionPool.DefaultMemoryRequest, + constants.ConnectionPoolDefaultMemoryRequest) + + result.ConnectionPool.ConnPoolDefaultCPULimit = util.Coalesce( + fromCRD.ConnectionPool.DefaultCPULimit, + constants.ConnectionPoolDefaultCpuLimit) + + result.ConnectionPool.ConnPoolDefaultMemoryLimit = util.Coalesce( + fromCRD.ConnectionPool.DefaultMemoryLimit, + constants.ConnectionPoolDefaultMemoryLimit) return result } diff --git a/pkg/util/constants/pooler.go b/pkg/util/constants/pooler.go new file mode 100644 index 000000000..b25a12a6c --- /dev/null +++ b/pkg/util/constants/pooler.go @@ -0,0 +1,13 @@ +package constants + +// Connection pool specific constants +const ( + ConnectionPoolUserName = "pooler" + ConnectionPoolSchemaName = "pooler" + ConnectionPoolDefaultType = "pgbouncer" + ConnectionPoolDefaultMode = "transition" + ConnectionPoolDefaultCpuRequest = "100m" + ConnectionPoolDefaultCpuLimit = "100m" + ConnectionPoolDefaultMemoryRequest = "100M" + ConnectionPoolDefaultMemoryLimit = "100M" +) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 77e46476b..44a293025 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -16,6 +16,7 @@ import ( apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -28,6 +29,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func int32ToPointer(value int32) *int32 { + return &value +} + // KubernetesClient describes getters for Kubernetes objects type KubernetesClient struct { corev1.SecretsGetter @@ -62,16 +67,30 @@ type mockDeployment struct { appsv1.DeploymentInterface } +type mockDeploymentNotExist struct { + appsv1.DeploymentInterface +} + type MockDeploymentGetter struct { } +type MockDeploymentNotExistGetter struct { +} + type mockService struct { corev1.ServiceInterface } +type mockServiceNotExist struct { + corev1.ServiceInterface +} + type MockServiceGetter struct { } +type MockServiceNotExistGetter struct { +} + type mockConfigMap struct { corev1.ConfigMapInterface } @@ -245,6 +264,10 @@ func (mock *MockDeploymentGetter) Deployments(namespace string) appsv1.Deploymen return &mockDeployment{} } +func (mock *MockDeploymentNotExistGetter) Deployments(namespace string) appsv1.DeploymentInterface { + return &mockDeploymentNotExist{} +} + func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) { return &apiappsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ @@ -257,10 +280,49 @@ func (mock *mockDeployment) Delete(name string, opts *metav1.DeleteOptions) erro return nil } +func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + }, nil +} + +func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + Spec: apiappsv1.DeploymentSpec{ + Replicas: int32ToPointer(2), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + }, nil +} + +func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { + return nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Reason: metav1.StatusReasonNotFound, + }, + } +} + +func (mock *mockDeploymentNotExist) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + }, nil +} + func (mock *MockServiceGetter) Services(namespace string) corev1.ServiceInterface { return &mockService{} } +func (mock *MockServiceNotExistGetter) Services(namespace string) corev1.ServiceInterface { + return &mockServiceNotExist{} +} + func (mock *mockService) Create(*v1.Service) (*v1.Service, error) { return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -273,6 +335,30 @@ func (mock *mockService) Delete(name string, opts *metav1.DeleteOptions) error { return nil } +func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockServiceNotExist) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { + return nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Reason: metav1.StatusReasonNotFound, + }, + } +} + // NewMockKubernetesClient for other tests func NewMockKubernetesClient() KubernetesClient { return KubernetesClient{ @@ -282,3 +368,10 @@ func NewMockKubernetesClient() KubernetesClient { ServicesGetter: &MockServiceGetter{}, } } + +func ClientMissingObjects() KubernetesClient { + return KubernetesClient{ + DeploymentsGetter: &MockDeploymentNotExistGetter{}, + ServicesGetter: &MockServiceNotExistGetter{}, + } +}