diff --git a/docker/DebugDockerfile b/docker/DebugDockerfile index 76dadf6df..0c11fe3b4 100644 --- a/docker/DebugDockerfile +++ b/docker/DebugDockerfile @@ -3,8 +3,17 @@ MAINTAINER Team ACID @ Zalando # We need root certificates to deal with teams api over https RUN apk --no-cache add ca-certificates go git musl-dev -RUN go get github.com/derekparker/delve/cmd/dlv COPY build/* / -CMD ["/root/go/bin/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"] +RUN addgroup -g 1000 pgo +RUN adduser -D -u 1000 -G pgo -g 'Postgres Operator' pgo + +RUN go get github.com/derekparker/delve/cmd/dlv +RUN cp /root/go/bin/dlv /dlv +RUN chown -R pgo:pgo /dlv + +USER pgo:pgo +RUN ls -l / + +CMD ["/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"] diff --git a/manifests/minimal-postgres-manifest.yaml b/manifests/minimal-postgres-manifest.yaml index 75dfdf07f..ff7785cec 100644 --- a/manifests/minimal-postgres-manifest.yaml +++ b/manifests/minimal-postgres-manifest.yaml @@ -17,3 +17,5 @@ spec: foo: zalando # dbname: owner postgresql: version: "11" + connectionPool: + type: "pgbouncer" diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index a37abe476..d7cd6fd74 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -119,6 +119,7 @@ rules: - apps resources: - statefulsets + - deployments verbs: - create - delete diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 07b42d4d4..c50d9c902 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -27,6 +27,8 @@ type PostgresSpec struct { Patroni `json:"patroni,omitempty"` Resources `json:"resources,omitempty"` + ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"` + TeamID string `json:"teamId"` DockerImage string `json:"dockerImage,omitempty"` @@ -154,3 +156,13 @@ type UserFlags []string type PostgresStatus struct { PostgresClusterStatus string `json:"PostgresClusterStatus"` } + +// Options for connection pooler +type ConnectionPool struct { + NumberOfInstances *int32 `json:"instancesNumber,omitempty"` + Schema *string `json:"schema,omitempty"` + User *string `json:"user,omitempty"` + Type *string `json:"type,omitempty"` + Mode *string `json:"mode,omitempty"` + PodTemplate *v1.PodTemplateSpec `json:"podTemplate,omitempty"` +} diff --git a/pkg/apis/acid.zalan.do/v1/register.go b/pkg/apis/acid.zalan.do/v1/register.go index 1c30e35fb..34def209d 100644 --- a/pkg/apis/acid.zalan.do/v1/register.go +++ b/pkg/apis/acid.zalan.do/v1/register.go @@ -10,7 +10,8 @@ import ( // APIVersion of the `postgresql` and `operator` CRDs const ( - APIVersion = "v1" + APIVersion = "v1" + PostgresqlKind = "postgresql" ) var ( @@ -42,7 +43,7 @@ func addKnownTypes(scheme *runtime.Scheme) error { // AddKnownType assumes derives the type kind from the type name, which is always uppercase. // For our CRDs we use lowercase names historically, therefore we have to supply the name separately. // TODO: User uppercase CRDResourceKind of our types in the next major API version - scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("postgresql"), &Postgresql{}) + scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind(PostgresqlKind), &Postgresql{}) scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("postgresqlList"), &PostgresqlList{}) scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("OperatorConfiguration"), &OperatorConfiguration{}) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index c560c4cdf..1681e7d2e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -48,11 +48,17 @@ type Config struct { PodServiceAccountRoleBinding *rbacv1beta1.RoleBinding } +type ConnectionPoolResources struct { + Deployment *appsv1.Deployment + Service *v1.Service +} + type kubeResources struct { Services map[PostgresRole]*v1.Service Endpoints map[PostgresRole]*v1.Endpoints Secrets map[types.UID]*v1.Secret Statefulset *appsv1.StatefulSet + ConnectionPool *ConnectionPoolResources PodDisruptionBudget *policybeta1.PodDisruptionBudget //Pods are treated separately //PVCs are treated separately @@ -184,7 +190,8 @@ func (c *Cluster) isNewCluster() bool { func (c *Cluster) initUsers() error { c.setProcessName("initializing users") - // clear our the previous state of the cluster users (in case we are running a sync). + // clear our the previous state of the cluster users (in case we are + // running a sync). c.systemUsers = map[string]spec.PgUser{} c.pgUsers = map[string]spec.PgUser{} @@ -292,8 +299,10 @@ func (c *Cluster) Create() error { } c.logger.Infof("pods are ready") - // create database objects unless we are running without pods or disabled that feature explicitly + // create database objects unless we are running without pods or disabled + // that feature explicitly if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) { + c.logger.Infof("Create roles") if err = c.createRoles(); err != nil { return fmt.Errorf("could not create users: %v", err) } @@ -316,6 +325,26 @@ func (c *Cluster) Create() error { c.logger.Errorf("could not list resources: %v", err) } + // Create connection pool deployment and services if necessary. Since we + // need to peform some operations with the database itself (e.g. install + // lookup function), do it as the last step, when everything is available. + // + // Do not consider connection pool as a strict requirement, and if + // something fails, report warning + if c.needConnectionPool() { + if c.ConnectionPool != nil { + c.logger.Warning("Connection pool already exists in the cluster") + return nil + } + connPool, err := c.createConnectionPool() + if err != nil { + c.logger.Warningf("could not create connection pool: %v", err) + return nil + } + c.logger.Infof("connection pool %q has been successfully created", + util.NameFromMeta(connPool.Deployment.ObjectMeta)) + } + return nil } @@ -745,6 +774,12 @@ func (c *Cluster) Delete() { c.logger.Warningf("could not remove leftover patroni objects; %v", err) } + // Delete connection pool objects anyway, even if it's not mentioned in the + // manifest, just to not keep orphaned components in case if something went + // wrong + if err := c.deleteConnectionPool(); err != nil { + c.logger.Warningf("could not remove connection pool: %v", err) + } } //NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status). @@ -811,6 +846,22 @@ func (c *Cluster) initSystemUsers() { Name: c.OpConfig.ReplicationUsername, Password: util.RandomPassword(constants.PasswordLength), } + + // Connection pool user is an exception, if requested it's going to be + // created by operator as a normal pgUser + if c.needConnectionPool() { + + username := c.Spec.ConnectionPool.User + if username == nil { + username = &c.OpConfig.ConnectionPool.User + } + + c.systemUsers[constants.ConnectionPoolUserKeyName] = spec.PgUser{ + Origin: spec.RoleConnectionPool, + Name: *username, + Password: util.RandomPassword(constants.PasswordLength), + } + } } func (c *Cluster) initRobotUsers() error { diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 07ea011a6..1b74bd6b6 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -1,10 +1,12 @@ package cluster import ( + "bytes" "database/sql" "fmt" "net" "strings" + "text/template" "time" "github.com/lib/pq" @@ -28,6 +30,25 @@ const ( getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;` createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` + connectionPoolLookup = ` + CREATE SCHEMA IF NOT EXISTS {{.pool_schema}}; + + CREATE OR REPLACE FUNCTION {{.pool_schema}}.user_lookup( + in i_username text, out uname text, out phash text) + RETURNS record AS $$ + BEGIN + SELECT usename, passwd FROM pg_catalog.pg_shadow + WHERE usename = i_username INTO uname, phash; + RETURN; + END; + $$ LANGUAGE plpgsql SECURITY DEFINER; + + REVOKE ALL ON FUNCTION {{.pool_schema}}.user_lookup(text) + FROM public, {{.pool_schema}}; + GRANT EXECUTE ON FUNCTION {{.pool_schema}}.user_lookup(text) + TO {{.pool_user}}; + GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}}; + ` ) func (c *Cluster) pgConnectionString() string { @@ -243,3 +264,49 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin return result } + +// Creates a connection pool credentials lookup function in every database to +// perform remote authentification. +func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error { + var stmtBytes bytes.Buffer + + if err := c.initDbConn(); err != nil { + return fmt.Errorf("could not init database connection") + } + defer func() { + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + }() + + currentDatabases, err := c.getDatabases() + if err != nil { + msg := "could not get databases to install pool lookup function: %v" + return fmt.Errorf(msg, err) + } + + templater := template.Must(template.New("sql").Parse(connectionPoolLookup)) + + for dbname, _ := range currentDatabases { + c.logger.Infof("Install pool lookup function into %s", dbname) + + params := TemplateParams{ + "pool_schema": poolSchema, + "pool_user": poolUser, + } + + if err := templater.Execute(&stmtBytes, params); err != nil { + return fmt.Errorf("could not prepare sql statement %+v: %v", + params, err) + } + + if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil { + return fmt.Errorf("could not execute sql statement %s: %v", + stmtBytes.String(), err) + } + + c.logger.Infof("Pool lookup function installed into %s", dbname) + } + + return nil +} diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index e6561e0f3..7d6b9be07 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -31,6 +31,8 @@ const ( patroniPGParametersParameterName = "parameters" patroniPGHBAConfParameterName = "pg_hba" localHost = "127.0.0.1/32" + connectionPoolContainer = "connection-pool" + pgPort = 5432 ) type pgUser struct { @@ -66,6 +68,10 @@ func (c *Cluster) statefulSetName() string { return c.Name } +func (c *Cluster) connPoolName() string { + return c.Name + "-pooler" +} + func (c *Cluster) endpointName(role PostgresRole) string { name := c.Name if role == Replica { @@ -84,6 +90,28 @@ func (c *Cluster) serviceName(role PostgresRole) string { return name } +func (c *Cluster) serviceAddress(role PostgresRole) string { + service, exist := c.Services[role] + + if exist { + return service.ObjectMeta.Name + } + + c.logger.Warningf("No service for role %s", role) + return "" +} + +func (c *Cluster) servicePort(role PostgresRole) string { + service, exist := c.Services[role] + + if exist { + return fmt.Sprint(service.Spec.Ports[0].Port) + } + + c.logger.Warningf("No service for role %s", role) + return "" +} + func (c *Cluster) podDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } @@ -315,7 +343,11 @@ func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]stri return *tolerationsSpec } - if len(podToleration["key"]) > 0 || len(podToleration["operator"]) > 0 || len(podToleration["value"]) > 0 || len(podToleration["effect"]) > 0 { + if len(podToleration["key"]) > 0 || + len(podToleration["operator"]) > 0 || + len(podToleration["value"]) > 0 || + len(podToleration["effect"]) > 0 { + return []v1.Toleration{ { Key: podToleration["key"], @@ -1669,3 +1701,185 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar { func (c *Cluster) getLogicalBackupJobName() (jobName string) { return "logical-backup-" + c.clusterName().Name } + +func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( + *v1.PodTemplateSpec, error) { + + podTemplate := spec.ConnectionPool.PodTemplate + + if podTemplate == nil { + gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) + resources, err := generateResourceRequirements( + c.Spec.Resources, + c.makeDefaultResources()) + + effectiveMode := spec.ConnectionPool.Mode + if effectiveMode == nil { + effectiveMode = &c.OpConfig.ConnectionPool.Mode + } + + if err != nil { + return nil, fmt.Errorf("could not generate resource requirements: %v", err) + } + + secretSelector := func(key string) *v1.SecretKeySelector { + return &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: c.credentialSecretName(c.OpConfig.SuperUsername), + }, + Key: key, + } + } + + envVars := []v1.EnvVar{ + { + Name: "PGHOST", + Value: c.serviceAddress(Master), + }, + { + Name: "PGPORT", + Value: c.servicePort(Master), + }, + { + Name: "PGUSER", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("username"), + }, + }, + // the convention is to use the same schema name as + // connection pool username + { + Name: "PGSCHEMA", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("username"), + }, + }, + { + Name: "PGPASSWORD", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("password"), + }, + }, + { + Name: "CONNECTION_POOL_MODE", + Value: *effectiveMode, + }, + { + Name: "CONNECTION_POOL_PORT", + Value: fmt.Sprint(pgPort), + }, + } + + poolerContainer := v1.Container{ + Name: connectionPoolContainer, + Image: c.OpConfig.ConnectionPool.Image, + ImagePullPolicy: v1.PullIfNotPresent, + Resources: *resources, + Ports: []v1.ContainerPort{ + { + ContainerPort: pgPort, + Protocol: v1.ProtocolTCP, + }, + }, + Env: envVars, + } + + podTemplate = &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: c.connPoolLabelsSelector().MatchLabels, + Namespace: c.Namespace, + Annotations: c.generatePodAnnotations(spec), + }, + Spec: v1.PodSpec{ + ServiceAccountName: c.OpConfig.PodServiceAccountName, + TerminationGracePeriodSeconds: &gracePeriod, + Containers: []v1.Container{poolerContainer}, + // TODO: add tolerations to scheduler pooler on the same node + // as database + //Tolerations: *tolerationsSpec, + }, + } + } + + return podTemplate, nil +} + +func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) ( + *appsv1.Deployment, error) { + + podTemplate, err := c.generateConnPoolPodTemplate(spec) + numberOfInstances := spec.ConnectionPool.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = c.OpConfig.ConnectionPool.NumberOfInstances + } + + if err != nil { + return nil, err + } + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.connPoolName(), + Namespace: c.Namespace, + Labels: c.labelsSet(true), + Annotations: map[string]string{}, + // make Postgresql CRD object its owner, so that if CRD object is + // deleted, this object will be deleted even if something went + // wrong and operator didn't deleted it. + OwnerReferences: []metav1.OwnerReference{ + { + UID: c.Statefulset.ObjectMeta.UID, + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: c.Statefulset.ObjectMeta.Name, + }, + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: numberOfInstances, + Selector: c.connPoolLabelsSelector(), + Template: *podTemplate, + }, + } + + return deployment, nil +} + +func (c *Cluster) generateConnPoolService(spec *acidv1.PostgresSpec) *v1.Service { + serviceSpec := v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: c.connPoolName(), + Port: pgPort, + TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, + }, + }, + Type: v1.ServiceTypeClusterIP, + Selector: map[string]string{ + "connection-pool": c.connPoolName(), + }, + } + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.connPoolName(), + Namespace: c.Namespace, + Labels: c.labelsSet(true), + Annotations: map[string]string{}, + // make Postgresql CRD object its owner, so that if CRD object is + // deleted, this object will be deleted even if something went + // wrong and operator didn't deleted it. + OwnerReferences: []metav1.OwnerReference{ + { + UID: c.Postgresql.ObjectMeta.UID, + APIVersion: acidv1.APIVersion, + Kind: acidv1.PostgresqlKind, + Name: c.Postgresql.ObjectMeta.Name, + }, + }, + }, + Spec: serviceSpec, + } + + return service +} diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index c94a7bb46..7baa96c02 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -90,6 +90,102 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) { return statefulSet, nil } +// Prepare the database for connection pool to be used, i.e. install lookup +// function (do it first, because it should be fast and if it didn't succeed, +// it doesn't makes sense to create more K8S objects. At this moment we assume +// that necessary connection pool user exists. +// +// After that create all the objects for connection pool, namely a deployment +// with a chosen pooler and a service to expose it. +func (c *Cluster) createConnectionPool() (*ConnectionPoolResources, error) { + var msg string + c.setProcessName("creating connection pool") + + err := c.installLookupFunction( + c.OpConfig.ConnectionPool.Schema, + c.OpConfig.ConnectionPool.User) + + if err != nil { + msg = "could not prepare database for connection pool: %v" + return nil, fmt.Errorf(msg, err) + } + + deploymentSpec, err := c.generateConnPoolDeployment(&c.Spec) + if err != nil { + msg = "could not generate deployment for connection pool: %v" + return nil, fmt.Errorf(msg, err) + } + + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(deploymentSpec) + + if err != nil { + return nil, err + } + + serviceSpec := c.generateConnPoolService(&c.Spec) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). + Create(serviceSpec) + + if err != nil { + return nil, err + } + + c.ConnectionPool = &ConnectionPoolResources{ + Deployment: deployment, + Service: service, + } + c.logger.Debugf("created new connection pool %q, uid: %q", + util.NameFromMeta(deployment.ObjectMeta), deployment.UID) + + return c.ConnectionPool, nil +} + +func (c *Cluster) deleteConnectionPool() (err error) { + c.setProcessName("deleting connection pool") + c.logger.Debugln("deleting connection pool") + + // Lack of connection pooler objects is not a fatal error, just log it if + // it was present before in the manifest + if c.needConnectionPool() && c.ConnectionPool == nil { + c.logger.Infof("No connection pool to delete") + return nil + } + + deployment := c.ConnectionPool.Deployment + err = c.KubeClient. + Deployments(deployment.Namespace). + Delete(deployment.Name, c.deleteOptions) + + if !k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pool deployment was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete deployment: %v", err) + } + + c.logger.Infof("Connection pool deployment %q has been deleted", + util.NameFromMeta(deployment.ObjectMeta)) + + service := c.ConnectionPool.Service + err = c.KubeClient. + Services(service.Namespace). + Delete(service.Name, c.deleteOptions) + + if !k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pool service was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete service: %v", err) + } + + c.logger.Infof("Connection pool service %q has been deleted", + util.NameFromMeta(deployment.ObjectMeta)) + + c.ConnectionPool = nil + return nil +} + func getPodIndex(podName string) (int32, error) { parts := strings.Split(podName, "-") if len(parts) == 0 { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index fa4fc9ec1..5ee827d4b 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -456,6 +456,12 @@ func (c *Cluster) syncRoles() (err error) { for _, u := range c.pgUsers { userNames = append(userNames, u.Name) } + + // An exception from system users, connection pool user + connPoolUser := c.systemUsers[constants.ConnectionPoolUserKeyName] + userNames = append(userNames, connPoolUser.Name) + c.pgUsers[connPoolUser.Name] = connPoolUser + dbUsers, err = c.readPgUsersFromDatabase(userNames) if err != nil { return fmt.Errorf("error getting users from the database: %v", err) diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 138b7015c..286505621 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -69,3 +69,5 @@ type ClusterStatus struct { Spec acidv1.PostgresSpec Error error } + +type TemplateParams map[string]interface{} diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 8c02fed2e..d5f9c744f 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -408,7 +408,19 @@ func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set { } func (c *Cluster) labelsSelector() *metav1.LabelSelector { - return &metav1.LabelSelector{MatchLabels: c.labelsSet(false), MatchExpressions: nil} + return &metav1.LabelSelector{ + MatchLabels: c.labelsSet(false), + MatchExpressions: nil, + } +} + +func (c *Cluster) connPoolLabelsSelector() *metav1.LabelSelector { + return &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "connection-pool": c.connPoolName(), + }, + MatchExpressions: nil, + } } func (c *Cluster) roleLabelsSet(shouldAddExtraLabels bool, role PostgresRole) labels.Set { @@ -483,3 +495,7 @@ func (c *Cluster) GetSpec() (*acidv1.Postgresql, error) { func (c *Cluster) patroniUsesKubernetes() bool { return c.OpConfig.EtcdHost == "" } + +func (c *Cluster) needConnectionPool() bool { + return c.Spec.ConnectionPool != nil +} diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 3e6bec8db..6f071c44a 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -23,13 +23,15 @@ const fileWithNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespa // RoleOrigin contains the code of the origin of a role type RoleOrigin int -// The rolesOrigin constant values must be sorted by the role priority for resolveNameConflict(...) to work. +// The rolesOrigin constant values must be sorted by the role priority for +// resolveNameConflict(...) to work. const ( RoleOriginUnknown RoleOrigin = iota RoleOriginManifest RoleOriginInfrastructure RoleOriginTeamsAPI RoleOriginSystem + RoleConnectionPool ) type syncUserOperation int diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index e4e429abb..a7a522566 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -83,6 +83,16 @@ type LogicalBackup struct { LogicalBackupS3SSE string `name:"logical_backup_s3_sse" default:"AES256"` } +// Operator options for connection pooler +type ConnectionPool struct { + NumberOfInstances *int32 `name:"connection_pool_instances_number" default:"1"` + Schema string `name:"connection_pool_schema" default:"pooler"` + User string `name:"connection_pool_user" default:"pooler"` + Type string `name:"connection_pool_type" default:"pgbouncer"` + Image string `name:"connection_pool_image" default:"pgbouncer:1.0"` + Mode string `name:"connection_pool_mode" default:"session"` +} + // Config describes operator config type Config struct { CRD @@ -90,6 +100,7 @@ type Config struct { Auth Scalyr LogicalBackup + ConnectionPool WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to' EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS diff --git a/pkg/util/constants/roles.go b/pkg/util/constants/roles.go index 2c20d69db..3d201142c 100644 --- a/pkg/util/constants/roles.go +++ b/pkg/util/constants/roles.go @@ -2,15 +2,16 @@ package constants // Roles specific constants const ( - PasswordLength = 64 - SuperuserKeyName = "superuser" - ReplicationUserKeyName = "replication" - RoleFlagSuperuser = "SUPERUSER" - RoleFlagInherit = "INHERIT" - RoleFlagLogin = "LOGIN" - RoleFlagNoLogin = "NOLOGIN" - RoleFlagCreateRole = "CREATEROLE" - RoleFlagCreateDB = "CREATEDB" - RoleFlagReplication = "REPLICATION" - RoleFlagByPassRLS = "BYPASSRLS" + PasswordLength = 64 + SuperuserKeyName = "superuser" + ConnectionPoolUserKeyName = "pooler" + ReplicationUserKeyName = "replication" + RoleFlagSuperuser = "SUPERUSER" + RoleFlagInherit = "INHERIT" + RoleFlagLogin = "LOGIN" + RoleFlagNoLogin = "NOLOGIN" + RoleFlagCreateRole = "CREATEROLE" + RoleFlagCreateDB = "CREATEDB" + RoleFlagReplication = "REPLICATION" + RoleFlagByPassRLS = "BYPASSRLS" ) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 118d1df53..672d94634 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -40,6 +40,7 @@ type KubernetesClient struct { corev1.NamespacesGetter corev1.ServiceAccountsGetter appsv1.StatefulSetsGetter + appsv1.DeploymentsGetter rbacv1beta1.RoleBindingsGetter policyv1beta1.PodDisruptionBudgetsGetter apiextbeta1.CustomResourceDefinitionsGetter @@ -102,6 +103,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { kubeClient.NodesGetter = client.CoreV1() kubeClient.NamespacesGetter = client.CoreV1() kubeClient.StatefulSetsGetter = client.AppsV1() + kubeClient.DeploymentsGetter = client.AppsV1() kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1() kubeClient.RESTClient = client.CoreV1().RESTClient() kubeClient.RoleBindingsGetter = client.RbacV1beta1()