WIP Connection pooler support

Add initial support for a connection pooler. The idea is to make it
generic enough that switching the corresponding docker image is all it
takes to change from pgbouncer to e.g. odyssey. The operator needs to
create a deployment with the pooler and a service to access it.
Dmitrii Dolgov 2020-01-14 15:36:30 +01:00
parent 00f00af2e8
commit fb43ee92d6
16 changed files with 513 additions and 20 deletions

View File

@ -3,8 +3,17 @@ MAINTAINER Team ACID @ Zalando <team-acid@zalando.de>
# We need root certificates to deal with teams api over https
RUN apk --no-cache add ca-certificates go git musl-dev
RUN go get github.com/derekparker/delve/cmd/dlv
COPY build/* /
CMD ["/root/go/bin/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"]
RUN addgroup -g 1000 pgo
RUN adduser -D -u 1000 -G pgo -g 'Postgres Operator' pgo
RUN go get github.com/derekparker/delve/cmd/dlv
RUN cp /root/go/bin/dlv /dlv
RUN chown -R pgo:pgo /dlv
USER pgo:pgo
RUN ls -l /
CMD ["/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"]

View File

@ -17,3 +17,5 @@ spec:
foo: zalando # dbname: owner
postgresql:
version: "11"
connectionPool:
type: "pgbouncer"

View File

@ -119,6 +119,7 @@ rules:
- apps
resources:
- statefulsets
- deployments
verbs:
- create
- delete

View File

@ -27,6 +27,8 @@ type PostgresSpec struct {
Patroni `json:"patroni,omitempty"`
Resources `json:"resources,omitempty"`
ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"`
TeamID string `json:"teamId"`
DockerImage string `json:"dockerImage,omitempty"`
@ -154,3 +156,13 @@ type UserFlags []string
type PostgresStatus struct {
PostgresClusterStatus string `json:"PostgresClusterStatus"`
}
// Options for connection pooler
type ConnectionPool struct {
NumberOfInstances *int32 `json:"instancesNumber,omitempty"`
Schema *string `json:"schema,omitempty"`
User *string `json:"user,omitempty"`
Type *string `json:"type,omitempty"`
Mode *string `json:"mode,omitempty"`
PodTemplate *v1.PodTemplateSpec `json:"podTemplate,omitempty"`
}
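
Side note on the API shape: every field of ConnectionPool is a pointer, so "not set in the manifest" (nil) can be told apart from an explicit zero value and replaced with an operator-level default, which is what initSystemUsers and generateConnPoolDeployment below do. As a minimal sketch of how the json tags map manifest keys onto this struct, here is a runnable example with a trimmed local copy of the type (PodTemplate omitted to keep it self-contained):

package main

import (
    "encoding/json"
    "fmt"
)

// Trimmed copy of the ConnectionPool spec above, without PodTemplate.
type ConnectionPool struct {
    NumberOfInstances *int32  `json:"instancesNumber,omitempty"`
    Schema            *string `json:"schema,omitempty"`
    User              *string `json:"user,omitempty"`
    Type              *string `json:"type,omitempty"`
    Mode              *string `json:"mode,omitempty"`
}

func main() {
    // The same keys a cluster manifest would carry under "connectionPool".
    manifest := []byte(`{"instancesNumber": 2, "type": "pgbouncer"}`)

    var pool ConnectionPool
    if err := json.Unmarshal(manifest, &pool); err != nil {
        panic(err)
    }

    // Unset fields stay nil, signalling "fall back to the operator default".
    fmt.Println(*pool.NumberOfInstances, *pool.Type, pool.Mode == nil)
    // Output: 2 pgbouncer true
}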

View File

@ -11,6 +11,7 @@ import (
// APIVersion of the `postgresql` and `operator` CRDs
const (
APIVersion = "v1"
PostgresqlKind = "postgresql"
)
var (
@ -42,7 +43,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
// AddKnownTypes derives the type kind from the type name, which is always uppercase.
// For our CRDs we use lowercase names historically, therefore we have to supply the name separately.
// TODO: Use uppercase CRDResourceKind of our types in the next major API version
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("postgresql"), &Postgresql{})
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind(PostgresqlKind), &Postgresql{})
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("postgresqlList"), &PostgresqlList{})
scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("OperatorConfiguration"),
&OperatorConfiguration{})

View File

@ -48,11 +48,17 @@ type Config struct {
PodServiceAccountRoleBinding *rbacv1beta1.RoleBinding
}
type ConnectionPoolResources struct {
Deployment *appsv1.Deployment
Service *v1.Service
}
type kubeResources struct {
Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
ConnectionPool *ConnectionPoolResources
PodDisruptionBudget *policybeta1.PodDisruptionBudget
//Pods are treated separately
//PVCs are treated separately
@ -184,7 +190,8 @@ func (c *Cluster) isNewCluster() bool {
func (c *Cluster) initUsers() error {
c.setProcessName("initializing users")
// clear out the previous state of the cluster users (in case we are running a sync).
// clear out the previous state of the cluster users (in case we are
// running a sync).
c.systemUsers = map[string]spec.PgUser{}
c.pgUsers = map[string]spec.PgUser{}
@ -292,8 +299,10 @@ func (c *Cluster) Create() error {
}
c.logger.Infof("pods are ready")
// create database objects unless we are running without pods or disabled that feature explicitly
// create database objects unless we are running without pods or have
// disabled that feature explicitly
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
c.logger.Infof("Create roles")
if err = c.createRoles(); err != nil {
return fmt.Errorf("could not create users: %v", err)
}
@ -316,6 +325,26 @@ func (c *Cluster) Create() error {
c.logger.Errorf("could not list resources: %v", err)
}
// Create connection pool deployment and services if necessary. Since we
// need to perform some operations with the database itself (e.g. install
// the lookup function), do it as the last step, when everything else is
// available.
//
// Do not consider the connection pool a strict requirement; if something
// fails, report a warning.
if c.needConnectionPool() {
if c.ConnectionPool != nil {
c.logger.Warning("Connection pool already exists in the cluster")
return nil
}
connPool, err := c.createConnectionPool()
if err != nil {
c.logger.Warningf("could not create connection pool: %v", err)
return nil
}
c.logger.Infof("connection pool %q has been successfully created",
util.NameFromMeta(connPool.Deployment.ObjectMeta))
}
return nil
}
@ -745,6 +774,12 @@ func (c *Cluster) Delete() {
c.logger.Warningf("could not remove leftover patroni objects; %v", err)
}
// Delete connection pool objects anyway, even if the pool is not mentioned
// in the manifest, so that we do not keep orphaned components around in
// case something went wrong.
if err := c.deleteConnectionPool(); err != nil {
c.logger.Warningf("could not remove connection pool: %v", err)
}
}
//NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status).
@ -811,6 +846,22 @@ func (c *Cluster) initSystemUsers() {
Name: c.OpConfig.ReplicationUsername,
Password: util.RandomPassword(constants.PasswordLength),
}
// The connection pool user is an exception: if requested, it is going to
// be created by the operator as a normal pgUser.
if c.needConnectionPool() {
username := c.Spec.ConnectionPool.User
if username == nil {
username = &c.OpConfig.ConnectionPool.User
}
c.systemUsers[constants.ConnectionPoolUserKeyName] = spec.PgUser{
Origin: spec.RoleConnectionPool,
Name: *username,
Password: util.RandomPassword(constants.PasswordLength),
}
}
}
func (c *Cluster) initRobotUsers() error {

View File

@ -1,10 +1,12 @@
package cluster
import (
"bytes"
"database/sql"
"fmt"
"net"
"strings"
"text/template"
"time"
"github.com/lib/pq"
@ -28,6 +30,25 @@ const (
getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;`
createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";`
alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";`
connectionPoolLookup = `
CREATE SCHEMA IF NOT EXISTS {{.pool_schema}};
CREATE OR REPLACE FUNCTION {{.pool_schema}}.user_lookup(
in i_username text, out uname text, out phash text)
RETURNS record AS $$
BEGIN
SELECT usename, passwd FROM pg_catalog.pg_shadow
WHERE usename = i_username INTO uname, phash;
RETURN;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
REVOKE ALL ON FUNCTION {{.pool_schema}}.user_lookup(text)
FROM public, {{.pool_user}};
GRANT EXECUTE ON FUNCTION {{.pool_schema}}.user_lookup(text)
TO {{.pool_user}};
GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}};
`
)
func (c *Cluster) pgConnectionString() string {
@ -243,3 +264,49 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin
return result
}
// Creates a connection pool credentials lookup function in every database
// to perform remote authentication.
func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
var stmtBytes bytes.Buffer
if err := c.initDbConn(); err != nil {
return fmt.Errorf("could not init database connection: %v", err)
}
defer func() {
if err := c.closeDbConn(); err != nil {
c.logger.Errorf("could not close database connection: %v", err)
}
}()
currentDatabases, err := c.getDatabases()
if err != nil {
msg := "could not get databases to install pool lookup function: %v"
return fmt.Errorf(msg, err)
}
templater := template.Must(template.New("sql").Parse(connectionPoolLookup))
for dbname := range currentDatabases {
c.logger.Infof("Install pool lookup function into %s", dbname)
params := TemplateParams{
"pool_schema": poolSchema,
"pool_user": poolUser,
}
// reset the buffer; Execute appends to it, so statements from
// previous loop iterations would otherwise accumulate
stmtBytes.Reset()
if err := templater.Execute(&stmtBytes, params); err != nil {
return fmt.Errorf("could not prepare sql statement %+v: %v",
params, err)
}
if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil {
return fmt.Errorf("could not execute sql statement %s: %v",
stmtBytes.String(), err)
}
c.logger.Infof("Pool lookup function installed into %s", dbname)
}
return nil
}
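
As a side note, the lookup template can be exercised outside the operator. Below is a minimal, runnable sketch (with the template body abbreviated) of how a TemplateParams map feeds text/template here, and why the buffer has to be reset on every iteration: Execute appends to the buffer rather than overwriting it.

package main

import (
    "bytes"
    "fmt"
    "text/template"
)

// Abbreviated stand-in for the connectionPoolLookup template above.
const lookupTmpl = `
CREATE SCHEMA IF NOT EXISTS {{.pool_schema}};
GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}};
`

func main() {
    templater := template.Must(template.New("sql").Parse(lookupTmpl))

    var stmtBytes bytes.Buffer
    for _, dbname := range []string{"foo", "bar"} {
        // Without this Reset, the statements rendered for "foo" would be
        // executed again for "bar".
        stmtBytes.Reset()

        params := map[string]interface{}{
            "pool_schema": "pooler",
            "pool_user":   "pooler",
        }
        if err := templater.Execute(&stmtBytes, params); err != nil {
            panic(err)
        }
        fmt.Printf("-- statements for database %s:%s\n", dbname, stmtBytes.String())
    }
}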

View File

@ -31,6 +31,8 @@ const (
patroniPGParametersParameterName = "parameters"
patroniPGHBAConfParameterName = "pg_hba"
localHost = "127.0.0.1/32"
connectionPoolContainer = "connection-pool"
pgPort = 5432
)
type pgUser struct {
@ -66,6 +68,10 @@ func (c *Cluster) statefulSetName() string {
return c.Name
}
func (c *Cluster) connPoolName() string {
return c.Name + "-pooler"
}
func (c *Cluster) endpointName(role PostgresRole) string {
name := c.Name
if role == Replica {
@ -84,6 +90,28 @@ func (c *Cluster) serviceName(role PostgresRole) string {
return name
}
func (c *Cluster) serviceAddress(role PostgresRole) string {
service, exist := c.Services[role]
if exist {
return service.ObjectMeta.Name
}
c.logger.Warningf("No service for role %s", role)
return ""
}
func (c *Cluster) servicePort(role PostgresRole) string {
service, exist := c.Services[role]
if exist {
return fmt.Sprint(service.Spec.Ports[0].Port)
}
c.logger.Warningf("No service for role %s", role)
return ""
}
func (c *Cluster) podDisruptionBudgetName() string {
return c.OpConfig.PDBNameFormat.Format("cluster", c.Name)
}
@ -315,7 +343,11 @@ func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]stri
return *tolerationsSpec
}
if len(podToleration["key"]) > 0 || len(podToleration["operator"]) > 0 || len(podToleration["value"]) > 0 || len(podToleration["effect"]) > 0 {
if len(podToleration["key"]) > 0 ||
len(podToleration["operator"]) > 0 ||
len(podToleration["value"]) > 0 ||
len(podToleration["effect"]) > 0 {
return []v1.Toleration{
{
Key: podToleration["key"],
@ -1669,3 +1701,185 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar {
func (c *Cluster) getLogicalBackupJobName() (jobName string) {
return "logical-backup-" + c.clusterName().Name
}
func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) (
*v1.PodTemplateSpec, error) {
podTemplate := spec.ConnectionPool.PodTemplate
if podTemplate == nil {
gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
resources, err := generateResourceRequirements(
c.Spec.Resources,
c.makeDefaultResources())
if err != nil {
return nil, fmt.Errorf("could not generate resource requirements: %v", err)
}
effectiveMode := spec.ConnectionPool.Mode
if effectiveMode == nil {
effectiveMode = &c.OpConfig.ConnectionPool.Mode
}
secretSelector := func(key string) *v1.SecretKeySelector {
return &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: c.credentialSecretName(c.OpConfig.SuperUsername),
},
Key: key,
}
}
envVars := []v1.EnvVar{
{
Name: "PGHOST",
Value: c.serviceAddress(Master),
},
{
Name: "PGPORT",
Value: c.servicePort(Master),
},
{
Name: "PGUSER",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: secretSelector("username"),
},
},
// by convention, the schema name is the same as the
// connection pool username
{
Name: "PGSCHEMA",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: secretSelector("username"),
},
},
{
Name: "PGPASSWORD",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: secretSelector("password"),
},
},
{
Name: "CONNECTION_POOL_MODE",
Value: *effectiveMode,
},
{
Name: "CONNECTION_POOL_PORT",
Value: fmt.Sprint(pgPort),
},
}
poolerContainer := v1.Container{
Name: connectionPoolContainer,
Image: c.OpConfig.ConnectionPool.Image,
ImagePullPolicy: v1.PullIfNotPresent,
Resources: *resources,
Ports: []v1.ContainerPort{
{
ContainerPort: pgPort,
Protocol: v1.ProtocolTCP,
},
},
Env: envVars,
}
podTemplate = &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: c.connPoolLabelsSelector().MatchLabels,
Namespace: c.Namespace,
Annotations: c.generatePodAnnotations(spec),
},
Spec: v1.PodSpec{
ServiceAccountName: c.OpConfig.PodServiceAccountName,
TerminationGracePeriodSeconds: &gracePeriod,
Containers: []v1.Container{poolerContainer},
// TODO: add tolerations to schedule the pooler on the same
// node as the database
//Tolerations: *tolerationsSpec,
},
}
}
return podTemplate, nil
}
func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) (
*appsv1.Deployment, error) {
podTemplate, err := c.generateConnPoolPodTemplate(spec)
if err != nil {
return nil, err
}
numberOfInstances := spec.ConnectionPool.NumberOfInstances
if numberOfInstances == nil {
numberOfInstances = c.OpConfig.ConnectionPool.NumberOfInstances
}
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: c.connPoolName(),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: map[string]string{},
// make the cluster StatefulSet its owner, so that when the StatefulSet
// is deleted, this object is deleted as well, even if something went
// wrong and the operator didn't delete it.
OwnerReferences: []metav1.OwnerReference{
{
UID: c.Statefulset.ObjectMeta.UID,
APIVersion: "apps/v1",
Kind: "StatefulSet",
Name: c.Statefulset.ObjectMeta.Name,
},
},
},
Spec: appsv1.DeploymentSpec{
Replicas: numberOfInstances,
Selector: c.connPoolLabelsSelector(),
Template: *podTemplate,
},
}
return deployment, nil
}
func (c *Cluster) generateConnPoolService(spec *acidv1.PostgresSpec) *v1.Service {
serviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{
{
Name: c.connPoolName(),
Port: pgPort,
// the pooler container listens on pgPort
TargetPort: intstr.FromInt(pgPort),
},
},
Type: v1.ServiceTypeClusterIP,
Selector: map[string]string{
"connection-pool": c.connPoolName(),
},
}
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: c.connPoolName(),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: map[string]string{},
// make the Postgresql CRD object its owner, so that when the CRD object
// is deleted, this object is deleted as well, even if something went
// wrong and the operator didn't delete it.
OwnerReferences: []metav1.OwnerReference{
{
UID: c.Postgresql.ObjectMeta.UID,
APIVersion: acidv1.APIVersion,
Kind: acidv1.PostgresqlKind,
Name: c.Postgresql.ObjectMeta.Name,
},
},
},
Spec: serviceSpec,
}
return service
}
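
Taken together, the deployment and the service mean an application reaches the pooler through an ordinary ClusterIP service named "<cluster name>-pooler" on the Postgres port. A hypothetical sketch of the resulting connection string (cluster name, namespace, database, and user below are made up for illustration):

package main

import "fmt"

func main() {
    clusterName := "acid-minimal-cluster" // name of the Postgresql object
    namespace := "default"

    // Standard Kubernetes service DNS: <service>.<namespace>.svc.cluster.local,
    // where the service is named by connPoolName(), i.e. "<cluster>-pooler".
    host := fmt.Sprintf("%s-pooler.%s.svc.cluster.local", clusterName, namespace)

    // A libpq-style connection string pointing at the pooler instead of
    // the database service directly.
    fmt.Printf("host=%s port=5432 dbname=foo user=app_user\n", host)
}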

View File

@ -90,6 +90,102 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) {
return statefulSet, nil
}
// Prepare the database for the connection pool to be used, i.e. install the
// lookup function (do it first, because it should be fast, and if it doesn't
// succeed, it makes no sense to create more K8S objects). At this moment we
// assume that the necessary connection pool user already exists.
//
// After that, create all the objects for the connection pool, namely a
// deployment with the chosen pooler and a service to expose it.
func (c *Cluster) createConnectionPool() (*ConnectionPoolResources, error) {
var msg string
c.setProcessName("creating connection pool")
err := c.installLookupFunction(
c.OpConfig.ConnectionPool.Schema,
c.OpConfig.ConnectionPool.User)
if err != nil {
msg = "could not prepare database for connection pool: %v"
return nil, fmt.Errorf(msg, err)
}
deploymentSpec, err := c.generateConnPoolDeployment(&c.Spec)
if err != nil {
msg = "could not generate deployment for connection pool: %v"
return nil, fmt.Errorf(msg, err)
}
deployment, err := c.KubeClient.
Deployments(deploymentSpec.Namespace).
Create(deploymentSpec)
if err != nil {
return nil, err
}
serviceSpec := c.generateConnPoolService(&c.Spec)
service, err := c.KubeClient.
Services(serviceSpec.Namespace).
Create(serviceSpec)
if err != nil {
return nil, err
}
c.ConnectionPool = &ConnectionPoolResources{
Deployment: deployment,
Service: service,
}
c.logger.Debugf("created new connection pool %q, uid: %q",
util.NameFromMeta(deployment.ObjectMeta), deployment.UID)
return c.ConnectionPool, nil
}
func (c *Cluster) deleteConnectionPool() (err error) {
c.setProcessName("deleting connection pool")
c.logger.Debugln("deleting connection pool")
// Lack of connection pool objects is not a fatal error; there may simply
// be nothing to delete (e.g. the pool was never requested).
if c.ConnectionPool == nil {
c.logger.Infof("No connection pool to delete")
return nil
}
deployment := c.ConnectionPool.Deployment
err = c.KubeClient.
Deployments(deployment.Namespace).
Delete(deployment.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("Connection pool deployment was already deleted")
} else if err != nil {
return fmt.Errorf("could not delete deployment: %v", err)
}
c.logger.Infof("Connection pool deployment %q has been deleted",
util.NameFromMeta(deployment.ObjectMeta))
service := c.ConnectionPool.Service
err = c.KubeClient.
Services(service.Namespace).
Delete(service.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("Connection pool service was already deleted")
} else if err != nil {
return fmt.Errorf("could not delete service: %v", err)
}
c.logger.Infof("Connection pool service %q has been deleted",
util.NameFromMeta(service.ObjectMeta))
c.ConnectionPool = nil
return nil
}
func getPodIndex(podName string) (int32, error) {
parts := strings.Split(podName, "-")
if len(parts) == 0 {

View File

@ -456,6 +456,12 @@ func (c *Cluster) syncRoles() (err error) {
for _, u := range c.pgUsers {
userNames = append(userNames, u.Name)
}
// The connection pool user is an exception among the system users: if a
// pool was requested, it has to be synced as a normal pg user as well.
if c.needConnectionPool() {
connPoolUser := c.systemUsers[constants.ConnectionPoolUserKeyName]
userNames = append(userNames, connPoolUser.Name)
c.pgUsers[connPoolUser.Name] = connPoolUser
}
dbUsers, err = c.readPgUsersFromDatabase(userNames)
if err != nil {
return fmt.Errorf("error getting users from the database: %v", err)

View File

@ -69,3 +69,5 @@ type ClusterStatus struct {
Spec acidv1.PostgresSpec
Error error
}
// TemplateParams is a generic map of parameters for rendering text templates.
type TemplateParams map[string]interface{}

View File

@ -408,7 +408,19 @@ func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set {
}
func (c *Cluster) labelsSelector() *metav1.LabelSelector {
return &metav1.LabelSelector{MatchLabels: c.labelsSet(false), MatchExpressions: nil}
return &metav1.LabelSelector{
MatchLabels: c.labelsSet(false),
MatchExpressions: nil,
}
}
func (c *Cluster) connPoolLabelsSelector() *metav1.LabelSelector {
return &metav1.LabelSelector{
MatchLabels: map[string]string{
"connection-pool": c.connPoolName(),
},
MatchExpressions: nil,
}
}
func (c *Cluster) roleLabelsSet(shouldAddExtraLabels bool, role PostgresRole) labels.Set {
@ -483,3 +495,7 @@ func (c *Cluster) GetSpec() (*acidv1.Postgresql, error) {
func (c *Cluster) patroniUsesKubernetes() bool {
return c.OpConfig.EtcdHost == ""
}
func (c *Cluster) needConnectionPool() bool {
return c.Spec.ConnectionPool != nil
}

View File

@ -23,13 +23,15 @@ const fileWithNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespa
// RoleOrigin contains the code of the origin of a role
type RoleOrigin int
// The rolesOrigin constant values must be sorted by the role priority for resolveNameConflict(...) to work.
// The rolesOrigin constant values must be sorted by the role priority for
// resolveNameConflict(...) to work.
const (
RoleOriginUnknown RoleOrigin = iota
RoleOriginManifest
RoleOriginInfrastructure
RoleOriginTeamsAPI
RoleOriginSystem
RoleConnectionPool
)
type syncUserOperation int

View File

@ -83,6 +83,16 @@ type LogicalBackup struct {
LogicalBackupS3SSE string `name:"logical_backup_s3_sse" default:"AES256"`
}
// Operator options for connection pooler
type ConnectionPool struct {
NumberOfInstances *int32 `name:"connection_pool_instances_number" default:"1"`
Schema string `name:"connection_pool_schema" default:"pooler"`
User string `name:"connection_pool_user" default:"pooler"`
Type string `name:"connection_pool_type" default:"pgbouncer"`
Image string `name:"connection_pool_image" default:"pgbouncer:1.0"`
Mode string `name:"connection_pool_mode" default:"session"`
}
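
The decoder that consumes these name/default tags is not part of this diff, so purely to illustrate the mechanism, here is a runnable sketch of how the default tags on a trimmed copy of this struct could be read with reflection:

package main

import (
    "fmt"
    "reflect"
)

// Trimmed copy of the operator-level ConnectionPool config above
// (the pointer-typed NumberOfInstances is omitted for brevity).
type ConnectionPool struct {
    Schema string `name:"connection_pool_schema" default:"pooler"`
    User   string `name:"connection_pool_user" default:"pooler"`
    Type   string `name:"connection_pool_type" default:"pgbouncer"`
    Image  string `name:"connection_pool_image" default:"pgbouncer:1.0"`
    Mode   string `name:"connection_pool_mode" default:"session"`
}

func main() {
    // Fill every string field with the default declared in its tag,
    // the way a tag-driven config loader typically would.
    var cfg ConnectionPool
    v := reflect.ValueOf(&cfg).Elem()
    t := v.Type()
    for i := 0; i < t.NumField(); i++ {
        if def, ok := t.Field(i).Tag.Lookup("default"); ok {
            v.Field(i).SetString(def)
        }
    }
    fmt.Printf("%+v\n", cfg)
    // {Schema:pooler User:pooler Type:pgbouncer Image:pgbouncer:1.0 Mode:session}
}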
// Config describes operator config
type Config struct {
CRD
@ -90,6 +100,7 @@ type Config struct {
Auth
Scalyr
LogicalBackup
ConnectionPool
WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to'
EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS

View File

@ -4,6 +4,7 @@ package constants
const (
PasswordLength = 64
SuperuserKeyName = "superuser"
ConnectionPoolUserKeyName = "pooler"
ReplicationUserKeyName = "replication"
RoleFlagSuperuser = "SUPERUSER"
RoleFlagInherit = "INHERIT"

View File

@ -40,6 +40,7 @@ type KubernetesClient struct {
corev1.NamespacesGetter
corev1.ServiceAccountsGetter
appsv1.StatefulSetsGetter
appsv1.DeploymentsGetter
rbacv1beta1.RoleBindingsGetter
policyv1beta1.PodDisruptionBudgetsGetter
apiextbeta1.CustomResourceDefinitionsGetter
@ -102,6 +103,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
kubeClient.NodesGetter = client.CoreV1()
kubeClient.NamespacesGetter = client.CoreV1()
kubeClient.StatefulSetsGetter = client.AppsV1()
kubeClient.DeploymentsGetter = client.AppsV1()
kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1()
kubeClient.RESTClient = client.CoreV1().RESTClient()
kubeClient.RoleBindingsGetter = client.RbacV1beta1()