From be438b77e6807f903e816f3bb08123f27fcd0a15 Mon Sep 17 00:00:00 2001
From: Dmitrii Dolgov <9erthalion6@gmail.com>
Date: Tue, 14 Jan 2020 15:36:30 +0100
Subject: [PATCH 1/9] WIP Connection pooler support

Add initial support for a connection pooler. The idea is to make it
generic enough that switching to another pooler, e.g. from pgbouncer to
odyssey, only requires changing the corresponding docker image. The
operator needs to create a deployment with the pooler and a service to
access it.
---
 docker/DebugDockerfile                        |  13 +-
 manifests/minimal-postgres-manifest.yaml      |   2 +
 manifests/operator-service-account-rbac.yaml  |   1 +
 pkg/apis/acid.zalan.do/v1/postgresql_type.go  |  12 ++
 pkg/apis/acid.zalan.do/v1/register.go         |   5 +-
 pkg/cluster/cluster.go                        |  55 ++++-
 pkg/cluster/database.go                       |  67 ++++++
 pkg/cluster/k8sres.go                         | 216 ++++++++++++++++++-
 pkg/cluster/resources.go                      |  96 +++++++++
 pkg/cluster/sync.go                           |   6 +
 pkg/cluster/types.go                          |   2 +
 pkg/cluster/util.go                           |  18 +-
 pkg/spec/types.go                             |   4 +-
 pkg/util/config/config.go                     |  11 +
 pkg/util/constants/roles.go                   |  23 +-
 pkg/util/k8sutil/k8sutil.go                   |   2 +
 16 files changed, 513 insertions(+), 20 deletions(-)

diff --git a/docker/DebugDockerfile b/docker/DebugDockerfile
index 76dadf6df..0c11fe3b4 100644
--- a/docker/DebugDockerfile
+++ b/docker/DebugDockerfile
@@ -3,8 +3,17 @@ MAINTAINER Team ACID @ Zalando
 
 # We need root certificates to deal with teams api over https
 RUN apk --no-cache add ca-certificates go git musl-dev
-RUN go get github.com/derekparker/delve/cmd/dlv
 
 COPY build/* /
 
-CMD ["/root/go/bin/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"]
+RUN addgroup -g 1000 pgo
+RUN adduser -D -u 1000 -G pgo -g 'Postgres Operator' pgo
+
+RUN go get github.com/derekparker/delve/cmd/dlv
+RUN cp /root/go/bin/dlv /dlv
+RUN chown -R pgo:pgo /dlv
+
+USER pgo:pgo
+RUN ls -l /
+
+CMD ["/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"]

diff --git a/manifests/minimal-postgres-manifest.yaml b/manifests/minimal-postgres-manifest.yaml
index 75dfdf07f..ff7785cec 100644
--- a/manifests/minimal-postgres-manifest.yaml
+++ b/manifests/minimal-postgres-manifest.yaml
@@ -17,3 +17,5 @@ spec:
     foo: zalando  # dbname: owner
   postgresql:
     version: "11"
+  connectionPool:
+    type: "pgbouncer"

diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml
index a37abe476..d7cd6fd74 100644
--- a/manifests/operator-service-account-rbac.yaml
+++ b/manifests/operator-service-account-rbac.yaml
@@ -119,6 +119,7 @@ rules:
   - apps
   resources:
   - statefulsets
+  - deployments
   verbs:
   - create
   - delete

diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
index 07b42d4d4..c50d9c902 100644
--- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go
+++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
@@ -27,6 +27,8 @@ type PostgresSpec struct {
 	Patroni   `json:"patroni,omitempty"`
 	Resources `json:"resources,omitempty"`
 
+	ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"`
+
 	TeamID      string `json:"teamId"`
 	DockerImage string `json:"dockerImage,omitempty"`
 
@@ -154,3 +156,13 @@ type UserFlags []string
 type PostgresStatus struct {
 	PostgresClusterStatus string `json:"PostgresClusterStatus"`
 }
+
+// Options for connection pooler
+type ConnectionPool struct {
+	NumberOfInstances *int32  `json:"instancesNumber,omitempty"`
+	Schema            *string `json:"schema,omitempty"`
+	User              *string `json:"user,omitempty"`
+	Type              *string `json:"type,omitempty"`
+	Mode              *string `json:"mode,omitempty"`
+	PodTemplate       *v1.PodTemplateSpec `json:"podTemplate,omitempty"`
+}

diff --git a/pkg/apis/acid.zalan.do/v1/register.go b/pkg/apis/acid.zalan.do/v1/register.go
index 1c30e35fb..34def209d 100644
--- a/pkg/apis/acid.zalan.do/v1/register.go
+++ b/pkg/apis/acid.zalan.do/v1/register.go
@@ -10,7 +10,8 @@ import (
 
 // APIVersion of the `postgresql` and `operator` CRDs
 const (
-	APIVersion = "v1"
+	APIVersion     = "v1"
+	PostgresqlKind = "postgresql"
 )
 
 var (
@@ -42,7 +43,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
 	// AddKnownType assumes derives the type kind from the type name, which is always uppercase.
 	// For our CRDs we use lowercase names historically, therefore we have to supply the name separately.
 	// TODO: User uppercase CRDResourceKind of our types in the next major API version
-	scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("postgresql"), &Postgresql{})
+	scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind(PostgresqlKind), &Postgresql{})
 	scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("postgresqlList"), &PostgresqlList{})
 	scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("OperatorConfiguration"), &OperatorConfiguration{})

diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index c560c4cdf..1681e7d2e 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -48,11 +48,17 @@ type Config struct {
 	PodServiceAccountRoleBinding *rbacv1beta1.RoleBinding
 }
 
+type ConnectionPoolResources struct {
+	Deployment *appsv1.Deployment
+	Service    *v1.Service
+}
+
 type kubeResources struct {
 	Services            map[PostgresRole]*v1.Service
 	Endpoints           map[PostgresRole]*v1.Endpoints
 	Secrets             map[types.UID]*v1.Secret
 	Statefulset         *appsv1.StatefulSet
+	ConnectionPool      *ConnectionPoolResources
 	PodDisruptionBudget *policybeta1.PodDisruptionBudget
 	//Pods are treated separately
 	//PVCs are treated separately
@@ -184,7 +190,8 @@ func (c *Cluster) isNewCluster() bool {
 func (c *Cluster) initUsers() error {
 	c.setProcessName("initializing users")
 
-	// clear our the previous state of the cluster users (in case we are running a sync).
+	// clear out the previous state of the cluster users (in case we are
+	// running a sync).
 	c.systemUsers = map[string]spec.PgUser{}
 	c.pgUsers = map[string]spec.PgUser{}
 
@@ -292,8 +299,10 @@ func (c *Cluster) Create() error {
 	}
 	c.logger.Infof("pods are ready")
 
-	// create database objects unless we are running without pods or disabled that feature explicitly
+	// create database objects unless we are running without pods or disabled
+	// that feature explicitly
 	if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
+		c.logger.Infof("Create roles")
 		if err = c.createRoles(); err != nil {
 			return fmt.Errorf("could not create users: %v", err)
 		}
@@ -316,6 +325,26 @@ func (c *Cluster) Create() error {
 		c.logger.Errorf("could not list resources: %v", err)
 	}
 
+	// Create connection pool deployment and services if necessary. Since we
+	// need to perform some operations with the database itself (e.g. install
+	// lookup function), do it as the last step, when everything is available.
+	//
+	// Do not consider the connection pool a strict requirement, and if
+	// something fails, report a warning
+	if c.needConnectionPool() {
+		if c.ConnectionPool != nil {
+			c.logger.Warning("Connection pool already exists in the cluster")
+			return nil
+		}
+		connPool, err := c.createConnectionPool()
+		if err != nil {
+			c.logger.Warningf("could not create connection pool: %v", err)
+			return nil
+		}
+		c.logger.Infof("connection pool %q has been successfully created",
+			util.NameFromMeta(connPool.Deployment.ObjectMeta))
+	}
+
 	return nil
 }
 
@@ -745,6 +774,12 @@ func (c *Cluster) Delete() {
 		c.logger.Warningf("could not remove leftover patroni objects; %v", err)
 	}
 
+	// Delete connection pool objects anyway, even if they are not mentioned
+	// in the manifest, just to avoid keeping orphaned components in case
+	// something went wrong
+	if err := c.deleteConnectionPool(); err != nil {
+		c.logger.Warningf("could not remove connection pool: %v", err)
+	}
 }
 
 //NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status).
@@ -811,6 +846,22 @@ func (c *Cluster) initSystemUsers() {
 		Name:     c.OpConfig.ReplicationUsername,
 		Password: util.RandomPassword(constants.PasswordLength),
 	}
+
+	// The connection pool user is an exception: if requested, it is created
+	// by the operator as a normal pgUser
+	if c.needConnectionPool() {
+
+		username := c.Spec.ConnectionPool.User
+		if username == nil {
+			username = &c.OpConfig.ConnectionPool.User
+		}
+
+		c.systemUsers[constants.ConnectionPoolUserKeyName] = spec.PgUser{
+			Origin:   spec.RoleConnectionPool,
+			Name:     *username,
+			Password: util.RandomPassword(constants.PasswordLength),
+		}
+	}
 }
 
 func (c *Cluster) initRobotUsers() error {

diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go
index 07ea011a6..1b74bd6b6 100644
--- a/pkg/cluster/database.go
+++ b/pkg/cluster/database.go
@@ -1,10 +1,12 @@
 package cluster
 
 import (
+	"bytes"
 	"database/sql"
 	"fmt"
 	"net"
 	"strings"
+	"text/template"
 	"time"
 
 	"github.com/lib/pq"
@@ -28,6 +30,25 @@ const (
 	getDatabasesSQL       = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;`
 	createDatabaseSQL     = `CREATE DATABASE "%s" OWNER "%s";`
 	alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";`
+	connectionPoolLookup  = `
+		CREATE SCHEMA IF NOT EXISTS {{.pool_schema}};
+
+		CREATE OR REPLACE FUNCTION {{.pool_schema}}.user_lookup(
+			in i_username text, out uname text, out phash text)
+		RETURNS record AS $$
+		BEGIN
+			SELECT usename, passwd FROM pg_catalog.pg_shadow
+			WHERE usename = i_username INTO uname, phash;
+			RETURN;
+		END;
+		$$ LANGUAGE plpgsql SECURITY DEFINER;
+
+		REVOKE ALL ON FUNCTION {{.pool_schema}}.user_lookup(text)
+			FROM public, {{.pool_schema}};
+		GRANT EXECUTE ON FUNCTION {{.pool_schema}}.user_lookup(text)
+			TO {{.pool_user}};
+		GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}};
+	`
 )
 
 func (c *Cluster) pgConnectionString() string {
@@ -243,3 +264,49 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin
 
 	return result
 }
+
+// Creates a connection pool credentials lookup function in every database to
+// perform remote authentication.
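+//
+// For example, with poolSchema = "pooler" the rendered template creates a
+// SECURITY DEFINER function pooler.user_lookup(text) returning usename and
+// passwd from pg_catalog.pg_shadow, which a pooler such as pgbouncer can
+// query for authentication (illustrative; the exact SQL comes from the
+// connectionPoolLookup template above).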
+func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error { + var stmtBytes bytes.Buffer + + if err := c.initDbConn(); err != nil { + return fmt.Errorf("could not init database connection") + } + defer func() { + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + }() + + currentDatabases, err := c.getDatabases() + if err != nil { + msg := "could not get databases to install pool lookup function: %v" + return fmt.Errorf(msg, err) + } + + templater := template.Must(template.New("sql").Parse(connectionPoolLookup)) + + for dbname, _ := range currentDatabases { + c.logger.Infof("Install pool lookup function into %s", dbname) + + params := TemplateParams{ + "pool_schema": poolSchema, + "pool_user": poolUser, + } + + if err := templater.Execute(&stmtBytes, params); err != nil { + return fmt.Errorf("could not prepare sql statement %+v: %v", + params, err) + } + + if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil { + return fmt.Errorf("could not execute sql statement %s: %v", + stmtBytes.String(), err) + } + + c.logger.Infof("Pool lookup function installed into %s", dbname) + } + + return nil +} diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index e6561e0f3..7d6b9be07 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -31,6 +31,8 @@ const ( patroniPGParametersParameterName = "parameters" patroniPGHBAConfParameterName = "pg_hba" localHost = "127.0.0.1/32" + connectionPoolContainer = "connection-pool" + pgPort = 5432 ) type pgUser struct { @@ -66,6 +68,10 @@ func (c *Cluster) statefulSetName() string { return c.Name } +func (c *Cluster) connPoolName() string { + return c.Name + "-pooler" +} + func (c *Cluster) endpointName(role PostgresRole) string { name := c.Name if role == Replica { @@ -84,6 +90,28 @@ func (c *Cluster) serviceName(role PostgresRole) string { return name } +func (c *Cluster) serviceAddress(role PostgresRole) string { + service, exist := c.Services[role] + + if exist { + return service.ObjectMeta.Name + } + + c.logger.Warningf("No service for role %s", role) + return "" +} + +func (c *Cluster) servicePort(role PostgresRole) string { + service, exist := c.Services[role] + + if exist { + return fmt.Sprint(service.Spec.Ports[0].Port) + } + + c.logger.Warningf("No service for role %s", role) + return "" +} + func (c *Cluster) podDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } @@ -315,7 +343,11 @@ func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]stri return *tolerationsSpec } - if len(podToleration["key"]) > 0 || len(podToleration["operator"]) > 0 || len(podToleration["value"]) > 0 || len(podToleration["effect"]) > 0 { + if len(podToleration["key"]) > 0 || + len(podToleration["operator"]) > 0 || + len(podToleration["value"]) > 0 || + len(podToleration["effect"]) > 0 { + return []v1.Toleration{ { Key: podToleration["key"], @@ -1669,3 +1701,185 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar { func (c *Cluster) getLogicalBackupJobName() (jobName string) { return "logical-backup-" + c.clusterName().Name } + +func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( + *v1.PodTemplateSpec, error) { + + podTemplate := spec.ConnectionPool.PodTemplate + + if podTemplate == nil { + gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) + resources, err := generateResourceRequirements( + c.Spec.Resources, + c.makeDefaultResources()) + + effectiveMode := 
spec.ConnectionPool.Mode + if effectiveMode == nil { + effectiveMode = &c.OpConfig.ConnectionPool.Mode + } + + if err != nil { + return nil, fmt.Errorf("could not generate resource requirements: %v", err) + } + + secretSelector := func(key string) *v1.SecretKeySelector { + return &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: c.credentialSecretName(c.OpConfig.SuperUsername), + }, + Key: key, + } + } + + envVars := []v1.EnvVar{ + { + Name: "PGHOST", + Value: c.serviceAddress(Master), + }, + { + Name: "PGPORT", + Value: c.servicePort(Master), + }, + { + Name: "PGUSER", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("username"), + }, + }, + // the convention is to use the same schema name as + // connection pool username + { + Name: "PGSCHEMA", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("username"), + }, + }, + { + Name: "PGPASSWORD", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("password"), + }, + }, + { + Name: "CONNECTION_POOL_MODE", + Value: *effectiveMode, + }, + { + Name: "CONNECTION_POOL_PORT", + Value: fmt.Sprint(pgPort), + }, + } + + poolerContainer := v1.Container{ + Name: connectionPoolContainer, + Image: c.OpConfig.ConnectionPool.Image, + ImagePullPolicy: v1.PullIfNotPresent, + Resources: *resources, + Ports: []v1.ContainerPort{ + { + ContainerPort: pgPort, + Protocol: v1.ProtocolTCP, + }, + }, + Env: envVars, + } + + podTemplate = &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: c.connPoolLabelsSelector().MatchLabels, + Namespace: c.Namespace, + Annotations: c.generatePodAnnotations(spec), + }, + Spec: v1.PodSpec{ + ServiceAccountName: c.OpConfig.PodServiceAccountName, + TerminationGracePeriodSeconds: &gracePeriod, + Containers: []v1.Container{poolerContainer}, + // TODO: add tolerations to scheduler pooler on the same node + // as database + //Tolerations: *tolerationsSpec, + }, + } + } + + return podTemplate, nil +} + +func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) ( + *appsv1.Deployment, error) { + + podTemplate, err := c.generateConnPoolPodTemplate(spec) + numberOfInstances := spec.ConnectionPool.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = c.OpConfig.ConnectionPool.NumberOfInstances + } + + if err != nil { + return nil, err + } + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.connPoolName(), + Namespace: c.Namespace, + Labels: c.labelsSet(true), + Annotations: map[string]string{}, + // make Postgresql CRD object its owner, so that if CRD object is + // deleted, this object will be deleted even if something went + // wrong and operator didn't deleted it. 
+ OwnerReferences: []metav1.OwnerReference{ + { + UID: c.Statefulset.ObjectMeta.UID, + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: c.Statefulset.ObjectMeta.Name, + }, + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: numberOfInstances, + Selector: c.connPoolLabelsSelector(), + Template: *podTemplate, + }, + } + + return deployment, nil +} + +func (c *Cluster) generateConnPoolService(spec *acidv1.PostgresSpec) *v1.Service { + serviceSpec := v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: c.connPoolName(), + Port: pgPort, + TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, + }, + }, + Type: v1.ServiceTypeClusterIP, + Selector: map[string]string{ + "connection-pool": c.connPoolName(), + }, + } + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.connPoolName(), + Namespace: c.Namespace, + Labels: c.labelsSet(true), + Annotations: map[string]string{}, + // make Postgresql CRD object its owner, so that if CRD object is + // deleted, this object will be deleted even if something went + // wrong and operator didn't deleted it. + OwnerReferences: []metav1.OwnerReference{ + { + UID: c.Postgresql.ObjectMeta.UID, + APIVersion: acidv1.APIVersion, + Kind: acidv1.PostgresqlKind, + Name: c.Postgresql.ObjectMeta.Name, + }, + }, + }, + Spec: serviceSpec, + } + + return service +} diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index c94a7bb46..7baa96c02 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -90,6 +90,102 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) { return statefulSet, nil } +// Prepare the database for connection pool to be used, i.e. install lookup +// function (do it first, because it should be fast and if it didn't succeed, +// it doesn't makes sense to create more K8S objects. At this moment we assume +// that necessary connection pool user exists. +// +// After that create all the objects for connection pool, namely a deployment +// with a chosen pooler and a service to expose it. +func (c *Cluster) createConnectionPool() (*ConnectionPoolResources, error) { + var msg string + c.setProcessName("creating connection pool") + + err := c.installLookupFunction( + c.OpConfig.ConnectionPool.Schema, + c.OpConfig.ConnectionPool.User) + + if err != nil { + msg = "could not prepare database for connection pool: %v" + return nil, fmt.Errorf(msg, err) + } + + deploymentSpec, err := c.generateConnPoolDeployment(&c.Spec) + if err != nil { + msg = "could not generate deployment for connection pool: %v" + return nil, fmt.Errorf(msg, err) + } + + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(deploymentSpec) + + if err != nil { + return nil, err + } + + serviceSpec := c.generateConnPoolService(&c.Spec) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). 
+ Create(serviceSpec) + + if err != nil { + return nil, err + } + + c.ConnectionPool = &ConnectionPoolResources{ + Deployment: deployment, + Service: service, + } + c.logger.Debugf("created new connection pool %q, uid: %q", + util.NameFromMeta(deployment.ObjectMeta), deployment.UID) + + return c.ConnectionPool, nil +} + +func (c *Cluster) deleteConnectionPool() (err error) { + c.setProcessName("deleting connection pool") + c.logger.Debugln("deleting connection pool") + + // Lack of connection pooler objects is not a fatal error, just log it if + // it was present before in the manifest + if c.needConnectionPool() && c.ConnectionPool == nil { + c.logger.Infof("No connection pool to delete") + return nil + } + + deployment := c.ConnectionPool.Deployment + err = c.KubeClient. + Deployments(deployment.Namespace). + Delete(deployment.Name, c.deleteOptions) + + if !k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pool deployment was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete deployment: %v", err) + } + + c.logger.Infof("Connection pool deployment %q has been deleted", + util.NameFromMeta(deployment.ObjectMeta)) + + service := c.ConnectionPool.Service + err = c.KubeClient. + Services(service.Namespace). + Delete(service.Name, c.deleteOptions) + + if !k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pool service was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete service: %v", err) + } + + c.logger.Infof("Connection pool service %q has been deleted", + util.NameFromMeta(deployment.ObjectMeta)) + + c.ConnectionPool = nil + return nil +} + func getPodIndex(podName string) (int32, error) { parts := strings.Split(podName, "-") if len(parts) == 0 { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index fa4fc9ec1..5ee827d4b 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -456,6 +456,12 @@ func (c *Cluster) syncRoles() (err error) { for _, u := range c.pgUsers { userNames = append(userNames, u.Name) } + + // An exception from system users, connection pool user + connPoolUser := c.systemUsers[constants.ConnectionPoolUserKeyName] + userNames = append(userNames, connPoolUser.Name) + c.pgUsers[connPoolUser.Name] = connPoolUser + dbUsers, err = c.readPgUsersFromDatabase(userNames) if err != nil { return fmt.Errorf("error getting users from the database: %v", err) diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 138b7015c..286505621 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -69,3 +69,5 @@ type ClusterStatus struct { Spec acidv1.PostgresSpec Error error } + +type TemplateParams map[string]interface{} diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 8c02fed2e..d5f9c744f 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -408,7 +408,19 @@ func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set { } func (c *Cluster) labelsSelector() *metav1.LabelSelector { - return &metav1.LabelSelector{MatchLabels: c.labelsSet(false), MatchExpressions: nil} + return &metav1.LabelSelector{ + MatchLabels: c.labelsSet(false), + MatchExpressions: nil, + } +} + +func (c *Cluster) connPoolLabelsSelector() *metav1.LabelSelector { + return &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "connection-pool": c.connPoolName(), + }, + MatchExpressions: nil, + } } func (c *Cluster) roleLabelsSet(shouldAddExtraLabels bool, role PostgresRole) labels.Set { @@ -483,3 +495,7 @@ func (c *Cluster) GetSpec() (*acidv1.Postgresql, error) { func (c *Cluster) 
patroniUsesKubernetes() bool { return c.OpConfig.EtcdHost == "" } + +func (c *Cluster) needConnectionPool() bool { + return c.Spec.ConnectionPool != nil +} diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 3e6bec8db..6f071c44a 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -23,13 +23,15 @@ const fileWithNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespa // RoleOrigin contains the code of the origin of a role type RoleOrigin int -// The rolesOrigin constant values must be sorted by the role priority for resolveNameConflict(...) to work. +// The rolesOrigin constant values must be sorted by the role priority for +// resolveNameConflict(...) to work. const ( RoleOriginUnknown RoleOrigin = iota RoleOriginManifest RoleOriginInfrastructure RoleOriginTeamsAPI RoleOriginSystem + RoleConnectionPool ) type syncUserOperation int diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index e4e429abb..a7a522566 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -83,6 +83,16 @@ type LogicalBackup struct { LogicalBackupS3SSE string `name:"logical_backup_s3_sse" default:"AES256"` } +// Operator options for connection pooler +type ConnectionPool struct { + NumberOfInstances *int32 `name:"connection_pool_instances_number" default:"1"` + Schema string `name:"connection_pool_schema" default:"pooler"` + User string `name:"connection_pool_user" default:"pooler"` + Type string `name:"connection_pool_type" default:"pgbouncer"` + Image string `name:"connection_pool_image" default:"pgbouncer:1.0"` + Mode string `name:"connection_pool_mode" default:"session"` +} + // Config describes operator config type Config struct { CRD @@ -90,6 +100,7 @@ type Config struct { Auth Scalyr LogicalBackup + ConnectionPool WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to' EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS diff --git a/pkg/util/constants/roles.go b/pkg/util/constants/roles.go index 2c20d69db..3d201142c 100644 --- a/pkg/util/constants/roles.go +++ b/pkg/util/constants/roles.go @@ -2,15 +2,16 @@ package constants // Roles specific constants const ( - PasswordLength = 64 - SuperuserKeyName = "superuser" - ReplicationUserKeyName = "replication" - RoleFlagSuperuser = "SUPERUSER" - RoleFlagInherit = "INHERIT" - RoleFlagLogin = "LOGIN" - RoleFlagNoLogin = "NOLOGIN" - RoleFlagCreateRole = "CREATEROLE" - RoleFlagCreateDB = "CREATEDB" - RoleFlagReplication = "REPLICATION" - RoleFlagByPassRLS = "BYPASSRLS" + PasswordLength = 64 + SuperuserKeyName = "superuser" + ConnectionPoolUserKeyName = "pooler" + ReplicationUserKeyName = "replication" + RoleFlagSuperuser = "SUPERUSER" + RoleFlagInherit = "INHERIT" + RoleFlagLogin = "LOGIN" + RoleFlagNoLogin = "NOLOGIN" + RoleFlagCreateRole = "CREATEROLE" + RoleFlagCreateDB = "CREATEDB" + RoleFlagReplication = "REPLICATION" + RoleFlagByPassRLS = "BYPASSRLS" ) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index c7b2366b0..be9e216dd 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -39,6 +39,7 @@ type KubernetesClient struct { corev1.NamespacesGetter corev1.ServiceAccountsGetter appsv1.StatefulSetsGetter + appsv1.DeploymentsGetter rbacv1beta1.RoleBindingsGetter policyv1beta1.PodDisruptionBudgetsGetter apiextbeta1.CustomResourceDefinitionsGetter @@ -101,6 +102,7 @@ func 
NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
 	kubeClient.NodesGetter = client.CoreV1()
 	kubeClient.NamespacesGetter = client.CoreV1()
 	kubeClient.StatefulSetsGetter = client.AppsV1()
+	kubeClient.DeploymentsGetter = client.AppsV1()
 	kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1()
 	kubeClient.RESTClient = client.CoreV1().RESTClient()
 	kubeClient.RoleBindingsGetter = client.RbacV1beta1()

From c028be493fbc6c71df34257ed505abd5e68d115c Mon Sep 17 00:00:00 2001
From: Dmitrii Dolgov <9erthalion6@gmail.com>
Date: Wed, 15 Jan 2020 16:46:03 +0100
Subject: [PATCH 2/9] Improve cleaning up

Set up a proper owner reference to the StatefulSet, and delete with the
foreground propagation policy so that no orphans are left behind.
---
 pkg/cluster/k8sres.go    | 56 ++++++++++++++++++++++++----------------
 pkg/cluster/resources.go | 10 +++++--
 2 files changed, 42 insertions(+), 24 deletions(-)

diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go
index 7d6b9be07..209402a20 100644
--- a/pkg/cluster/k8sres.go
+++ b/pkg/cluster/k8sres.go
@@ -1804,6 +1804,26 @@ func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) (
 	return podTemplate, nil
 }
 
+// Return an array of ownerReferences to make an arbitrary object dependent
+// on the StatefulSet. The dependency is made on the StatefulSet instead of
+// the PostgreSQL CRD, since the former represents the actual state, and only
+// its deletion means we delete the cluster (e.g. if the CRD was deleted but
+// the StatefulSet somehow survived, we could not delete an object that would
+// affect the functioning cluster).
+func (c *Cluster) ownerReferences() []metav1.OwnerReference {
+	controller := true
+
+	return []metav1.OwnerReference{
+		{
+			UID:        c.Statefulset.ObjectMeta.UID,
+			APIVersion: "apps/v1",
+			Kind:       "StatefulSet",
+			Name:       c.Statefulset.ObjectMeta.Name,
+			Controller: &controller,
+		},
+	}
+}
+
 func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) (
 	*appsv1.Deployment, error) {
 
@@ -1823,17 +1843,13 @@ func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) (
 			Namespace:   c.Namespace,
 			Labels:      c.labelsSet(true),
 			Annotations: map[string]string{},
-			// make Postgresql CRD object its owner, so that if CRD object is
-			// deleted, this object will be deleted even if something went
-			// wrong and operator didn't deleted it.
-			OwnerReferences: []metav1.OwnerReference{
-				{
-					UID:        c.Statefulset.ObjectMeta.UID,
-					APIVersion: "apps/v1",
-					Kind:       "StatefulSet",
-					Name:       c.Statefulset.ObjectMeta.Name,
-				},
-			},
+			// make the StatefulSet object its owner to represent the
+			// dependency. The StatefulSet itself is deleted with the
+			// "Orphaned" propagation policy, which means its deletion will
+			// not clean up this deployment, but the hope is that the object
+			// will be garbage collected if something went wrong and the
+			// operator didn't delete it.
+			OwnerReferences: c.ownerReferences(),
 		},
 		Spec: appsv1.DeploymentSpec{
 			Replicas: numberOfInstances,
@@ -1866,17 +1882,13 @@ func (c *Cluster) generateConnPoolService(spec *acidv1.PostgresSpec) *v1.Service
 			Namespace:   c.Namespace,
 			Labels:      c.labelsSet(true),
 			Annotations: map[string]string{},
-			// make Postgresql CRD object its owner, so that if CRD object is
-			// deleted, this object will be deleted even if something went
-			// wrong and operator didn't deleted it.
-			OwnerReferences: []metav1.OwnerReference{
-				{
-					UID:        c.Postgresql.ObjectMeta.UID,
-					APIVersion: acidv1.APIVersion,
-					Kind:       acidv1.PostgresqlKind,
-					Name:       c.Postgresql.ObjectMeta.Name,
-				},
-			},
+			// make the StatefulSet object its owner to represent the
+			// dependency. The StatefulSet itself is deleted with the
+			// "Orphaned" propagation policy, which means its deletion will
+			// not clean up this service, but the hope is that the object
+			// will be garbage collected if something went wrong and the
+			// operator didn't delete it.
+			OwnerReferences: c.ownerReferences(),
 		},
 		Spec: serviceSpec,
 	}
 
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index 7baa96c02..b4c7e578f 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -154,10 +154,14 @@ func (c *Cluster) deleteConnectionPool() (err error) {
 		return nil
 	}
 
+	// set the delete propagation policy to foreground, so that the replica
+	// set will also be deleted.
+	policy := metav1.DeletePropagationForeground
+	options := metav1.DeleteOptions{PropagationPolicy: &policy}
 	deployment := c.ConnectionPool.Deployment
 	err = c.KubeClient.
 		Deployments(deployment.Namespace).
-		Delete(deployment.Name, c.deleteOptions)
+		Delete(deployment.Name, &options)
 
 	if !k8sutil.ResourceNotFound(err) {
 		c.logger.Debugf("Connection pool deployment was already deleted")
@@ -168,10 +172,12 @@
 	c.logger.Infof("Connection pool deployment %q has been deleted",
 		util.NameFromMeta(deployment.ObjectMeta))
 
+	// the foreground propagation policy is used here as well, so that all
+	// the dependents are deleted too.
 	service := c.ConnectionPool.Service
 	err = c.KubeClient.
 		Services(service.Namespace).
-		Delete(service.Name, c.deleteOptions)
+		Delete(service.Name, &options)
 
 	if !k8sutil.ResourceNotFound(err) {
 		c.logger.Debugf("Connection pool service was already deleted")

From 7254039f2ef41e5b6941277478130d27d615d4f8 Mon Sep 17 00:00:00 2001
From: Dmitrii Dolgov <9erthalion6@gmail.com>
Date: Mon, 20 Jan 2020 16:06:45 +0100
Subject: [PATCH 3/9] Add CRD configuration

With conversion for the config, and initial tests.
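For reference, the new section could look roughly like this in an
OperatorConfiguration manifest (a sketch only: the key names follow the
json tags added in this patch, and the values are illustrative rather
than the defaults):

    configuration:
      connection_pool:
        connection_pool_instances_number: 2
        connection_pool_schema: "pooler"
        connection_pool_user: "pooler"
        connection_pool_image: "pgbouncer:1.0"
        connection_pool_mode: "session"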
--- .../v1/operator_configuration_type.go | 15 +++++ pkg/apis/acid.zalan.do/v1/postgresql_type.go | 2 + pkg/cluster/k8sres.go | 47 +++++++++++--- pkg/cluster/k8sres_test.go | 65 +++++++++++++++++++ pkg/controller/operator_config.go | 12 ++++ pkg/util/config/config.go | 16 +++-- 6 files changed, 142 insertions(+), 15 deletions(-) diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go index ded5261fb..58f171843 100644 --- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go +++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go @@ -152,6 +152,20 @@ type ScalyrConfiguration struct { ScalyrMemoryLimit string `json:"scalyr_memory_limit,omitempty"` } +// Defines default configuration for connection pool +type ConnectionPoolConfiguration struct { + NumberOfInstances *int32 `json:"connection_pool_instances_number,omitempty"` + Schema string `json:"connection_pool_schema,omitempty"` + User string `json:"connection_pool_user,omitempty"` + Type string `json:"connection_pool_type,omitempty"` + Image string `json:"connection_pool_image,omitempty"` + Mode string `json:"connection_pool_mode,omitempty"` + DefaultCPURequest string `name:"connection_pool_default_cpu_request,omitempty"` + DefaultMemoryRequest string `name:"connection_pool_default_memory_request,omitempty"` + DefaultCPULimit string `name:"connection_pool_default_cpu_limit,omitempty"` + DefaultMemoryLimit string `name:"connection_pool_default_memory_limit,omitempty"` +} + // OperatorLogicalBackupConfiguration defines configuration for logical backup type OperatorLogicalBackupConfiguration struct { Schedule string `json:"logical_backup_schedule,omitempty"` @@ -188,6 +202,7 @@ type OperatorConfigurationData struct { LoggingRESTAPI LoggingRESTAPIConfiguration `json:"logging_rest_api"` Scalyr ScalyrConfiguration `json:"scalyr"` LogicalBackup OperatorLogicalBackupConfiguration `json:"logical_backup"` + ConnectionPool ConnectionPoolConfiguration `json:"connection_pool"` } //Duration shortens this frequently used name diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index c50d9c902..e4d56c6e8 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -165,4 +165,6 @@ type ConnectionPool struct { Type *string `json:"type,omitempty"` Mode *string `json:"mode,omitempty"` PodTemplate *v1.PodTemplateSpec `json:"podTemplate,omitempty"` + + Resources `json:"resources,omitempty"` } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 209402a20..f992c2244 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -120,10 +120,39 @@ func (c *Cluster) makeDefaultResources() acidv1.Resources { config := c.OpConfig - defaultRequests := acidv1.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest} - defaultLimits := acidv1.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit} + defaultRequests := acidv1.ResourceDescription{ + CPU: config.Resources.DefaultCPURequest, + Memory: config.Resources.DefaultMemoryRequest, + } + defaultLimits := acidv1.ResourceDescription{ + CPU: config.Resources.DefaultCPULimit, + Memory: config.Resources.DefaultMemoryLimit, + } - return acidv1.Resources{ResourceRequests: defaultRequests, ResourceLimits: defaultLimits} + return acidv1.Resources{ + ResourceRequests: defaultRequests, + ResourceLimits: defaultLimits, + } +} + +// Generate default resource section for 
connection pool deployment, to be used +// if nothing custom is specified in the manifest +func (c *Cluster) makeDefaultConnPoolResources() acidv1.Resources { + config := c.OpConfig + + defaultRequests := acidv1.ResourceDescription{ + CPU: config.ConnectionPool.ConnPoolDefaultCPURequest, + Memory: config.ConnectionPool.ConnPoolDefaultMemoryRequest, + } + defaultLimits := acidv1.ResourceDescription{ + CPU: config.ConnectionPool.ConnPoolDefaultCPULimit, + Memory: config.ConnectionPool.ConnPoolDefaultMemoryLimit, + } + + return acidv1.Resources{ + ResourceRequests: defaultRequests, + ResourceLimits: defaultLimits, + } } func generateResourceRequirements(resources acidv1.Resources, defaultResources acidv1.Resources) (*v1.ResourceRequirements, error) { @@ -765,12 +794,12 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef request := spec.Resources.ResourceRequests.Memory if request == "" { - request = c.OpConfig.DefaultMemoryRequest + request = c.OpConfig.Resources.DefaultMemoryRequest } limit := spec.Resources.ResourceLimits.Memory if limit == "" { - limit = c.OpConfig.DefaultMemoryLimit + limit = c.OpConfig.Resources.DefaultMemoryLimit } isSmaller, err := util.IsSmallerQuantity(request, limit) @@ -792,12 +821,12 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef // TODO #413 sidecarRequest := sidecar.Resources.ResourceRequests.Memory if request == "" { - request = c.OpConfig.DefaultMemoryRequest + request = c.OpConfig.Resources.DefaultMemoryRequest } sidecarLimit := sidecar.Resources.ResourceLimits.Memory if limit == "" { - limit = c.OpConfig.DefaultMemoryLimit + limit = c.OpConfig.Resources.DefaultMemoryLimit } isSmaller, err := util.IsSmallerQuantity(sidecarRequest, sidecarLimit) @@ -1710,8 +1739,8 @@ func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( if podTemplate == nil { gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) resources, err := generateResourceRequirements( - c.Spec.Resources, - c.makeDefaultResources()) + spec.ConnectionPool.Resources, + c.makeDefaultConnPoolResources()) effectiveMode := spec.ConnectionPool.Mode if effectiveMode == nil { diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index e8fe05456..aa9ef6513 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -1,6 +1,7 @@ package cluster import ( + "errors" "reflect" v1 "k8s.io/api/core/v1" @@ -451,3 +452,67 @@ func TestSecretVolume(t *testing.T) { } } } + +func TestConnPoolPodTemplate(t *testing.T) { + testName := "Test connection pool pod template generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100M", + ConnPoolDefaultMemoryLimit: "100M", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + var clusterNoDefaultRes = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{}, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + }{ + { + subTest: "empty pod template", + 
spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + }, + { + subTest: "no default resources", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`), + cluster: clusterNoDefaultRes, + }, + } + for _, tt := range tests { + _, err := tt.cluster.generateConnPoolPodTemplate(tt.spec) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + } +} diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index c6f10faa0..f5d280363 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -142,5 +142,17 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit + // connection pool + result.ConnectionPool.NumberOfInstances = fromCRD.ConnectionPool.NumberOfInstances + result.ConnectionPool.Schema = fromCRD.ConnectionPool.Schema + result.ConnectionPool.User = fromCRD.ConnectionPool.User + result.ConnectionPool.Type = fromCRD.ConnectionPool.Type + result.ConnectionPool.Image = fromCRD.ConnectionPool.Image + result.ConnectionPool.Mode = fromCRD.ConnectionPool.Mode + result.ConnectionPool.ConnPoolDefaultCPURequest = fromCRD.ConnectionPool.DefaultCPURequest + result.ConnectionPool.ConnPoolDefaultMemoryRequest = fromCRD.ConnectionPool.DefaultMemoryRequest + result.ConnectionPool.ConnPoolDefaultCPULimit = fromCRD.ConnectionPool.DefaultCPULimit + result.ConnectionPool.ConnPoolDefaultMemoryLimit = fromCRD.ConnectionPool.DefaultMemoryLimit + return result } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index a7a522566..2baf99931 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -85,12 +85,16 @@ type LogicalBackup struct { // Operator options for connection pooler type ConnectionPool struct { - NumberOfInstances *int32 `name:"connection_pool_instances_number" default:"1"` - Schema string `name:"connection_pool_schema" default:"pooler"` - User string `name:"connection_pool_user" default:"pooler"` - Type string `name:"connection_pool_type" default:"pgbouncer"` - Image string `name:"connection_pool_image" default:"pgbouncer:1.0"` - Mode string `name:"connection_pool_mode" default:"session"` + NumberOfInstances *int32 `name:"connection_pool_instances_number" default:"1"` + Schema string `name:"connection_pool_schema" default:"pooler"` + User string `name:"connection_pool_user" default:"pooler"` + Type string `name:"connection_pool_type" default:"pgbouncer"` + Image string `name:"connection_pool_image" default:"pgbouncer:1.0"` + Mode string `name:"connection_pool_mode" default:"session"` + ConnPoolDefaultCPURequest string `name:"connection_pool_default_cpu_request" default:"100m"` + ConnPoolDefaultMemoryRequest string `name:"connection_pool_default_memory_request" default:"100Mi"` + ConnPoolDefaultCPULimit string `name:"connection_pool_default_cpu_limit" default:"3"` + ConnPoolDefaultMemoryLimit string `name:"connection_pool_default_memory_limit" default:"1Gi"` } // Config describes operator config From 
8bd2086cd25b52a80724f493a7cf15335d72bade Mon Sep 17 00:00:00 2001 From: Dmitrii Dolgov <9erthalion6@gmail.com> Date: Wed, 22 Jan 2020 14:49:54 +0100 Subject: [PATCH 4/9] Add more tests --- pkg/cluster/cluster.go | 2 +- pkg/cluster/k8sres.go | 5 + pkg/cluster/k8sres_test.go | 327 +++++++++++++++++++++++++++++++++- pkg/cluster/resources.go | 4 +- pkg/cluster/resources_test.go | 65 +++++++ pkg/cluster/types.go | 2 + pkg/util/k8sutil/k8sutil.go | 57 +++++- 7 files changed, 452 insertions(+), 10 deletions(-) create mode 100644 pkg/cluster/resources_test.go diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 1681e7d2e..afcb0df82 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -336,7 +336,7 @@ func (c *Cluster) Create() error { c.logger.Warning("Connection pool already exists in the cluster") return nil } - connPool, err := c.createConnectionPool() + connPool, err := c.createConnectionPool(c.installLookupFunction) if err != nil { c.logger.Warningf("could not create connection pool: %v", err) return nil diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index f992c2244..8b7860886 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1842,6 +1842,11 @@ func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( func (c *Cluster) ownerReferences() []metav1.OwnerReference { controller := true + if c.Statefulset == nil { + c.logger.Warning("Cannot get owner reference, no statefulset") + return []metav1.OwnerReference{} + } + return []metav1.OwnerReference{ { UID: c.Statefulset.ObjectMeta.UID, diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index aa9ef6513..012df4072 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -2,6 +2,7 @@ package cluster import ( "errors" + "fmt" "reflect" v1 "k8s.io/api/core/v1" @@ -14,6 +15,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" + appsv1 "k8s.io/api/apps/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -453,7 +455,80 @@ func TestSecretVolume(t *testing.T) { } } -func TestConnPoolPodTemplate(t *testing.T) { +func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + cpuReq := podSpec.Spec.Containers[0].Resources.Requests["cpu"] + if cpuReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest { + return fmt.Errorf("CPU request doesn't match, got %s, expected %s", + cpuReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest) + } + + memReq := podSpec.Spec.Containers[0].Resources.Requests["memory"] + if memReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest { + return fmt.Errorf("Memory request doesn't match, got %s, expected %s", + memReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest) + } + + cpuLim := podSpec.Spec.Containers[0].Resources.Limits["cpu"] + if cpuLim.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit { + return fmt.Errorf("CPU limit doesn't match, got %s, expected %s", + cpuLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit) + } + + memLim := podSpec.Spec.Containers[0].Resources.Limits["memory"] + if memLim.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit { + return fmt.Errorf("Memory limit doesn't match, got %s, expected %s", + memLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit) + } + + return nil +} 
+ +func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + poolLabels := podSpec.ObjectMeta.Labels["connection-pool"] + + if poolLabels != cluster.connPoolLabelsSelector().MatchLabels["connection-pool"] { + return fmt.Errorf("Pod labels do not match, got %+v, expected %+v", + podSpec.ObjectMeta.Labels, cluster.connPoolLabelsSelector().MatchLabels) + } + + return nil +} + +func testEnvs(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + required := map[string]bool{ + "PGHOST": false, + "PGPORT": false, + "PGUSER": false, + "PGSCHEMA": false, + "PGPASSWORD": false, + "CONNECTION_POOL_MODE": false, + "CONNECTION_POOL_PORT": false, + } + + envs := podSpec.Spec.Containers[0].Env + for _, env := range envs { + required[env.Name] = true + } + + for env, value := range required { + if !value { + return fmt.Errorf("Environment variable %s is not present", env) + } + } + + return nil +} + +func testCustomPodTemplate(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + if podSpec.ObjectMeta.Name != "test-pod-template" { + return fmt.Errorf("Custom pod template is not used, current spec %+v", + podSpec) + } + + return nil +} + +func TestConnPoolPodSpec(t *testing.T) { testName := "Test connection pool pod template generation" var cluster = New( Config{ @@ -484,19 +559,23 @@ func TestConnPoolPodTemplate(t *testing.T) { }, }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil } + tests := []struct { subTest string spec *acidv1.PostgresSpec expected error cluster *Cluster + check func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error }{ { - subTest: "empty pod template", + subTest: "default configuration", spec: &acidv1.PostgresSpec{ ConnectionPool: &acidv1.ConnectionPool{}, }, expected: nil, cluster: cluster, + check: noCheck, }, { subTest: "no default resources", @@ -505,14 +584,256 @@ func TestConnPoolPodTemplate(t *testing.T) { }, expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`), cluster: clusterNoDefaultRes, + check: noCheck, + }, + { + subTest: "default resources are set", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testResources, + }, + { + subTest: "labels for service", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testLabels, + }, + { + subTest: "required envs", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testEnvs, + }, + { + subTest: "custom pod template", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{ + PodTemplate: &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-template", + }, + }, + }, + }, + expected: nil, + cluster: cluster, + check: testCustomPodTemplate, }, } for _, tt := range tests { - _, err := tt.cluster.generateConnPoolPodTemplate(tt.spec) + podSpec, err := tt.cluster.generateConnPoolPodTemplate(tt.spec) if err != tt.expected && err.Error() != tt.expected.Error() { t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", testName, tt.subTest, err, tt.expected) } + + err = tt.check(cluster, podSpec) + if err != nil { + t.Errorf("%s [%s]: Pod spec is incorrect, %+v", + testName, 
tt.subTest, err) + } + } +} + +func testDeploymentOwnwerReference(cluster *Cluster, deployment *appsv1.Deployment) error { + owner := deployment.ObjectMeta.OwnerReferences[0] + + if owner.Name != cluster.Statefulset.ObjectMeta.Name { + return fmt.Errorf("Ownere reference is incorrect, got %s, expected %s", + owner.Name, cluster.Statefulset.ObjectMeta.Name) + } + + return nil +} + +func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error { + labels := deployment.Spec.Selector.MatchLabels + expected := cluster.connPoolLabelsSelector().MatchLabels + + if labels["connection-pool"] != expected["connection-pool"] { + return fmt.Errorf("Labels are incorrect, got %+v, expected %+v", + labels, expected) + } + + return nil +} + +func TestConnPoolDeploymentSpec(t *testing.T) { + testName := "Test connection pool deployment spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100M", + ConnPoolDefaultMemoryLimit: "100M", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + noCheck := func(cluster *Cluster, deployment *appsv1.Deployment) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + check func(cluster *Cluster, deployment *appsv1.Deployment) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testDeploymentOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testSelector, + }, + } + for _, tt := range tests { + deployment, err := tt.cluster.generateConnPoolDeployment(tt.spec) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + + err = tt.check(cluster, deployment) + if err != nil { + t.Errorf("%s [%s]: Deployment spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + +func testServiceOwnwerReference(cluster *Cluster, service *v1.Service) error { + owner := service.ObjectMeta.OwnerReferences[0] + + if owner.Name != cluster.Statefulset.ObjectMeta.Name { + return fmt.Errorf("Ownere reference is incorrect, got %s, expected %s", + owner.Name, cluster.Statefulset.ObjectMeta.Name) + } + + return nil +} + +func testServiceSelector(cluster *Cluster, service *v1.Service) error { + selector := service.Spec.Selector + + if selector["connection-pool"] != cluster.connPoolName() { + return fmt.Errorf("Selector is incorrect, got %s, expected %s", + selector["connection-pool"], cluster.connPoolName()) + } + + return nil +} + +func TestConnPoolServiceSpec(t *testing.T) { + testName := "Test connection pool service spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + 
Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100M", + ConnPoolDefaultMemoryLimit: "100M", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + noCheck := func(cluster *Cluster, deployment *v1.Service) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + cluster *Cluster + check func(cluster *Cluster, deployment *v1.Service) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: testServiceOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: testServiceSelector, + }, + } + for _, tt := range tests { + service := tt.cluster.generateConnPoolService(tt.spec) + + if err := tt.check(cluster, service); err != nil { + t.Errorf("%s [%s]: Service spec is incorrect, %+v", + testName, tt.subTest, err) + } } } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index b4c7e578f..4f9d72e19 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -97,11 +97,11 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) { // // After that create all the objects for connection pool, namely a deployment // with a chosen pooler and a service to expose it. 
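+// Note: the lookup routine is injected as an InstallFunction argument so
+// that unit tests can pass a mock (mockInstallLookupFunction in
+// resources_test.go) instead of opening a real database connection.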
-func (c *Cluster) createConnectionPool() (*ConnectionPoolResources, error) { +func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolResources, error) { var msg string c.setProcessName("creating connection pool") - err := c.installLookupFunction( + err := lookup( c.OpConfig.ConnectionPool.Schema, c.OpConfig.ConnectionPool.User) diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go new file mode 100644 index 000000000..a3754c564 --- /dev/null +++ b/pkg/cluster/resources_test.go @@ -0,0 +1,65 @@ +package cluster + +import ( + "testing" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func mockInstallLookupFunction(schema string, user string) error { + return nil +} + +func TestConnPoolCreationAndDeletion(t *testing.T) { + testName := "Test connection pool creation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100M", + ConnPoolDefaultMemoryLimit: "100M", + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) + + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + } + poolResources, err := cluster.createConnectionPool(mockInstallLookupFunction) + + if err != nil { + t.Errorf("%s: Cannot create connection pool, %s, %+v", + testName, err, poolResources) + } + + if poolResources.Deployment == nil { + t.Errorf("%s: Connection pool deployment is empty", testName) + } + + if poolResources.Service == nil { + t.Errorf("%s: Connection pool service is empty", testName) + } + + err = cluster.deleteConnectionPool() + if err != nil { + t.Errorf("%s: Cannot delete connection pool, %s", testName, err) + } +} diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 286505621..04d00cb58 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -71,3 +71,5 @@ type ClusterStatus struct { } type TemplateParams map[string]interface{} + +type InstallFunction func(schema string, user string) error diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index be9e216dd..fed85d1d6 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -9,6 +9,7 @@ import ( batchv1beta1 "k8s.io/api/batch/v1beta1" clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1" + apiappsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" policybeta1 "k8s.io/api/policy/v1beta1" apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" @@ -56,6 +57,20 @@ type mockSecret struct { type MockSecretGetter struct { } +type mockDeployment struct { + appsv1.DeploymentInterface +} + +type MockDeploymentGetter struct { +} + +type mockService struct { + corev1.ServiceInterface +} + +type MockServiceGetter struct { +} + type mockConfigMap struct { corev1.ConfigMapInterface } @@ -232,19 +247,53 @@ func (c *mockConfigMap) Get(name string, options metav1.GetOptions) (*v1.ConfigM } // Secrets to be mocked -func (c *MockSecretGetter) Secrets(namespace string) 
corev1.SecretInterface { +func (mock *MockSecretGetter) Secrets(namespace string) corev1.SecretInterface { return &mockSecret{} } // ConfigMaps to be mocked -func (c *MockConfigMapsGetter) ConfigMaps(namespace string) corev1.ConfigMapInterface { +func (mock *MockConfigMapsGetter) ConfigMaps(namespace string) corev1.ConfigMapInterface { return &mockConfigMap{} } +func (mock *MockDeploymentGetter) Deployments(namespace string) appsv1.DeploymentInterface { + return &mockDeployment{} +} + +func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + }, nil +} + +func (mock *mockDeployment) Delete(name string, opts *metav1.DeleteOptions) error { + return nil +} + +func (mock *MockServiceGetter) Services(namespace string) corev1.ServiceInterface { + return &mockService{} +} + +func (mock *mockService) Create(*v1.Service) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockService) Delete(name string, opts *metav1.DeleteOptions) error { + return nil +} + // NewMockKubernetesClient for other tests func NewMockKubernetesClient() KubernetesClient { return KubernetesClient{ - SecretsGetter: &MockSecretGetter{}, - ConfigMapsGetter: &MockConfigMapsGetter{}, + SecretsGetter: &MockSecretGetter{}, + ConfigMapsGetter: &MockConfigMapsGetter{}, + DeploymentsGetter: &MockDeploymentGetter{}, + ServicesGetter: &MockServiceGetter{}, } } From 3ff1147bcefafc89dca084e17116f638a9923508 Mon Sep 17 00:00:00 2001 From: Dmitrii Dolgov <9erthalion6@gmail.com> Date: Wed, 12 Feb 2020 17:28:48 +0100 Subject: [PATCH 5/9] Various improvements Add synchronization logic. For now get rid of podTemplate, type fields. Add crd validation & configuration part, put retry on top of lookup function installation. 
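Before the diff itself, it may help to see the comparison primitive this synchronization logic is built on: r3labs/diff computes a structural changelog between the old and new `connectionPool` sections, and each entry doubles as a human-readable reason for the sync. A self-contained sketch of that pattern, with an invented struct and values (the operator's real check is `needSyncConnPoolDeployments` introduced below):

```go
package main

import (
	"fmt"

	"github.com/r3labs/diff"
)

// ConnectionPool mirrors just the fields needed for the example.
type ConnectionPool struct {
	NumberOfInstances *int32
	Mode              string
}

func int32ToPointer(value int32) *int32 { return &value }

func main() {
	oldSpec := &ConnectionPool{NumberOfInstances: int32ToPointer(1), Mode: "transaction"}
	newSpec := &ConnectionPool{NumberOfInstances: int32ToPointer(2), Mode: "transaction"}

	// diff.Diff returns a changelog; a non-empty changelog means a sync
	// is needed, and each entry doubles as a log-friendly reason.
	changelog, err := diff.Diff(oldSpec, newSpec)
	if err != nil {
		fmt.Printf("cannot diff specs: %v\n", err)
		return
	}

	for _, change := range changelog {
		fmt.Printf("%s %v from %v to %v\n",
			change.Type, change.Path, change.From, change.To)
	}
}
```

Compared to a bare `reflect.DeepEqual`, this costs a dependency but produces the log-friendly reasons for free.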
--- .../templates/clusterrole.yaml | 1 + manifests/operatorconfiguration.crd.yaml | 37 ++++ pkg/apis/acid.zalan.do/v1/postgresql_type.go | 18 +- pkg/cluster/cluster.go | 44 ++++- pkg/cluster/database.go | 22 ++- pkg/cluster/k8sres.go | 175 +++++++++--------- pkg/cluster/k8sres_test.go | 15 -- pkg/cluster/resources.go | 75 +++++++- pkg/cluster/sync.go | 111 +++++++++++ pkg/cluster/sync_test.go | 125 +++++++++++++ pkg/cluster/util.go | 2 +- pkg/controller/operator_config.go | 57 +++++- pkg/util/constants/pooler.go | 13 ++ pkg/util/k8sutil/k8sutil.go | 93 ++++++++++ 14 files changed, 653 insertions(+), 135 deletions(-) create mode 100644 pkg/cluster/sync_test.go create mode 100644 pkg/util/constants/pooler.go diff --git a/charts/postgres-operator/templates/clusterrole.yaml b/charts/postgres-operator/templates/clusterrole.yaml index f8550a539..316f7de15 100644 --- a/charts/postgres-operator/templates/clusterrole.yaml +++ b/charts/postgres-operator/templates/clusterrole.yaml @@ -106,6 +106,7 @@ rules: - apps resources: - statefulsets + - deployments verbs: - create - delete diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml index 7bd5c529c..c44955771 100644 --- a/manifests/operatorconfiguration.crd.yaml +++ b/manifests/operatorconfiguration.crd.yaml @@ -294,6 +294,43 @@ spec: pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' scalyr_server_url: type: string + connection_pool: + type: object + properties: + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" + connection_pool_instances_number: + type: integer + #default: 1 + connection_pool_image: + type: string + #default: "pierone.stups.zalan.do/acid/pgbouncer:0.0.1" + connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" + connection_pool_default_cpu_limit: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_cpu_request: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_memory_limit: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100m" + connection_pool_default_memory_request: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100m" status: type: object additionalProperties: diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index e4d56c6e8..c56d70626 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -27,7 +27,8 @@ type PostgresSpec struct { Patroni `json:"patroni,omitempty"` Resources `json:"resources,omitempty"` - ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"` + EnableConnectionPool bool `json:"enable_connection_pool,omitempty"` + ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"` TeamID string `json:"teamId"` DockerImage string `json:"dockerImage,omitempty"` @@ -159,12 +160,15 @@ type PostgresStatus struct { // Options for connection pooler type ConnectionPool struct { - NumberOfInstances *int32 `json:"instancesNumber,omitempty"` - Schema *string `json:"schema,omitempty"` - User *string `json:"user,omitempty"` - Type *string `json:"type,omitempty"` - Mode *string `json:"mode,omitempty"` - PodTemplate *v1.PodTemplateSpec `json:"podTemplate,omitempty"` + NumberOfInstances *int32 `json:"instancesNumber,omitempty"` + Schema string `json:"schema,omitempty"` + User string 
`json:"user,omitempty"`
+	Mode              string `json:"mode,omitempty"`
+	DockerImage       string `json:"dockerImage,omitempty"`
+	// TODO: prepared snippets of configuration, one can choose via type, e.g.
+	// pgbouncer-large (with higher resources) or odyssey-small (with smaller
+	// resources)
+	// Type              string `json:"type,omitempty"`

 	Resources `json:"resources,omitempty"`
 }
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index afcb0df82..8e65b12ea 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -11,6 +11,7 @@ import (
 	"sync"
 	"time"

+	"github.com/r3labs/diff"
 	"github.com/sirupsen/logrus"
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
@@ -723,6 +724,17 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 		}
 	}

+	// connection pool
+	if !reflect.DeepEqual(oldSpec.Spec.ConnectionPool,
+		newSpec.Spec.ConnectionPool) {
+		c.logger.Debug("syncing connection pool")
+
+		if err := c.syncConnectionPool(oldSpec, newSpec); err != nil {
+			c.logger.Errorf("could not sync connection pool: %v", err)
+			updateFailed = true
+		}
+	}
+
 	return nil
 }

@@ -852,13 +864,13 @@ func (c *Cluster) initSystemUsers() {
 	if c.needConnectionPool() {
 		username := c.Spec.ConnectionPool.User
-		if username == nil {
-			username = &c.OpConfig.ConnectionPool.User
+		if username == "" {
+			username = c.OpConfig.ConnectionPool.User
 		}

 		c.systemUsers[constants.ConnectionPoolUserKeyName] = spec.PgUser{
 			Origin:   spec.RoleConnectionPool,
-			Name:     *username,
+			Name:     username,
 			Password: util.RandomPassword(constants.PasswordLength),
 		}
 	}
@@ -1188,3 +1200,29 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error {

 	return c.deleteClusterObject(get, deleteConfigMapFn, "configmap")
 }
+
+// Test if two connection pool configurations need to be synced. For simplicity,
+// compare not the actual K8S objects but the configuration itself, and request
+// a sync if there is any difference.
+func (c *Cluster) needSyncConnPoolDeployments(oldSpec, newSpec *acidv1.ConnectionPool) (sync bool, reasons []string) {
+	reasons = []string{}
+	sync = false
+
+	changelog, err := diff.Diff(oldSpec, newSpec)
+	if err != nil {
+		c.logger.Infof("Cannot get diff, skipping connection pool sync: %+v", err)
+		return false, reasons
+	} else {
+		if len(changelog) > 0 {
+			sync = true
+		}
+
+		for _, change := range changelog {
+			msg := fmt.Sprintf("%s %+v from %s to %s",
+				change.Type, change.Path, change.From, change.To)
+			reasons = append(reasons, msg)
+		}
+	}
+
+	return sync, reasons
+}
diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go
index 1b74bd6b6..0c1e07a11 100644
--- a/pkg/cluster/database.go
+++ b/pkg/cluster/database.go
@@ -300,8 +300,26 @@ func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
 			params, err)
 	}

-	if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil {
-		return fmt.Errorf("could not execute sql statement %s: %v",
+	// database/sql will retry a couple of times on its own if the pq driver
+	// reports connection issues (driver.ErrBadConn), but since our query is
+	// idempotent, we can also retry on other errors (e.g. when a database is
+	// temporarily read-only after a failover) to make sure it was applied.
+	execErr := retryutil.Retry(
+		constants.PostgresConnectTimeout,
+		constants.PostgresConnectRetryTimeout,
+		func() (bool, error) {
+			if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil {
+				msg := fmt.Errorf("could not execute sql statement %s: %v",
+					stmtBytes.String(), err)
+				return false, msg
+			}
+
+			return true, nil
+		})
+
+	if execErr != nil {
+		err = execErr
+		return fmt.Errorf("could not execute after retries %s: %v",
 			stmtBytes.String(), err)
 	}

diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go
index 8b7860886..3121691d9 100644
--- a/pkg/cluster/k8sres.go
+++ b/pkg/cluster/k8sres.go
@@ -1734,100 +1734,99 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) {
 func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) (
 	*v1.PodTemplateSpec, error) {

-	podTemplate := spec.ConnectionPool.PodTemplate
+	gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
+	resources, err := generateResourceRequirements(
+		spec.ConnectionPool.Resources,
+		c.makeDefaultConnPoolResources())

-	if podTemplate == nil {
-		gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
-		resources, err := generateResourceRequirements(
-			spec.ConnectionPool.Resources,
-			c.makeDefaultConnPoolResources())
+	effectiveMode := util.Coalesce(
+		spec.ConnectionPool.Mode,
+		c.OpConfig.ConnectionPool.Mode)

-		effectiveMode := spec.ConnectionPool.Mode
-		if effectiveMode == nil {
-			effectiveMode = &c.OpConfig.ConnectionPool.Mode
+	effectiveDockerImage := util.Coalesce(
+		spec.ConnectionPool.DockerImage,
+		c.OpConfig.ConnectionPool.Image)
+
+	if err != nil {
+		return nil, fmt.Errorf("could not generate resource requirements: %v", err)
+	}
+
+	secretSelector := func(key string) *v1.SecretKeySelector {
+		return &v1.SecretKeySelector{
+			LocalObjectReference: v1.LocalObjectReference{
+				Name: c.credentialSecretName(c.OpConfig.SuperUsername),
+			},
+			Key: key,
 		}
+	}

-		if err != nil {
-			return nil, fmt.Errorf("could not generate resource requirements: %v", err)
-		}
+	envVars := []v1.EnvVar{
+		{
+			Name:  "PGHOST",
+			Value: c.serviceAddress(Master),
+		},
+		{
+			Name:  "PGPORT",
+			Value: c.servicePort(Master),
+		},
+		{
+			Name: "PGUSER",
+			ValueFrom: &v1.EnvVarSource{
+				SecretKeyRef: secretSelector("username"),
+			},
+		},
+		// the convention is to use the same schema name as
+		// connection pool username
+		{
+			Name: "PGSCHEMA",
+			ValueFrom: &v1.EnvVarSource{
+				SecretKeyRef: secretSelector("username"),
+			},
+		},
+		{
+			Name: "PGPASSWORD",
+			ValueFrom: &v1.EnvVarSource{
+				SecretKeyRef: secretSelector("password"),
+			},
+		},
+		{
+			Name:  "CONNECTION_POOL_MODE",
+			Value: effectiveMode,
+		},
+		{
+			Name:  "CONNECTION_POOL_PORT",
+			Value: fmt.Sprint(pgPort),
+		},
+	}

-		secretSelector := func(key string) *v1.SecretKeySelector {
-			return &v1.SecretKeySelector{
-				LocalObjectReference: v1.LocalObjectReference{
-					Name: c.credentialSecretName(c.OpConfig.SuperUsername),
-				},
-				Key: key,
-			}
-		}
+	poolerContainer := v1.Container{
+		Name:            connectionPoolContainer,
+		Image:           effectiveDockerImage,
+		ImagePullPolicy: v1.PullIfNotPresent,
+		Resources:       *resources,
+		Ports: []v1.ContainerPort{
+			{
+				ContainerPort: pgPort,
+				Protocol:      v1.ProtocolTCP,
+			},
+		},
+		Env: envVars,
+	}

-		envVars := []v1.EnvVar{
-			{
-				Name:  "PGHOST",
-				Value: c.serviceAddress(Master),
-			},
-			{
-				Name:  "PGPORT",
-				Value: c.servicePort(Master),
-			},
-			{
-				Name: "PGUSER",
-				ValueFrom: &v1.EnvVarSource{
-					SecretKeyRef: secretSelector("username"),
-				},
-			},
-			// the convention is to use the same schema name as
-			// connection pool username
-			{
-				Name: "PGSCHEMA",
-				ValueFrom: &v1.EnvVarSource{
-					SecretKeyRef: secretSelector("username"),
-				},
-			},
-			{
-				Name: "PGPASSWORD",
-				ValueFrom: &v1.EnvVarSource{
-					SecretKeyRef: secretSelector("password"),
-				},
-			},
-			{
-				Name:  "CONNECTION_POOL_MODE",
-				Value: *effectiveMode,
-			},
-			{
-				Name:  "CONNECTION_POOL_PORT",
-				Value: fmt.Sprint(pgPort),
-			},
-		}
-
-		poolerContainer := v1.Container{
-			Name:            connectionPoolContainer,
-			Image:           c.OpConfig.ConnectionPool.Image,
-			ImagePullPolicy: v1.PullIfNotPresent,
-			Resources:       *resources,
-			Ports: []v1.ContainerPort{
-				{
-					ContainerPort: pgPort,
-					Protocol:      v1.ProtocolTCP,
-				},
-			},
-			Env: envVars,
-		}
-
-		podTemplate = &v1.PodTemplateSpec{
-			ObjectMeta: metav1.ObjectMeta{
-				Labels:      c.connPoolLabelsSelector().MatchLabels,
-				Namespace:   c.Namespace,
-				Annotations: c.generatePodAnnotations(spec),
-			},
-			Spec: v1.PodSpec{
-				ServiceAccountName:            c.OpConfig.PodServiceAccountName,
-				TerminationGracePeriodSeconds: &gracePeriod,
-				Containers:                    []v1.Container{poolerContainer},
-				// TODO: add tolerations to scheduler pooler on the same node
-				// as database
-				//Tolerations: *tolerationsSpec,
-			},
-		}
+	podTemplate := &v1.PodTemplateSpec{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels:      c.connPoolLabelsSelector().MatchLabels,
+			Namespace:   c.Namespace,
+			Annotations: c.generatePodAnnotations(spec),
+		},
+		Spec: v1.PodSpec{
+			ServiceAccountName:            c.OpConfig.PodServiceAccountName,
+			TerminationGracePeriodSeconds: &gracePeriod,
+			Containers:                    []v1.Container{poolerContainer},
+			// TODO: add tolerations to schedule the pooler on the same node
+			// as the database
+			//Tolerations: *tolerationsSpec,
+		},
 	}

 	return podTemplate, nil
diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go
index 012df4072..c26e04b96 100644
--- a/pkg/cluster/k8sres_test.go
+++ b/pkg/cluster/k8sres_test.go
@@ -613,21 +613,6 @@ func TestConnPoolPodSpec(t *testing.T) {
 			cluster: cluster,
 			check:   testEnvs,
 		},
-		{
-			subTest: "custom pod template",
-			spec: &acidv1.PostgresSpec{
-				ConnectionPool: &acidv1.ConnectionPool{
-					PodTemplate: &v1.PodTemplateSpec{
-						ObjectMeta: metav1.ObjectMeta{
-							Name: "test-pod-template",
-						},
-					},
-				},
-			},
-			expected: nil,
-			cluster:  cluster,
-			check:    testCustomPodTemplate,
-		},
 	}
 	for _, tt := range tests {
 		podSpec, err := tt.cluster.generateConnPoolPodTemplate(tt.spec)
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index 4f9d72e19..e44d50800 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -101,9 +101,17 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolR
 	var msg string
 	c.setProcessName("creating connection pool")

-	err := lookup(
-		c.OpConfig.ConnectionPool.Schema,
-		c.OpConfig.ConnectionPool.User)
+	schema := c.Spec.ConnectionPool.Schema
+	if schema == "" {
+		schema = c.OpConfig.ConnectionPool.Schema
+	}
+
+	user := c.Spec.ConnectionPool.User
+	if user == "" {
+		user = c.OpConfig.ConnectionPool.User
+	}
+
+	err := lookup(schema, user)

 	if err != nil {
 		msg = "could not prepare database for connection pool: %v"
@@ -116,6 +124,9 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolR
 		return nil, fmt.Errorf(msg, err)
 	}

+	// client-go retries 10 times (with NoBackoff by default) when the API
+	// server believes a request can be retried and returns a Retry-After
+	// header. This should be good enough to not think about it here.
 	deployment, err := c.KubeClient.
 		Deployments(deploymentSpec.Namespace).
 		Create(deploymentSpec)

@@ -154,14 +165,22 @@ func (c *Cluster) deleteConnectionPool() (err error) {
 		return nil
 	}

+	// Clean up the deployment object. If the deployment resource we have
+	// remembered is somehow empty, fall back to the name we would generate
+	deploymentName := c.connPoolName()
+	deployment := c.ConnectionPool.Deployment
+
+	if deployment != nil {
+		deploymentName = deployment.Name
+	}
+
 	// set delete propagation policy to foreground, so that replica set will be
 	// also deleted.
 	policy := metav1.DeletePropagationForeground
 	options := metav1.DeleteOptions{PropagationPolicy: &policy}
-	deployment := c.ConnectionPool.Deployment
 	err = c.KubeClient.
-		Deployments(deployment.Namespace).
-		Delete(deployment.Name, &options)
+		Deployments(c.Namespace).
+		Delete(deploymentName, &options)

 	if !k8sutil.ResourceNotFound(err) {
 		c.logger.Debugf("Connection pool deployment was already deleted")
@@ -172,12 +191,19 @@ func (c *Cluster) deleteConnectionPool() (err error) {
 	c.logger.Infof("Connection pool deployment %q has been deleted",
 		util.NameFromMeta(deployment.ObjectMeta))

+	// Repeat the same for the service object
+	service := c.ConnectionPool.Service
+	serviceName := c.connPoolName()
+
+	if service != nil {
+		serviceName = service.Name
+	}
+
 	// set delete propagation policy to foreground, so that all the dependant
 	// will be deleted.
-	service := c.ConnectionPool.Service
 	err = c.KubeClient.
-		Services(service.Namespace).
-		Delete(service.Name, &options)
+		Services(c.Namespace).
+		Delete(serviceName, &options)

 	if !k8sutil.ResourceNotFound(err) {
 		c.logger.Debugf("Connection pool service was already deleted")
@@ -823,3 +849,34 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet {
 func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget {
 	return c.PodDisruptionBudget
 }
+
+// Perform actual patching of a connection pool deployment, assuming that all
+// the checks were already done before.
+func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
+	c.setProcessName("updating connection pool")
+	if c.ConnectionPool == nil || c.ConnectionPool.Deployment == nil {
+		return nil, fmt.Errorf("there is no connection pool in the cluster")
+	}
+
+	patchData, err := specPatch(newDeployment.Spec)
+	if err != nil {
+		return nil, fmt.Errorf("could not form patch for the deployment: %v", err)
+	}
+
+	// An update probably requires RetryOnConflict, but since only one operator
+	// worker at a time will try to update it, chances of conflicts are
+	// minimal.
+	deployment, err := c.KubeClient.
+		Deployments(c.ConnectionPool.Deployment.Namespace).
+		Patch(
+			c.ConnectionPool.Deployment.Name,
+			types.MergePatchType,
+			patchData, "")
+	if err != nil {
+		return nil, fmt.Errorf("could not patch deployment: %v", err)
+	}
+
+	c.ConnectionPool.Deployment = deployment
+
+	return deployment, nil
+}
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 5ee827d4b..b59bf5533 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -2,6 +2,7 @@ package cluster

 import (
 	"fmt"
+	"reflect"

 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	v1 "k8s.io/api/core/v1"
@@ -23,6 +24,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()

+	oldSpec := c.Postgresql
 	c.setSpec(newSpec)

 	defer func() {
@@ -108,6 +110,20 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 		}
 	}

+	// connection pool
+	oldPool := oldSpec.Spec.ConnectionPool
+	newPool := newSpec.Spec.ConnectionPool
+	if c.needConnectionPool() &&
+		(c.ConnectionPool == nil || !reflect.DeepEqual(oldPool, newPool)) {
+
+		c.logger.Debug("syncing connection pool")
+
+		if err := c.syncConnectionPool(&oldSpec, newSpec); err != nil {
+			c.logger.Errorf("could not sync connection pool: %v", err)
+			return err
+		}
+	}
+
 	return err
 }

@@ -594,3 +610,98 @@ func (c *Cluster) syncLogicalBackupJob() error {

 	return nil
 }
+
+// Synchronize connection pool resources. Effectively we are interested only in
+// synchronizing the corresponding deployment, but in case the deployment or
+// service is missing, create it. After checking, also remember the objects for
+// future reference.
+func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql) error {
+	if c.ConnectionPool == nil {
+		c.logger.Warning("Connection pool resources are empty")
+		c.ConnectionPool = &ConnectionPoolResources{}
+	}
+
+	deployment, err := c.KubeClient.
+		Deployments(c.Namespace).
+		Get(c.connPoolName(), metav1.GetOptions{})
+
+	if err != nil && k8sutil.ResourceNotFound(err) {
+		msg := "Deployment %s for connection pool synchronization is not found, creating it"
+		c.logger.Warningf(msg, c.connPoolName())
+
+		deploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec)
+		if err != nil {
+			msg = "could not generate deployment for connection pool: %v"
+			return fmt.Errorf(msg, err)
+		}
+
+		deployment, err := c.KubeClient.
+			Deployments(deploymentSpec.Namespace).
+			Create(deploymentSpec)
+
+		if err != nil {
+			return err
+		}
+
+		c.ConnectionPool.Deployment = deployment
+	} else if err != nil {
+		return fmt.Errorf("could not get connection pool deployment to sync: %v", err)
+	} else {
+		c.ConnectionPool.Deployment = deployment
+
+		// actual synchronization
+		oldConnPool := oldSpec.Spec.ConnectionPool
+		newConnPool := newSpec.Spec.ConnectionPool
+		sync, reason := c.needSyncConnPoolDeployments(oldConnPool, newConnPool)
+		if sync {
+			c.logger.Infof("Update connection pool deployment %s, reason: %s",
+				c.connPoolName(), reason)

+			newDeploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec)
+			if err != nil {
+				msg := "could not generate deployment for connection pool: %v"
+				return fmt.Errorf(msg, err)
+			}
+
+			oldDeploymentSpec := c.ConnectionPool.Deployment
+
+			deployment, err := c.updateConnPoolDeployment(
+				oldDeploymentSpec,
+				newDeploymentSpec)
+
+			if err != nil {
+				return err
+			}
+
+			c.ConnectionPool.Deployment = deployment
+			return nil
+		}
+	}
+
+	service, err := c.KubeClient.
+		Services(c.Namespace).
+		Get(c.connPoolName(), metav1.GetOptions{})
+
+	if err != nil && k8sutil.ResourceNotFound(err) {
+		msg := "Service %s for connection pool synchronization is not found, creating it"
+		c.logger.Warningf(msg, c.connPoolName())
+
+		serviceSpec := c.generateConnPoolService(&newSpec.Spec)
+		service, err := c.KubeClient.
+			Services(serviceSpec.Namespace).
+			Create(serviceSpec)
+
+		if err != nil {
+			return err
+		}
+
+		c.ConnectionPool.Service = service
+	} else if err != nil {
+		return fmt.Errorf("could not get connection pool service to sync: %v", err)
+	} else {
+		// Service updates are not supported and probably not that useful anyway
+		c.ConnectionPool.Service = service
+	}
+
+	return nil
+}
diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go
new file mode 100644
index 000000000..c6928a64e
--- /dev/null
+++ b/pkg/cluster/sync_test.go
@@ -0,0 +1,125 @@
+package cluster
+
+import (
+	"fmt"
+	"testing"
+
+	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
+	"github.com/zalando/postgres-operator/pkg/util/config"
+	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
+
+	appsv1 "k8s.io/api/apps/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func int32ToPointer(value int32) *int32 {
+	return &value
+}
+
+func deploymentUpdated(cluster *Cluster, err error) error {
+	if cluster.ConnectionPool.Deployment.Spec.Replicas == nil ||
+		*cluster.ConnectionPool.Deployment.Spec.Replicas != 2 {
+		return fmt.Errorf("Wrong number of instances")
+	}
+
+	return nil
+}
+
+func objectsAreSaved(cluster *Cluster, err error) error {
+	if cluster.ConnectionPool == nil {
+		return fmt.Errorf("Connection pool resources are empty")
+	}
+
+	if cluster.ConnectionPool.Deployment == nil {
+		return fmt.Errorf("Deployment was not saved")
+	}
+
+	if cluster.ConnectionPool.Service == nil {
+		return fmt.Errorf("Service was not saved")
+	}
+
+	return nil
+}
+
+func TestConnPoolSynchronization(t *testing.T) {
+	testName := "Test connection pool synchronization"
+	var cluster = New(
+		Config{
+			OpConfig: config.Config{
+				ProtectedRoles: []string{"admin"},
+				Auth: config.Auth{
+					SuperUsername:       superUserName,
+					ReplicationUsername: replicationUserName,
+				},
+				ConnectionPool: config.ConnectionPool{
+					ConnPoolDefaultCPURequest:    "100m",
+					ConnPoolDefaultCPULimit:      "100m",
+					ConnPoolDefaultMemoryRequest: "100M",
+					ConnPoolDefaultMemoryLimit:   "100M",
+				},
+			},
+		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+
+	cluster.Statefulset = &appsv1.StatefulSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "test-sts",
+		},
+	}
+
+	clusterMissingObjects := *cluster
+	clusterMissingObjects.KubeClient = k8sutil.ClientMissingObjects()
+
+	clusterMock := *cluster
+	clusterMock.KubeClient = k8sutil.NewMockKubernetesClient()
+
+	tests := []struct {
+		subTest string
+		oldSpec *acidv1.Postgresql
+		newSpec *acidv1.Postgresql
+		cluster *Cluster
+		check   func(cluster *Cluster, err error) error
+	}{
+		{
+			subTest: "create if doesn't exist",
+			oldSpec: &acidv1.Postgresql{
+				Spec: acidv1.PostgresSpec{
+					ConnectionPool: &acidv1.ConnectionPool{},
+				},
+			},
+			newSpec: &acidv1.Postgresql{
+				Spec: acidv1.PostgresSpec{
+					ConnectionPool: &acidv1.ConnectionPool{},
+				},
+			},
+			cluster: &clusterMissingObjects,
+			check:   objectsAreSaved,
+		},
+		{
+			subTest: "update deployment",
+			oldSpec: &acidv1.Postgresql{
+				Spec: acidv1.PostgresSpec{
+					ConnectionPool: &acidv1.ConnectionPool{
+						NumberOfInstances: int32ToPointer(1),
+					},
+				},
+			},
+			newSpec: &acidv1.Postgresql{
+				Spec: acidv1.PostgresSpec{
+					ConnectionPool:
&acidv1.ConnectionPool{ + NumberOfInstances: int32ToPointer(2), + }, + }, + }, + cluster: &clusterMock, + check: deploymentUpdated, + }, + } + for _, tt := range tests { + err := tt.cluster.syncConnectionPool(tt.oldSpec, tt.newSpec) + + if err := tt.check(tt.cluster, err); err != nil { + t.Errorf("%s [%s]: Could not synchronize, %+v", + testName, tt.subTest, err) + } + } +} diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index d5f9c744f..d2dd11586 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -497,5 +497,5 @@ func (c *Cluster) patroniUsesKubernetes() bool { } func (c *Cluster) needConnectionPool() bool { - return c.Spec.ConnectionPool != nil + return c.Spec.ConnectionPool != nil || c.Spec.EnableConnectionPool == true } diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index f5d280363..1748fbd1f 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -6,7 +6,9 @@ import ( "time" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -142,17 +144,52 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit - // connection pool + // Connection pool. Looks like we can't use defaulting in CRD before 1.17, + // so ensure default values here. result.ConnectionPool.NumberOfInstances = fromCRD.ConnectionPool.NumberOfInstances - result.ConnectionPool.Schema = fromCRD.ConnectionPool.Schema - result.ConnectionPool.User = fromCRD.ConnectionPool.User - result.ConnectionPool.Type = fromCRD.ConnectionPool.Type - result.ConnectionPool.Image = fromCRD.ConnectionPool.Image - result.ConnectionPool.Mode = fromCRD.ConnectionPool.Mode - result.ConnectionPool.ConnPoolDefaultCPURequest = fromCRD.ConnectionPool.DefaultCPURequest - result.ConnectionPool.ConnPoolDefaultMemoryRequest = fromCRD.ConnectionPool.DefaultMemoryRequest - result.ConnectionPool.ConnPoolDefaultCPULimit = fromCRD.ConnectionPool.DefaultCPULimit - result.ConnectionPool.ConnPoolDefaultMemoryLimit = fromCRD.ConnectionPool.DefaultMemoryLimit + if result.ConnectionPool.NumberOfInstances == nil || + *result.ConnectionPool.NumberOfInstances < 1 { + var value int32 + + value = 1 + result.ConnectionPool.NumberOfInstances = &value + } + + result.ConnectionPool.Schema = util.Coalesce( + fromCRD.ConnectionPool.Schema, + constants.ConnectionPoolSchemaName) + + result.ConnectionPool.User = util.Coalesce( + fromCRD.ConnectionPool.User, + constants.ConnectionPoolUserName) + + result.ConnectionPool.Type = util.Coalesce( + fromCRD.ConnectionPool.Type, + constants.ConnectionPoolDefaultType) + + result.ConnectionPool.Image = util.Coalesce( + fromCRD.ConnectionPool.Image, + "pgbouncer:0.0.1") + + result.ConnectionPool.Mode = util.Coalesce( + fromCRD.ConnectionPool.Mode, + constants.ConnectionPoolDefaultMode) + + result.ConnectionPool.ConnPoolDefaultCPURequest = util.Coalesce( + fromCRD.ConnectionPool.DefaultCPURequest, + constants.ConnectionPoolDefaultCpuRequest) + + result.ConnectionPool.ConnPoolDefaultMemoryRequest = util.Coalesce( + fromCRD.ConnectionPool.DefaultMemoryRequest, + constants.ConnectionPoolDefaultMemoryRequest) + + result.ConnectionPool.ConnPoolDefaultCPULimit = util.Coalesce( + 
fromCRD.ConnectionPool.DefaultCPULimit,
+		constants.ConnectionPoolDefaultCpuLimit)
+
+	result.ConnectionPool.ConnPoolDefaultMemoryLimit = util.Coalesce(
+		fromCRD.ConnectionPool.DefaultMemoryLimit,
+		constants.ConnectionPoolDefaultMemoryLimit)

 	return result
 }
diff --git a/pkg/util/constants/pooler.go b/pkg/util/constants/pooler.go
new file mode 100644
index 000000000..b25a12a6c
--- /dev/null
+++ b/pkg/util/constants/pooler.go
@@ -0,0 +1,13 @@
+package constants
+
+// Connection pool specific constants
+const (
+	ConnectionPoolUserName             = "pooler"
+	ConnectionPoolSchemaName           = "pooler"
+	ConnectionPoolDefaultType          = "pgbouncer"
+	ConnectionPoolDefaultMode          = "transaction"
+	ConnectionPoolDefaultCpuRequest    = "100m"
+	ConnectionPoolDefaultCpuLimit      = "100m"
+	ConnectionPoolDefaultMemoryRequest = "100M"
+	ConnectionPoolDefaultMemoryLimit   = "100M"
+)
diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go
index fed85d1d6..a58261167 100644
--- a/pkg/util/k8sutil/k8sutil.go
+++ b/pkg/util/k8sutil/k8sutil.go
@@ -15,6 +15,7 @@ import (
 	apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
 	appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -27,6 +28,10 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )

+func int32ToPointer(value int32) *int32 {
+	return &value
+}
+
 // KubernetesClient describes getters for Kubernetes objects
 type KubernetesClient struct {
 	corev1.SecretsGetter
@@ -61,16 +66,30 @@ type mockDeployment struct {
 	appsv1.DeploymentInterface
 }

+type mockDeploymentNotExist struct {
+	appsv1.DeploymentInterface
+}
+
 type MockDeploymentGetter struct {
 }

+type MockDeploymentNotExistGetter struct {
+}
+
 type mockService struct {
 	corev1.ServiceInterface
 }

+type mockServiceNotExist struct {
+	corev1.ServiceInterface
+}
+
 type MockServiceGetter struct {
 }

+type MockServiceNotExistGetter struct {
+}
+
 type mockConfigMap struct {
 	corev1.ConfigMapInterface
 }
@@ -260,6 +279,10 @@ func (mock *MockDeploymentGetter) Deployments(namespace string) appsv1.Deploymen
 	return &mockDeployment{}
 }

+func (mock *MockDeploymentNotExistGetter) Deployments(namespace string) appsv1.DeploymentInterface {
+	return &mockDeploymentNotExist{}
+}
+
 func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) {
 	return &apiappsv1.Deployment{
 		ObjectMeta: metav1.ObjectMeta{
@@ -272,10 +295,49 @@ func (mock *mockDeployment) Delete(name string, opts *metav1.DeleteOptions) erro
 	return nil
 }

+func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
+	return &apiappsv1.Deployment{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "test-deployment",
+		},
+	}, nil
+}
+
+func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) {
+	return &apiappsv1.Deployment{
+		Spec: apiappsv1.DeploymentSpec{
+			Replicas: int32ToPointer(2),
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "test-deployment",
+		},
+	}, nil
+}
+
+func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
+	return nil, &apierrors.StatusError{
+		ErrStatus: metav1.Status{
+			Reason: metav1.StatusReasonNotFound,
+		},
+	}
+}
+
+func (mock *mockDeploymentNotExist) Create(*apiappsv1.Deployment)
(*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + }, nil +} + func (mock *MockServiceGetter) Services(namespace string) corev1.ServiceInterface { return &mockService{} } +func (mock *MockServiceNotExistGetter) Services(namespace string) corev1.ServiceInterface { + return &mockServiceNotExist{} +} + func (mock *mockService) Create(*v1.Service) (*v1.Service, error) { return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -288,6 +350,30 @@ func (mock *mockService) Delete(name string, opts *metav1.DeleteOptions) error { return nil } +func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockServiceNotExist) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { + return nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Reason: metav1.StatusReasonNotFound, + }, + } +} + // NewMockKubernetesClient for other tests func NewMockKubernetesClient() KubernetesClient { return KubernetesClient{ @@ -297,3 +383,10 @@ func NewMockKubernetesClient() KubernetesClient { ServicesGetter: &MockServiceGetter{}, } } + +func ClientMissingObjects() KubernetesClient { + return KubernetesClient{ + DeploymentsGetter: &MockDeploymentNotExistGetter{}, + ServicesGetter: &MockServiceNotExistGetter{}, + } +} From 82e9d40587324b01ecd8ca7aca7b1b39d7e091eb Mon Sep 17 00:00:00 2001 From: Dmitrii Dolgov <9erthalion6@gmail.com> Date: Thu, 13 Feb 2020 11:04:11 +0100 Subject: [PATCH 6/9] Add test for both ways to enable connection pool --- pkg/cluster/resources_test.go | 38 +++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go index a3754c564..7c52addad 100644 --- a/pkg/cluster/resources_test.go +++ b/pkg/cluster/resources_test.go @@ -63,3 +63,41 @@ func TestConnPoolCreationAndDeletion(t *testing.T) { t.Errorf("%s: Cannot delete connection pool, %s", testName, err) } } + +func TestNeedConnPool(t *testing.T) { + testName := "Test how connection pool can be enabled" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100M", + ConnPoolDefaultMemoryLimit: "100M", + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + } + + if !cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is not enabled with full definition", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPool: true, + } + + if !cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is not enabled with flag", + testName) + } +} From 17d077e3709d8681ce5e0e4ba714ce4fa66a9254 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Thu, 13 Feb 2020 11:56:05 +0100 Subject: [PATCH 7/9] add validation for postgresql CRD --- .../crds/operatorconfigurations.yaml | 37 ++++++++++++++++ .../postgres-operator/crds/postgresqls.yaml | 44 +++++++++++++++++++ 
manifests/postgresql.crd.yaml | 44 +++++++++++++++++++ 3 files changed, 125 insertions(+) diff --git a/charts/postgres-operator/crds/operatorconfigurations.yaml b/charts/postgres-operator/crds/operatorconfigurations.yaml index 9725c2708..c99d4a811 100644 --- a/charts/postgres-operator/crds/operatorconfigurations.yaml +++ b/charts/postgres-operator/crds/operatorconfigurations.yaml @@ -318,6 +318,43 @@ spec: pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' scalyr_server_url: type: string + connection_pool: + type: object + properties: + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" + connection_pool_instances_number: + type: integer + #default: 1 + connection_pool_image: + type: string + #default: "pierone.stups.zalan.do/acid/pgbouncer:0.0.1" + connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" + connection_pool_default_cpu_limit: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_cpu_request: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_memory_limit: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100m" + connection_pool_default_memory_request: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100m" status: type: object additionalProperties: diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index b4b676236..8b7de363c 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -106,6 +106,50 @@ spec: uid: format: uuid type: string + connectionPool: + type: object + properties: + dockerImage: + type: string + mode: + type: string + numberOfInstances: + type: integer + minimum: 1 + resources: + type: object + required: + - requests + - limits + properties: + limits: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + requests: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + schema: + type: string + user: + type: string databases: type: object additionalProperties: diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 276bc94b8..7d4bb228b 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -70,6 +70,50 @@ spec: uid: format: uuid type: string + connectionPool: + type: object + properties: + dockerImage: + type: string + mode: + type: string + numberOfInstances: + type: integer + minimum: 1 + resources: + type: object + required: + - requests + - limits + properties: + limits: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + requests: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + schema: + type: string + user: + type: string databases: type: object additionalProperties: From f0ceafa81e32e5369d64bed59d2506840649ee76 Mon Sep 17 
00:00:00 2001 From: Felix Kunde Date: Thu, 13 Feb 2020 12:54:32 +0100 Subject: [PATCH 8/9] reflect connectionPool validation in Go code and publish in manifests and chart --- .../crds/operatorconfigurations.yaml | 38 +++---- .../templates/configmap.yaml | 1 + .../templates/operatorconfiguration.yaml | 2 + charts/postgres-operator/values-crd.yaml | 11 ++ charts/postgres-operator/values.yaml | 12 ++ manifests/configmap.yaml | 9 ++ manifests/operatorconfiguration.crd.yaml | 38 +++---- ...gresql-operator-default-configuration.yaml | 10 ++ pkg/apis/acid.zalan.do/v1/crds.go | 104 ++++++++++++++++++ 9 files changed, 187 insertions(+), 38 deletions(-) diff --git a/charts/postgres-operator/crds/operatorconfigurations.yaml b/charts/postgres-operator/crds/operatorconfigurations.yaml index c99d4a811..a790125d1 100644 --- a/charts/postgres-operator/crds/operatorconfigurations.yaml +++ b/charts/postgres-operator/crds/operatorconfigurations.yaml @@ -321,24 +321,6 @@ spec: connection_pool: type: object properties: - connection_pool_schema: - type: string - #default: "pooler" - connection_pool_user: - type: string - #default: "pooler" - connection_pool_instances_number: - type: integer - #default: 1 - connection_pool_image: - type: string - #default: "pierone.stups.zalan.do/acid/pgbouncer:0.0.1" - connection_pool_mode: - type: string - enum: - - "session" - - "transaction" - #default: "transaction" connection_pool_default_cpu_limit: type: string pattern: '^(\d+m|\d+(\.\d{1,3})?)$' @@ -354,7 +336,25 @@ spec: connection_pool_default_memory_request: type: string pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' - #default: "100m" + #default: "100Mi" + connection_pool_image: + type: string + #default: "pierone.stups.zalan.do/acid/pgbouncer:0.0.1" + connection_pool_instances_number: + type: integer + #default: 1 + connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" status: type: object additionalProperties: diff --git a/charts/postgres-operator/templates/configmap.yaml b/charts/postgres-operator/templates/configmap.yaml index 95eeb9546..634339795 100644 --- a/charts/postgres-operator/templates/configmap.yaml +++ b/charts/postgres-operator/templates/configmap.yaml @@ -20,4 +20,5 @@ data: {{ toYaml .Values.configDebug | indent 2 }} {{ toYaml .Values.configLoggingRestApi | indent 2 }} {{ toYaml .Values.configTeamsApi | indent 2 }} +{{ toYaml .Values.configConnectionPool | indent 2 }} {{- end }} diff --git a/charts/postgres-operator/templates/operatorconfiguration.yaml b/charts/postgres-operator/templates/operatorconfiguration.yaml index 6a301c1fb..55eb8fd4f 100644 --- a/charts/postgres-operator/templates/operatorconfiguration.yaml +++ b/charts/postgres-operator/templates/operatorconfiguration.yaml @@ -34,4 +34,6 @@ configuration: {{ toYaml .Values.configLoggingRestApi | indent 4 }} scalyr: {{ toYaml .Values.configScalyr | indent 4 }} + connection_pool: +{{ toYaml .Values.configConnectionPool | indent 4 }} {{- end }} diff --git a/charts/postgres-operator/values-crd.yaml b/charts/postgres-operator/values-crd.yaml index 1f9b5e495..17b62226a 100644 --- a/charts/postgres-operator/values-crd.yaml +++ b/charts/postgres-operator/values-crd.yaml @@ -261,6 +261,17 @@ configScalyr: # Memory request value for the Scalyr sidecar scalyr_memory_request: 50Mi +configConnectionPool: + connection_pool_default_cpu_limit: "1" + 
connection_pool_default_cpu_request: "1" + connection_pool_default_memory_limit: 100m + connection_pool_default_memory_request: "100Mi" + # connection_pool_image: "" + connection_pool_instances_number: 1 + connection_pool_mode: "transaction" + # connection_pool_schema: "pooler" + # connection_pool_user: "pooler" + rbac: # Specifies whether RBAC resources should be created create: true diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml index 1be5851d2..f11619c8a 100644 --- a/charts/postgres-operator/values.yaml +++ b/charts/postgres-operator/values.yaml @@ -237,6 +237,18 @@ configTeamsApi: # URL of the Teams API service # teams_api_url: http://fake-teams-api.default.svc.cluster.local +# configure connection pooler deployment created by the operator +configConnectionPool: + connection_pool_default_cpu_limit: "1" + connection_pool_default_cpu_request: "1" + connection_pool_default_memory_limit: 100m + connection_pool_default_memory_request: "100Mi" + # connection_pool_image: "" + connection_pool_instances_number: 1 + connection_pool_mode: "transaction" + # connection_pool_schema: "pooler" + # connection_pool_user: "pooler" + rbac: # Specifies whether RBAC resources should be created create: true diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index d26c83edf..05f22a388 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -11,6 +11,15 @@ data: cluster_history_entries: "1000" cluster_labels: application:spilo cluster_name_label: version + # connection_pool_default_cpu_limit: "1" + # connection_pool_default_cpu_request: "1" + # connection_pool_default_memory_limit: 100m + # connection_pool_default_memory_request: "100Mi" + # connection_pool_image: "" + # connection_pool_instances_number: 1 + # connection_pool_mode: "transaction" + # connection_pool_schema: "pooler" + # connection_pool_user: "pooler" # custom_service_annotations: "keyx:valuez,keya:valuea" # custom_pod_annotations: "keya:valuea,keyb:valueb" db_hosted_zone: db.example.com diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml index c44955771..f4224244d 100644 --- a/manifests/operatorconfiguration.crd.yaml +++ b/manifests/operatorconfiguration.crd.yaml @@ -297,24 +297,6 @@ spec: connection_pool: type: object properties: - connection_pool_schema: - type: string - #default: "pooler" - connection_pool_user: - type: string - #default: "pooler" - connection_pool_instances_number: - type: integer - #default: 1 - connection_pool_image: - type: string - #default: "pierone.stups.zalan.do/acid/pgbouncer:0.0.1" - connection_pool_mode: - type: string - enum: - - "session" - - "transaction" - #default: "transaction" connection_pool_default_cpu_limit: type: string pattern: '^(\d+m|\d+(\.\d{1,3})?)$' @@ -330,7 +312,25 @@ spec: connection_pool_default_memory_request: type: string pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' - #default: "100m" + #default: "100Mi" + connection_pool_image: + type: string + #default: "pierone.stups.zalan.do/acid/pgbouncer:0.0.1" + connection_pool_instances_number: + type: integer + #default: 1 + connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" status: type: object additionalProperties: diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml index 
efd1a5396..037ae5e35 100644 --- a/manifests/postgresql-operator-default-configuration.yaml +++ b/manifests/postgresql-operator-default-configuration.yaml @@ -121,3 +121,13 @@ configuration: scalyr_memory_limit: 500Mi scalyr_memory_request: 50Mi # scalyr_server_url: "" + connection_pool: + connection_pool_default_cpu_limit: "1" + connection_pool_default_cpu_request: "1" + connection_pool_default_memory_limit: 100m + connection_pool_default_memory_request: "100Mi" + # connection_pool_image: "" + connection_pool_instances_number: 1 + connection_pool_mode: "transaction" + # connection_pool_schema: "pooler" + # connection_pool_user: "pooler" diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index 4cfc9a9e6..f760d63e5 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -176,6 +176,65 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ }, }, }, + "connectionPool": { + Type: "object", + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "dockerImage": { + Type: "string", + }, + "mode": { + Type: "string", + }, + "numberOfInstances": { + Type: "integer", + Minimum: &min1, + }, + "resources": { + Type: "object", + Required: []string{"requests", "limits"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "limits": { + Type: "object", + Required: []string{"cpu", "memory"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "cpu": { + Type: "string", + Description: "Decimal natural followed by m, or decimal natural followed by dot followed by up to three decimal digits (precision used by Kubernetes). Must be greater than 0", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "memory": { + Type: "string", + Description: "Plain integer or fixed-point integer using one of these suffixes: E, P, T, G, M, k (with or without a tailing i). Must be greater than 0", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + }, + }, + "requests": { + Type: "object", + Required: []string{"cpu", "memory"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "cpu": { + Type: "string", + Description: "Decimal natural followed by m, or decimal natural followed by dot followed by up to three decimal digits (precision used by Kubernetes). Must be greater than 0", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "memory": { + Type: "string", + Description: "Plain integer or fixed-point integer using one of these suffixes: E, P, T, G, M, k (with or without a tailing i). 
Must be greater than 0", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + }, + }, + }, + }, + "schema": { + Type: "string", + }, + "user": { + Type: "string", + }, + }, + }, "databases": { Type: "object", AdditionalProperties: &apiextv1beta1.JSONSchemaPropsOrBool{ @@ -1037,6 +1096,51 @@ var OperatorConfigCRDResourceValidation = apiextv1beta1.CustomResourceValidation }, }, }, + "connection_pool": { + Type: "object", + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "connection_pool_default_cpu_limit": { + Type: "string", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "connection_pool_default_cpu_request": { + Type: "string", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "connection_pool_default_memory_limit": { + Type: "string", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + "connection_pool_default_memory_request": { + Type: "string", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + "connection_pool_image": { + Type: "string", + }, + "connection_pool_instances_number": { + Type: "integer", + Minimum: &min1, + }, + "connection_pool_mode": { + Type: "string", + Enum: []apiextv1beta1.JSON{ + { + Raw: []byte(`"session"`), + }, + { + Raw: []byte(`"transaction"`), + }, + }, + }, + "connection_pool_schema": { + Type: "string", + }, + "connection_pool_user": { + Type: "string", + }, + }, + }, }, }, "status": { From 2384e1ec10638e1989c0295ffb7fbe96af2e2f1a Mon Sep 17 00:00:00 2001 From: Dmitrii Dolgov <9erthalion6@gmail.com> Date: Thu, 13 Feb 2020 13:21:20 +0100 Subject: [PATCH 9/9] Cleanup configuration Add pool configuration into CRD & charts. Add preliminary documentation. Rename NumberOfInstances to Replicas like in Deployment. Mention couple of potential improvement points for connection pool specification. 
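The configuration cleanup below leans on one defaulting pattern throughout: a value from the cluster manifest wins over the operator configuration, which wins over a hard-coded constant. A minimal sketch of that precedence chain (the variadic helper is illustrative only; the operator's own `util.Coalesce` takes two arguments and is chained instead):

```go
package main

import "fmt"

// coalesce returns the first non-empty string, so callers can chain
// manifest value -> operator config -> built-in constant.
func coalesce(values ...string) string {
	for _, v := range values {
		if v != "" {
			return v
		}
	}
	return ""
}

func main() {
	const defaultSchema = "pooler" // built-in fallback constant

	manifestSchema := ""    // not set in the Postgres manifest
	operatorSchema := "pgb" // set in the operator configuration

	// Manifest wins over operator config, which wins over the constant.
	fmt.Println(coalesce(manifestSchema, operatorSchema, defaultSchema)) // "pgb"
}
```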
--- .../crds/operatorconfigurations.yaml | 38 ++++++------- .../postgres-operator/crds/postgresqls.yaml | 16 ++++++ charts/postgres-operator/values-crd.yaml | 24 +++++---- charts/postgres-operator/values.yaml | 22 +++++--- docs/reference/cluster_manifest.md | 27 ++++++++++ docs/reference/operator_parameters.md | 28 ++++++++++ docs/user.md | 53 +++++++++++++++++++ manifests/operatorconfiguration.crd.yaml | 36 ++++++------- manifests/postgresql.crd.yaml | 16 ++++++ .../v1/operator_configuration_type.go | 15 +++--- pkg/apis/acid.zalan.do/v1/postgresql_type.go | 25 +++++---- pkg/cluster/k8sres.go | 4 +- pkg/cluster/sync_test.go | 4 +- pkg/controller/operator_config.go | 12 ++--- pkg/util/config/config.go | 3 +- 15 files changed, 237 insertions(+), 86 deletions(-) diff --git a/charts/postgres-operator/crds/operatorconfigurations.yaml b/charts/postgres-operator/crds/operatorconfigurations.yaml index a790125d1..4c5ebdf66 100644 --- a/charts/postgres-operator/crds/operatorconfigurations.yaml +++ b/charts/postgres-operator/crds/operatorconfigurations.yaml @@ -321,6 +321,24 @@ spec: connection_pool: type: object properties: + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" + connection_pool_instances_number: + type: integer + #default: 1 + connection_pool_image: + type: string + #default: "registry.opensource.zalan.do/acid/pgbouncer:1.0.0" + connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" connection_pool_default_cpu_limit: type: string pattern: '^(\d+m|\d+(\.\d{1,3})?)$' @@ -336,25 +354,7 @@ spec: connection_pool_default_memory_request: type: string pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' - #default: "100Mi" - connection_pool_image: - type: string - #default: "pierone.stups.zalan.do/acid/pgbouncer:0.0.1" - connection_pool_instances_number: - type: integer - #default: 1 - connection_pool_mode: - type: string - enum: - - "session" - - "transaction" - #default: "transaction" - connection_pool_schema: - type: string - #default: "pooler" - connection_pool_user: - type: string - #default: "pooler" + #default: "100m" status: type: object additionalProperties: diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index 8b7de363c..aa4d40b1d 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -242,6 +242,22 @@ spec: type: string replicaLoadBalancer: # deprecated type: boolean + connectionPool: + type: object + properties: + schema: + type: string + user: + type: string + replicas: + type: integer + dockerImage: + type: string + mode: + type: string + enum: + - "session" + - "transaction" resources: type: object required: diff --git a/charts/postgres-operator/values-crd.yaml b/charts/postgres-operator/values-crd.yaml index 17b62226a..cc16a0979 100644 --- a/charts/postgres-operator/values-crd.yaml +++ b/charts/postgres-operator/values-crd.yaml @@ -262,15 +262,21 @@ configScalyr: scalyr_memory_request: 50Mi configConnectionPool: - connection_pool_default_cpu_limit: "1" - connection_pool_default_cpu_request: "1" - connection_pool_default_memory_limit: 100m - connection_pool_default_memory_request: "100Mi" - # connection_pool_image: "" - connection_pool_instances_number: 1 - connection_pool_mode: "transaction" - # connection_pool_schema: "pooler" - # connection_pool_user: "pooler" + # number of pooler instances + connection_pool_replicas: 1 + # db schema to 
install lookup function into
+  connection_pool_schema: "pooler"
+  # db user for pooler to use
+  connection_pool_user: "pooler"
+  # docker image
+  connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer"
+  # default pooling mode
+  connection_pool_mode: "transaction"
+  # default resources
+  connection_pool_default_cpu_request: "100m"
+  connection_pool_default_memory_request: "100M"
+  connection_pool_default_cpu_limit: "100m"
+  connection_pool_default_memory_limit: "100M"

 rbac:
   # Specifies whether RBAC resources should be created
diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml
index f11619c8a..1f4cb6f70 100644
--- a/charts/postgres-operator/values.yaml
+++ b/charts/postgres-operator/values.yaml
@@ -239,15 +239,21 @@ configTeamsApi:

 # configure connection pooler deployment created by the operator
 configConnectionPool:
-  connection_pool_default_cpu_limit: "1"
-  connection_pool_default_cpu_request: "1"
-  connection_pool_default_memory_limit: 100m
-  connection_pool_default_memory_request: "100Mi"
-  # connection_pool_image: ""
-  connection_pool_instances_number: 1
+  # number of pooler instances
+  connection_pool_replicas: 1
+  # db schema to install lookup function into
+  connection_pool_schema: "pooler"
+  # db user for pooler to use
+  connection_pool_user: "pooler"
+  # docker image
+  connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer"
+  # default pooling mode
   connection_pool_mode: "transaction"
-  # connection_pool_schema: "pooler"
-  # connection_pool_user: "pooler"
+  # default resources
+  connection_pool_default_cpu_request: "100m"
+  connection_pool_default_memory_request: "100M"
+  connection_pool_default_cpu_limit: "100m"
+  connection_pool_default_memory_limit: "100M"

 rbac:
   # Specifies whether RBAC resources should be created
diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md
index 7b049b6fa..a49890d13 100644
--- a/docs/reference/cluster_manifest.md
+++ b/docs/reference/cluster_manifest.md
@@ -149,6 +149,11 @@ These parameters are grouped directly under  the `spec` key in the manifest.
   [the reference schedule format](https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/#schedule)
   into account. Optional. Default is: "30 00 \* \* \*"

+* **enableConnectionPool**
+  Tells the operator to create a connection pool for the database. If this
+  field is true, a connection pool deployment will be created even if the
+  `connectionPool` section is empty.
+
 ## Postgres parameters

 Those parameters are grouped under the `postgresql` top-level key, which is
@@ -359,3 +364,25 @@ CPU and memory limits for the sidecar container.
 * **memory**
   memory limits for the sidecar container. Optional, overrides the
   `default_memory_limits` operator configuration parameter. Optional.
+
+## Connection pool
+
+Parameters are grouped under the `connectionPool` top-level key and specify the
+configuration of the connection pool. If this section is present, a connection
+pool will be created for the database even if `enableConnectionPool` is not
+set.
+
+* **replicas**
+  How many instances of the connection pool to create.
+
+* **mode**
+  The mode in which to run the connection pool: `session` or `transaction`.
+
+* **schema**
+  Schema to create for the credentials lookup function.
+
+* **user**
+  User to create for the connection pool to be able to connect to the
+  database.
+
+* **resources**
+  Resource configuration for the connection pool deployment.
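The two manifest knobs documented above combine exactly as in the `needConnectionPool` check from pkg/cluster/util.go: a pooler is requested when either the boolean flag is set or the `connectionPool` section is present, even if empty. A condensed sketch of that decision, with the types trimmed to the relevant fields:

```go
package main

import "fmt"

// PostgresSpec is trimmed to the two fields that matter for the check.
type PostgresSpec struct {
	EnableConnectionPool bool
	ConnectionPool       *ConnectionPool
}

// ConnectionPool details are elided; presence alone is significant.
type ConnectionPool struct{}

// needConnectionPool mirrors pkg/cluster/util.go: an empty but present
// connectionPool section is enough, as is the bare boolean flag.
func needConnectionPool(spec PostgresSpec) bool {
	return spec.ConnectionPool != nil || spec.EnableConnectionPool
}

func main() {
	fmt.Println(needConnectionPool(PostgresSpec{EnableConnectionPool: true}))        // true
	fmt.Println(needConnectionPool(PostgresSpec{ConnectionPool: &ConnectionPool{}})) // true
	fmt.Println(needConnectionPool(PostgresSpec{}))                                  // false
}
```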
diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md
index e3893ea31..52d4e66c1 100644
--- a/docs/reference/operator_parameters.md
+++ b/docs/reference/operator_parameters.md
@@ -592,3 +592,31 @@ scalyr sidecar. In the CRD-based configuration they are grouped under the
 
 * **scalyr_memory_limit**
   Memory limit value for the Scalyr sidecar. The default is `500Mi`.
+
+## Connection pool configuration
+
+Parameters are grouped under the `connection_pool` top-level key and specify
+the default configuration for the connection pool, used when a Postgres
+manifest requests a pool but does not specify some of the parameters. All of
+them are optional; the operator provides reasonable defaults.
+
+* **connection_pool_replicas**
+  How many instances of the connection pool to create.
+
+* **connection_pool_schema**
+  Database schema to create for the credentials lookup function.
+
+* **connection_pool_user**
+  User to create for the connection pool to connect to the database with.
+
+* **connection_pool_image**
+  Docker image to use for the connection pool deployment.
+
+* **connection_pool_mode**
+  Default pool mode, `session` or `transaction`.
+
+* **connection_pool_default_cpu_request**
+  **connection_pool_default_memory_request**
+  **connection_pool_default_cpu_limit**
+  **connection_pool_default_memory_limit**
+  Default resource configuration for the connection pool deployment.
diff --git a/docs/user.md b/docs/user.md
index f81e11ede..47be76773 100644
--- a/docs/user.md
+++ b/docs/user.md
@@ -454,3 +454,56 @@ monitoring is outside the scope of operator responsibilities.
 
 See [configuration reference](reference/cluster_manifest.md) and
 [administrator documentation](administrator.md) for details on how backups are
 executed.
+
+## Connection pool
+
+The operator can create a database-side connection pool for applications where
+an application-side pool is not feasible but the number of connections is
+high. To create a connection pool together with a database, modify the
+manifest:
+
+```yaml
+spec:
+  enableConnectionPool: true
+```
+
+This will tell the operator to create a connection pool with the default
+configuration, through which one can access the master via a separate service
+`{cluster-name}-pooler`. In most cases the provided default configuration
+should be good enough.
+
+To configure the connection pool explicitly, specify:
+
+```yaml
+spec:
+  connectionPool:
+    # how many instances of connection pool to create
+    replicas: 1
+
+    # in which mode to run, session or transaction
+    mode: "transaction"
+
+    # schema, which the operator will create to install the credentials
+    # lookup function
+    schema: "pooler"
+
+    # user, which the operator will create for the connection pool
+    user: "pooler"
+
+    # resources for each instance
+    resources:
+      requests:
+        cpu: "100m"
+        memory: "100M"
+      limits:
+        cpu: "100m"
+        memory: "100M"
+```
+
+By default `pgbouncer` is used to create a connection pool. To find out about
+pool modes, see the [pgbouncer docs](https://www.pgbouncer.org/config.html#pool_mode)
+(the general approach should be the same across different implementations).
+
+Note that with `pgbouncer` a meaningful CPU limit should be less than 1 core
+(there is a way to utilize more than one, but in K8s it's easier to just spin
+up more instances).
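A small sketch making two facts from the documentation above concrete: the pooler is reachable via a `{cluster-name}-pooler` service, and only the `session` and `transaction` modes are accepted. The helper names are hypothetical, not the operator's own:

```go
package main

import "fmt"

// poolerServiceName derives the service through which the pooler is
// reachable, following the "{cluster-name}-pooler" convention stated
// in docs/user.md above. The operator's actual helper may differ.
func poolerServiceName(clusterName string) string {
	return clusterName + "-pooler"
}

// validPoolMode mirrors the enum the CRDs in this patch allow for the
// pooler mode.
func validPoolMode(mode string) bool {
	return mode == "session" || mode == "transaction"
}

func main() {
	fmt.Println(poolerServiceName("acid-minimal-cluster")) // acid-minimal-cluster-pooler
	fmt.Println(validPoolMode("transaction"))              // true
}
```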
diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml
index f4224244d..aa64c6f6d 100644
--- a/manifests/operatorconfiguration.crd.yaml
+++ b/manifests/operatorconfiguration.crd.yaml
@@ -297,6 +297,24 @@ spec:
             connection_pool:
               type: object
               properties:
+                connection_pool_schema:
+                  type: string
+                  #default: "pooler"
+                connection_pool_user:
+                  type: string
+                  #default: "pooler"
+                connection_pool_replicas:
+                  type: integer
+                  #default: 1
+                connection_pool_image:
+                  type: string
+                  #default: "registry.opensource.zalan.do/acid/pgbouncer:1.0.0"
+                connection_pool_mode:
+                  type: string
+                  enum:
+                    - "session"
+                    - "transaction"
+                  #default: "transaction"
                 connection_pool_default_cpu_limit:
                   type: string
                   pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
@@ -313,24 +331,6 @@ spec:
                 connection_pool_default_memory_request:
                   type: string
                   pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
                   #default: "100Mi"
-                connection_pool_image:
-                  type: string
-                  #default: "pierone.stups.zalan.do/acid/pgbouncer:0.0.1"
-                connection_pool_instances_number:
-                  type: integer
-                  #default: 1
-                connection_pool_mode:
-                  type: string
-                  enum:
-                    - "session"
-                    - "transaction"
-                  #default: "transaction"
-                connection_pool_schema:
-                  type: string
-                  #default: "pooler"
-                connection_pool_user:
-                  type: string
-                  #default: "pooler"
       status:
         type: object
         additionalProperties:
diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml
index 7d4bb228b..ff9366421 100644
--- a/manifests/postgresql.crd.yaml
+++ b/manifests/postgresql.crd.yaml
@@ -206,6 +206,22 @@ spec:
                 type: string
               replicaLoadBalancer:  # deprecated
                 type: boolean
+              connectionPool:
+                type: object
+                properties:
+                  schema:
+                    type: string
+                  user:
+                    type: string
+                  replicas:
+                    type: integer
+                  dockerImage:
+                    type: string
+                  mode:
+                    type: string
+                    enum:
+                      - "session"
+                      - "transaction"
               resources:
                 type: object
                 required:
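Both CRDs declare `replicas` as a plain integer, while the Go types below use `*int32`, presumably so that an omitted value can be distinguished from an explicit one. A minimal sketch using the `int32ToPointer` helper that the tests in this patch rely on (redefined here to stay self-contained):

```go
package main

import "fmt"

// int32ToPointer matches the helper used by pkg/cluster/sync_test.go
// in this patch; a *int32 lets the operator tell "replicas not set in
// the manifest" (nil) apart from an explicit value.
func int32ToPointer(value int32) *int32 {
	return &value
}

func main() {
	var unset *int32         // field omitted in the manifest
	set := int32ToPointer(1) // replicas: 1

	fmt.Println(unset == nil, *set) // true 1
}
```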
diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
index 58f171843..dd0822c6a 100644
--- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
+++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
@@ -65,12 +65,12 @@ type KubernetesMetaConfiguration struct {
 	// TODO: use a proper toleration structure?
 	PodToleration map[string]string `json:"toleration,omitempty"`
 	// TODO: use namespacedname
-	PodEnvironmentConfigMap string `json:"pod_environment_configmap,omitempty"`
-	PodPriorityClassName string `json:"pod_priority_class_name,omitempty"`
-	MasterPodMoveTimeout Duration `json:"master_pod_move_timeout,omitempty"`
-	EnablePodAntiAffinity bool `json:"enable_pod_antiaffinity,omitempty"`
-	PodAntiAffinityTopologyKey string `json:"pod_antiaffinity_topology_key,omitempty"`
-	PodManagementPolicy string `json:"pod_management_policy,omitempty"`
+	PodEnvironmentConfigMap    string   `json:"pod_environment_configmap,omitempty"`
+	PodPriorityClassName       string   `json:"pod_priority_class_name,omitempty"`
+	MasterPodMoveTimeout       Duration `json:"master_pod_move_timeout,omitempty"`
+	EnablePodAntiAffinity      bool     `json:"enable_pod_antiaffinity,omitempty"`
+	PodAntiAffinityTopologyKey string   `json:"pod_antiaffinity_topology_key,omitempty"`
+	PodManagementPolicy        string   `json:"pod_management_policy,omitempty"`
 }
 
 // PostgresPodResourcesDefaults defines the spec of default resources
@@ -154,10 +154,9 @@ type ScalyrConfiguration struct {
 
 // Defines default configuration for connection pool
 type ConnectionPoolConfiguration struct {
-	NumberOfInstances *int32 `json:"connection_pool_instances_number,omitempty"`
+	Replicas          *int32 `json:"connection_pool_replicas,omitempty"`
 	Schema            string `json:"connection_pool_schema,omitempty"`
 	User              string `json:"connection_pool_user,omitempty"`
-	Type              string `json:"connection_pool_type,omitempty"`
 	Image             string `json:"connection_pool_image,omitempty"`
 	Mode              string `json:"connection_pool_mode,omitempty"`
 	DefaultCPURequest string `name:"connection_pool_default_cpu_request,omitempty"`
diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
index c56d70626..e7965f893 100644
--- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go
+++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
@@ -27,7 +27,7 @@ type PostgresSpec struct {
 	Patroni   `json:"patroni,omitempty"`
 	Resources `json:"resources,omitempty"`
 
-	EnableConnectionPool bool            `json:"enable_connection_pool,omitempty"`
+	EnableConnectionPool bool            `json:"enableConnectionPool,omitempty"`
 	ConnectionPool       *ConnectionPool `json:"connectionPool,omitempty"`
 
 	TeamID string `json:"teamId"`
@@ -159,16 +159,21 @@ type PostgresStatus struct {
 }
 
 // Options for connection pooler
+//
+// TODO: prepared snippets of configuration, one can choose via type, e.g.
+// pgbouncer-large (with higher resources) or odyssey-small (with smaller
+// resources)
+// Type string `json:"type,omitempty"`
+//
+// TODO: figure out what other important parameters of the connection pool it
+// makes sense to expose. E.g. pool size (min/max boundaries), max client
+// connections etc.
 type ConnectionPool struct {
-	NumberOfInstances *int32 `json:"instancesNumber,omitempty"`
-	Schema            string `json:"schema,omitempty"`
-	User              string `json:"user,omitempty"`
-	Mode              string `json:"mode,omitempty"`
-	DockerImage       string `json:"dockerImage,omitempty"`
-	// TODO: prepared snippets of configuration, one can choose via type, e.g.
-	// pgbouncer-large (with higher resources) or odyssey-small (with smaller
-	// resources)
-	// Type string `json:"type,omitempty"`
+	Replicas    *int32 `json:"replicas,omitempty"`
+	Schema      string `json:"schema,omitempty"`
+	User        string `json:"user,omitempty"`
+	Mode        string `json:"mode,omitempty"`
+	DockerImage string `json:"dockerImage,omitempty"`
 
 	Resources `json:"resources,omitempty"`
 }
diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go
index 3121691d9..b9f8e1992 100644
--- a/pkg/cluster/k8sres.go
+++ b/pkg/cluster/k8sres.go
@@ -1861,9 +1861,9 @@ func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) (
 	*appsv1.Deployment, error) {
 
 	podTemplate, err := c.generateConnPoolPodTemplate(spec)
-	numberOfInstances := spec.ConnectionPool.NumberOfInstances
+	numberOfInstances := spec.ConnectionPool.Replicas
 	if numberOfInstances == nil {
-		numberOfInstances = c.OpConfig.ConnectionPool.NumberOfInstances
+		numberOfInstances = c.OpConfig.ConnectionPool.Replicas
 	}
 
 	if err != nil {
diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go
index c6928a64e..f5887dede 100644
--- a/pkg/cluster/sync_test.go
+++ b/pkg/cluster/sync_test.go
@@ -99,14 +99,14 @@ func TestConnPoolSynchronization(t *testing.T) {
 			oldSpec: &acidv1.Postgresql{
 				Spec: acidv1.PostgresSpec{
 					ConnectionPool: &acidv1.ConnectionPool{
-						NumberOfInstances: int32ToPointer(1),
+						Replicas: int32ToPointer(1),
 					},
 				},
 			},
 			newSpec: &acidv1.Postgresql{
 				Spec: acidv1.PostgresSpec{
 					ConnectionPool: &acidv1.ConnectionPool{
-						NumberOfInstances: int32ToPointer(2),
+						Replicas: int32ToPointer(2),
 					},
 				},
 			},
diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go
index 1748fbd1f..a4a32abba 100644
--- a/pkg/controller/operator_config.go
+++ b/pkg/controller/operator_config.go
@@ -146,13 +146,13 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
 
 	// Connection pool. Looks like we can't use defaulting in CRD before 1.17,
 	// so ensure default values here.
- result.ConnectionPool.NumberOfInstances = fromCRD.ConnectionPool.NumberOfInstances - if result.ConnectionPool.NumberOfInstances == nil || - *result.ConnectionPool.NumberOfInstances < 1 { + result.ConnectionPool.Replicas = fromCRD.ConnectionPool.Replicas + if result.ConnectionPool.Replicas == nil || + *result.ConnectionPool.Replicas < 1 { var value int32 value = 1 - result.ConnectionPool.NumberOfInstances = &value + result.ConnectionPool.Replicas = &value } result.ConnectionPool.Schema = util.Coalesce( @@ -163,10 +163,6 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur fromCRD.ConnectionPool.User, constants.ConnectionPoolUserName) - result.ConnectionPool.Type = util.Coalesce( - fromCRD.ConnectionPool.Type, - constants.ConnectionPoolDefaultType) - result.ConnectionPool.Image = util.Coalesce( fromCRD.ConnectionPool.Image, "pgbouncer:0.0.1") diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 2baf99931..b04206d53 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -85,10 +85,9 @@ type LogicalBackup struct { // Operator options for connection pooler type ConnectionPool struct { - NumberOfInstances *int32 `name:"connection_pool_instances_number" default:"1"` + Replicas *int32 `name:"connection_pool_replicas" default:"1"` Schema string `name:"connection_pool_schema" default:"pooler"` User string `name:"connection_pool_user" default:"pooler"` - Type string `name:"connection_pool_type" default:"pgbouncer"` Image string `name:"connection_pool_image" default:"pgbouncer:1.0"` Mode string `name:"connection_pool_mode" default:"session"` ConnPoolDefaultCPURequest string `name:"connection_pool_default_cpu_request" default:"100m"`
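The defaulting rule from `importConfigurationFromCRD` above, captured as a standalone sketch; the function name is hypothetical, since the operator inlines this logic instead:

```go
package main

import "fmt"

// defaultReplicas reproduces the defaulting shown in
// importConfigurationFromCRD above: with CRD defaulting unavailable
// before Kubernetes 1.17, anything unset or below 1 falls back to 1.
func defaultReplicas(fromCRD *int32) *int32 {
	if fromCRD == nil || *fromCRD < 1 {
		value := int32(1)
		return &value
	}
	return fromCRD
}

func main() {
	fmt.Println(*defaultReplicas(nil)) // 1

	zero := int32(0)
	fmt.Println(*defaultReplicas(&zero)) // 1

	three := int32(3)
	fmt.Println(*defaultReplicas(&three)) // 3
}
```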