WIP Connection pooler support
Add initial support for a connection pooler. The idea is to make it generic enough that switching from pgbouncer to e.g. odyssey only requires changing the corresponding docker image. The operator creates a deployment running the pooler and a service to access it.
parent 744c71d16b
commit be438b77e6
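For orientation before the diff: a minimal sketch of what enabling the pooler looks like from the Go side, against the ConnectionPool types added below (the import path and all field values are illustrative assumptions, not part of this commit):

package main

import (
	"fmt"

	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
)

func main() {
	poolerType := "pgbouncer" // could equally be "odyssey" with a matching image
	mode := "session"

	spec := acidv1.PostgresSpec{
		TeamID: "acid",
		ConnectionPool: &acidv1.ConnectionPool{
			Type: &poolerType,
			Mode: &mode,
			// Schema, User, NumberOfInstances and PodTemplate are optional
			// and fall back to operator-level defaults.
		},
	}

	// This is exactly the check the operator performs (needConnectionPool)
	// to decide whether to create the pooler deployment and service.
	fmt.Println(spec.ConnectionPool != nil)
}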
@@ -3,8 +3,17 @@ MAINTAINER Team ACID @ Zalando <team-acid@zalando.de>
 
 # We need root certificates to deal with teams api over https
 RUN apk --no-cache add ca-certificates go git musl-dev
-RUN go get github.com/derekparker/delve/cmd/dlv
 
 COPY build/* /
 
-CMD ["/root/go/bin/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"]
+RUN addgroup -g 1000 pgo
+RUN adduser -D -u 1000 -G pgo -g 'Postgres Operator' pgo
+
+RUN go get github.com/derekparker/delve/cmd/dlv
+RUN cp /root/go/bin/dlv /dlv
+RUN chown -R pgo:pgo /dlv
+
+USER pgo:pgo
+RUN ls -l /
+
+CMD ["/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"]

@@ -17,3 +17,5 @@ spec:
     foo: zalando  # dbname: owner
   postgresql:
     version: "11"
+  connectionPool:
+    type: "pgbouncer"

@@ -119,6 +119,7 @@ rules:
   - apps
   resources:
   - statefulsets
+  - deployments
   verbs:
   - create
   - delete

@@ -27,6 +27,8 @@ type PostgresSpec struct {
 	Patroni   `json:"patroni,omitempty"`
 	Resources `json:"resources,omitempty"`
 
+	ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"`
+
 	TeamID      string `json:"teamId"`
 	DockerImage string `json:"dockerImage,omitempty"`

@@ -154,3 +156,13 @@ type UserFlags []string
 type PostgresStatus struct {
 	PostgresClusterStatus string `json:"PostgresClusterStatus"`
 }
+
+// Options for connection pooler
+type ConnectionPool struct {
+	NumberOfInstances *int32              `json:"instancesNumber,omitempty"`
+	Schema            *string             `json:"schema,omitempty"`
+	User              *string             `json:"user,omitempty"`
+	Type              *string             `json:"type,omitempty"`
+	Mode              *string             `json:"mode,omitempty"`
+	PodTemplate       *v1.PodTemplateSpec `json:"podTemplate,omitempty"`
+}

@@ -10,7 +10,8 @@ import (
 
 // APIVersion of the `postgresql` and `operator` CRDs
 const (
-	APIVersion = "v1"
+	APIVersion     = "v1"
+	PostgresqlKind = "postgresql"
 )
 
 var (

@@ -42,7 +43,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
 	// AddKnownType assumes derives the type kind from the type name, which is always uppercase.
 	// For our CRDs we use lowercase names historically, therefore we have to supply the name separately.
 	// TODO: User uppercase CRDResourceKind of our types in the next major API version
-	scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("postgresql"), &Postgresql{})
+	scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind(PostgresqlKind), &Postgresql{})
 	scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("postgresqlList"), &PostgresqlList{})
 	scheme.AddKnownTypeWithName(SchemeGroupVersion.WithKind("OperatorConfiguration"),
 		&OperatorConfiguration{})

@@ -48,11 +48,17 @@ type Config struct {
 	PodServiceAccountRoleBinding *rbacv1beta1.RoleBinding
 }
 
+type ConnectionPoolResources struct {
+	Deployment *appsv1.Deployment
+	Service    *v1.Service
+}
+
 type kubeResources struct {
 	Services            map[PostgresRole]*v1.Service
 	Endpoints           map[PostgresRole]*v1.Endpoints
 	Secrets             map[types.UID]*v1.Secret
 	Statefulset         *appsv1.StatefulSet
+	ConnectionPool      *ConnectionPoolResources
 	PodDisruptionBudget *policybeta1.PodDisruptionBudget
 	//Pods are treated separately
 	//PVCs are treated separately

@@ -184,7 +190,8 @@ func (c *Cluster) isNewCluster() bool {
 func (c *Cluster) initUsers() error {
 	c.setProcessName("initializing users")
 
-	// clear our the previous state of the cluster users (in case we are running a sync).
+	// clear out the previous state of the cluster users (in case we are
+	// running a sync).
 	c.systemUsers = map[string]spec.PgUser{}
 	c.pgUsers = map[string]spec.PgUser{}
 

@@ -292,8 +299,10 @@ func (c *Cluster) Create() error {
 	}
 	c.logger.Infof("pods are ready")
 
-	// create database objects unless we are running without pods or disabled that feature explicitly
+	// create database objects unless we are running without pods or disabled
+	// that feature explicitly
 	if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
 		c.logger.Infof("Create roles")
 		if err = c.createRoles(); err != nil {
 			return fmt.Errorf("could not create users: %v", err)
 		}

@@ -316,6 +325,26 @@ func (c *Cluster) Create() error {
 		c.logger.Errorf("could not list resources: %v", err)
 	}
 
+	// Create the connection pool deployment and service if necessary. Since
+	// we need to perform some operations on the database itself (e.g. install
+	// the lookup function), do it as the last step, when everything else is
+	// available.
+	//
+	// Do not consider the connection pool a strict requirement; if something
+	// fails, report a warning.
+	if c.needConnectionPool() {
+		if c.ConnectionPool != nil {
+			c.logger.Warning("Connection pool already exists in the cluster")
+			return nil
+		}
+		connPool, err := c.createConnectionPool()
+		if err != nil {
+			c.logger.Warningf("could not create connection pool: %v", err)
+			return nil
+		}
+		c.logger.Infof("connection pool %q has been successfully created",
+			util.NameFromMeta(connPool.Deployment.ObjectMeta))
+	}
+
 	return nil
 }

@@ -745,6 +774,12 @@ func (c *Cluster) Delete() {
 		c.logger.Warningf("could not remove leftover patroni objects; %v", err)
 	}
 
+	// Delete connection pool objects anyway, even if they are not mentioned
+	// in the manifest, so that we do not keep orphaned components around in
+	// case something went wrong earlier.
+	if err := c.deleteConnectionPool(); err != nil {
+		c.logger.Warningf("could not remove connection pool: %v", err)
+	}
 }
 
 //NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status).

@@ -811,6 +846,22 @@ func (c *Cluster) initSystemUsers() {
 		Name:     c.OpConfig.ReplicationUsername,
 		Password: util.RandomPassword(constants.PasswordLength),
 	}
+
+	// The connection pool user is an exception: if requested, it is created
+	// by the operator as a normal pgUser.
+	if c.needConnectionPool() {
+		username := c.Spec.ConnectionPool.User
+		if username == nil {
+			username = &c.OpConfig.ConnectionPool.User
+		}
+
+		c.systemUsers[constants.ConnectionPoolUserKeyName] = spec.PgUser{
+			Origin:   spec.RoleConnectionPool,
+			Name:     *username,
+			Password: util.RandomPassword(constants.PasswordLength),
+		}
+	}
 }
 
 func (c *Cluster) initRobotUsers() error {

@@ -1,10 +1,12 @@
 package cluster
 
 import (
+	"bytes"
 	"database/sql"
 	"fmt"
 	"net"
 	"strings"
+	"text/template"
 	"time"
 
 	"github.com/lib/pq"

@@ -28,6 +30,25 @@ const (
 	getDatabasesSQL       = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;`
 	createDatabaseSQL     = `CREATE DATABASE "%s" OWNER "%s";`
 	alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";`
+	connectionPoolLookup  = `
+		CREATE SCHEMA IF NOT EXISTS {{.pool_schema}};
+
+		CREATE OR REPLACE FUNCTION {{.pool_schema}}.user_lookup(
+			in i_username text, out uname text, out phash text)
+		RETURNS record AS $$
+		BEGIN
+			SELECT usename, passwd FROM pg_catalog.pg_shadow
+			WHERE usename = i_username INTO uname, phash;
+			RETURN;
+		END;
+		$$ LANGUAGE plpgsql SECURITY DEFINER;
+
+		REVOKE ALL ON FUNCTION {{.pool_schema}}.user_lookup(text)
+			FROM public, {{.pool_schema}};
+		GRANT EXECUTE ON FUNCTION {{.pool_schema}}.user_lookup(text)
+			TO {{.pool_user}};
+		GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}};
+	`
 )
 
 func (c *Cluster) pgConnectionString() string {

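Since the lookup function above is a plain text/template, its output can be previewed without a database. A standalone sketch (the schema and user values are arbitrary):

package main

import (
	"os"
	"text/template"
)

// A shortened copy of the connectionPoolLookup template from the hunk above.
const lookupSQL = `
CREATE SCHEMA IF NOT EXISTS {{.pool_schema}};
GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}};
`

func main() {
	t := template.Must(template.New("sql").Parse(lookupSQL))
	params := map[string]interface{}{
		"pool_schema": "pooler",
		"pool_user":   "pooler",
	}
	// Render to stdout instead of executing via c.pgDb.Exec.
	if err := t.Execute(os.Stdout, params); err != nil {
		panic(err)
	}
}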
@@ -243,3 +264,49 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin
 
 	return result
 }
+
+// installLookupFunction creates a connection pool credentials lookup function
+// in every database to perform remote authentication.
+func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
+	if err := c.initDbConn(); err != nil {
+		return fmt.Errorf("could not init database connection: %v", err)
+	}
+	defer func() {
+		if err := c.closeDbConn(); err != nil {
+			c.logger.Errorf("could not close database connection: %v", err)
+		}
+	}()
+
+	currentDatabases, err := c.getDatabases()
+	if err != nil {
+		msg := "could not get databases to install pool lookup function: %v"
+		return fmt.Errorf(msg, err)
+	}
+
+	templater := template.Must(template.New("sql").Parse(connectionPoolLookup))
+
+	for dbname := range currentDatabases {
+		// a fresh buffer per database, so statements do not accumulate
+		var stmtBytes bytes.Buffer
+		c.logger.Infof("Install pool lookup function into %s", dbname)
+
+		params := TemplateParams{
+			"pool_schema": poolSchema,
+			"pool_user":   poolUser,
+		}
+
+		if err := templater.Execute(&stmtBytes, params); err != nil {
+			return fmt.Errorf("could not prepare sql statement %+v: %v",
+				params, err)
+		}
+
+		if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil {
+			return fmt.Errorf("could not execute sql statement %s: %v",
+				stmtBytes.String(), err)
+		}
+
+		c.logger.Infof("Pool lookup function installed into %s", dbname)
+	}
+
+	return nil
+}

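Once installed, a pgbouncer-style pooler is expected to call this function when authenticating clients (via its auth_query mechanism). A sketch of a manual check that the function returns what the pooler needs (the connection string is an assumption; adjust to your cluster):

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres", "host=localhost user=postgres sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	var uname, phash sql.NullString
	// This is the shape of query a pooler would issue to look up credentials.
	row := db.QueryRow(`SELECT uname, phash FROM pooler.user_lookup($1)`, "pooler")
	if err := row.Scan(&uname, &phash); err != nil {
		panic(err)
	}
	fmt.Println(uname.String, phash.String)
}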
@@ -31,6 +31,8 @@ const (
 	patroniPGParametersParameterName = "parameters"
 	patroniPGHBAConfParameterName    = "pg_hba"
 	localHost                        = "127.0.0.1/32"
+	connectionPoolContainer          = "connection-pool"
+	pgPort                           = 5432
 )
 
 type pgUser struct {

@@ -66,6 +68,10 @@ func (c *Cluster) statefulSetName() string {
 	return c.Name
 }
 
+func (c *Cluster) connPoolName() string {
+	return c.Name + "-pooler"
+}
+
 func (c *Cluster) endpointName(role PostgresRole) string {
 	name := c.Name
 	if role == Replica {

@@ -84,6 +90,28 @@ func (c *Cluster) serviceName(role PostgresRole) string {
 	return name
 }
 
+func (c *Cluster) serviceAddress(role PostgresRole) string {
+	service, exist := c.Services[role]
+
+	if exist {
+		return service.ObjectMeta.Name
+	}
+
+	c.logger.Warningf("No service for role %s", role)
+	return ""
+}
+
+func (c *Cluster) servicePort(role PostgresRole) string {
+	service, exist := c.Services[role]
+
+	if exist {
+		return fmt.Sprint(service.Spec.Ports[0].Port)
+	}
+
+	c.logger.Warningf("No service for role %s", role)
+	return ""
+}
+
 func (c *Cluster) podDisruptionBudgetName() string {
 	return c.OpConfig.PDBNameFormat.Format("cluster", c.Name)
 }

@@ -315,7 +343,11 @@ func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]stri
 		return *tolerationsSpec
 	}
 
-	if len(podToleration["key"]) > 0 || len(podToleration["operator"]) > 0 || len(podToleration["value"]) > 0 || len(podToleration["effect"]) > 0 {
+	if len(podToleration["key"]) > 0 ||
+		len(podToleration["operator"]) > 0 ||
+		len(podToleration["value"]) > 0 ||
+		len(podToleration["effect"]) > 0 {
+
 		return []v1.Toleration{
 			{
 				Key: podToleration["key"],

@@ -1669,3 +1701,185 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar {
 
 func (c *Cluster) getLogicalBackupJobName() (jobName string) {
 	return "logical-backup-" + c.clusterName().Name
 }
+
+func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) (
+	*v1.PodTemplateSpec, error) {
+
+	podTemplate := spec.ConnectionPool.PodTemplate
+
+	if podTemplate == nil {
+		gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
+		resources, err := generateResourceRequirements(
+			c.Spec.Resources,
+			c.makeDefaultResources())
+		if err != nil {
+			return nil, fmt.Errorf("could not generate resource requirements: %v", err)
+		}
+
+		effectiveMode := spec.ConnectionPool.Mode
+		if effectiveMode == nil {
+			effectiveMode = &c.OpConfig.ConnectionPool.Mode
+		}
+
+		secretSelector := func(key string) *v1.SecretKeySelector {
+			return &v1.SecretKeySelector{
+				LocalObjectReference: v1.LocalObjectReference{
+					Name: c.credentialSecretName(c.OpConfig.SuperUsername),
+				},
+				Key: key,
+			}
+		}
+
+		envVars := []v1.EnvVar{
+			{
+				Name:  "PGHOST",
+				Value: c.serviceAddress(Master),
+			},
+			{
+				Name:  "PGPORT",
+				Value: c.servicePort(Master),
+			},
+			{
+				Name: "PGUSER",
+				ValueFrom: &v1.EnvVarSource{
+					SecretKeyRef: secretSelector("username"),
+				},
+			},
+			// the convention is to use the same schema name as the
+			// connection pool username
+			{
+				Name: "PGSCHEMA",
+				ValueFrom: &v1.EnvVarSource{
+					SecretKeyRef: secretSelector("username"),
+				},
+			},
+			{
+				Name: "PGPASSWORD",
+				ValueFrom: &v1.EnvVarSource{
+					SecretKeyRef: secretSelector("password"),
+				},
+			},
+			{
+				Name:  "CONNECTION_POOL_MODE",
+				Value: *effectiveMode,
+			},
+			{
+				Name:  "CONNECTION_POOL_PORT",
+				Value: fmt.Sprint(pgPort),
+			},
+		}
+
+		poolerContainer := v1.Container{
+			Name:            connectionPoolContainer,
+			Image:           c.OpConfig.ConnectionPool.Image,
+			ImagePullPolicy: v1.PullIfNotPresent,
+			Resources:       *resources,
+			Ports: []v1.ContainerPort{
+				{
+					ContainerPort: pgPort,
+					Protocol:      v1.ProtocolTCP,
+				},
+			},
+			Env: envVars,
+		}
+
+		podTemplate = &v1.PodTemplateSpec{
+			ObjectMeta: metav1.ObjectMeta{
+				Labels:      c.connPoolLabelsSelector().MatchLabels,
+				Namespace:   c.Namespace,
+				Annotations: c.generatePodAnnotations(spec),
+			},
+			Spec: v1.PodSpec{
+				ServiceAccountName:            c.OpConfig.PodServiceAccountName,
+				TerminationGracePeriodSeconds: &gracePeriod,
+				Containers:                    []v1.Container{poolerContainer},
+				// TODO: add tolerations to schedule the pooler on the same
+				// node as the database
+				//Tolerations: *tolerationsSpec,
+			},
+		}
+	}
+
+	return podTemplate, nil
+}
+
+func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) (
+	*appsv1.Deployment, error) {
+
+	podTemplate, err := c.generateConnPoolPodTemplate(spec)
+	if err != nil {
+		return nil, err
+	}
+
+	numberOfInstances := spec.ConnectionPool.NumberOfInstances
+	if numberOfInstances == nil {
+		numberOfInstances = c.OpConfig.ConnectionPool.NumberOfInstances
+	}
+
+	deployment := &appsv1.Deployment{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        c.connPoolName(),
+			Namespace:   c.Namespace,
+			Labels:      c.labelsSet(true),
+			Annotations: map[string]string{},
+			// make the StatefulSet its owner, so that this object will be
+			// deleted even if something went wrong and the operator didn't
+			// delete it.
+			OwnerReferences: []metav1.OwnerReference{
+				{
+					UID:        c.Statefulset.ObjectMeta.UID,
+					APIVersion: "apps/v1",
+					Kind:       "StatefulSet",
+					Name:       c.Statefulset.ObjectMeta.Name,
+				},
+			},
+		},
+		Spec: appsv1.DeploymentSpec{
+			Replicas: numberOfInstances,
+			Selector: c.connPoolLabelsSelector(),
+			Template: *podTemplate,
+		},
+	}
+
+	return deployment, nil
+}
+
+func (c *Cluster) generateConnPoolService(spec *acidv1.PostgresSpec) *v1.Service {
+	serviceSpec := v1.ServiceSpec{
+		Ports: []v1.ServicePort{
+			{
+				Name:       c.connPoolName(),
+				Port:       pgPort,
+				TargetPort: intstr.Parse(c.servicePort(Master)),
+			},
+		},
+		Type: v1.ServiceTypeClusterIP,
+		Selector: map[string]string{
+			"connection-pool": c.connPoolName(),
+		},
+	}
+
+	service := &v1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        c.connPoolName(),
+			Namespace:   c.Namespace,
+			Labels:      c.labelsSet(true),
+			Annotations: map[string]string{},
+			// make the Postgresql CRD object its owner, so that if the CRD
+			// object is deleted, this object will be deleted even if
+			// something went wrong and the operator didn't delete it.
+			OwnerReferences: []metav1.OwnerReference{
+				{
+					UID:        c.Postgresql.ObjectMeta.UID,
+					APIVersion: acidv1.APIVersion,
+					Kind:       acidv1.PostgresqlKind,
+					Name:       c.Postgresql.ObjectMeta.Name,
+				},
+			},
+		},
+		Spec: serviceSpec,
+	}
+
+	return service
+}

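Taken together, the deployment and service mean clients reach the pooler at a stable in-cluster address derived from connPoolName(). A tiny sketch of what that address looks like (namespace and cluster name are example values):

package main

import "fmt"

func main() {
	cluster := "acid-minimal-cluster"
	namespace := "default"

	// connPoolName() appends "-pooler" to the cluster name; the service is
	// a ClusterIP service listening on the standard Postgres port 5432.
	pooler := cluster + "-pooler"
	fmt.Printf("%s.%s.svc.cluster.local:%d\n", pooler, namespace, 5432)
}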
@@ -90,6 +90,102 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) {
 	return statefulSet, nil
 }
 
+// Prepare the database for the connection pool to be used, i.e. install the
+// lookup function (do this first, because it should be fast, and if it does
+// not succeed it makes no sense to create more K8S objects). At this moment
+// we assume that the necessary connection pool user exists.
+//
+// After that, create all the objects for the connection pool, namely a
+// deployment with a chosen pooler and a service to expose it.
+func (c *Cluster) createConnectionPool() (*ConnectionPoolResources, error) {
+	var msg string
+	c.setProcessName("creating connection pool")
+
+	err := c.installLookupFunction(
+		c.OpConfig.ConnectionPool.Schema,
+		c.OpConfig.ConnectionPool.User)
+	if err != nil {
+		msg = "could not prepare database for connection pool: %v"
+		return nil, fmt.Errorf(msg, err)
+	}
+
+	deploymentSpec, err := c.generateConnPoolDeployment(&c.Spec)
+	if err != nil {
+		msg = "could not generate deployment for connection pool: %v"
+		return nil, fmt.Errorf(msg, err)
+	}
+
+	deployment, err := c.KubeClient.
+		Deployments(deploymentSpec.Namespace).
+		Create(deploymentSpec)
+	if err != nil {
+		return nil, err
+	}
+
+	serviceSpec := c.generateConnPoolService(&c.Spec)
+	service, err := c.KubeClient.
+		Services(serviceSpec.Namespace).
+		Create(serviceSpec)
+	if err != nil {
+		return nil, err
+	}
+
+	c.ConnectionPool = &ConnectionPoolResources{
+		Deployment: deployment,
+		Service:    service,
+	}
+	c.logger.Debugf("created new connection pool %q, uid: %q",
+		util.NameFromMeta(deployment.ObjectMeta), deployment.UID)
+
+	return c.ConnectionPool, nil
+}
+
+func (c *Cluster) deleteConnectionPool() (err error) {
+	c.setProcessName("deleting connection pool")
+	c.logger.Debugln("deleting connection pool")
+
+	// Lack of connection pool objects is not a fatal error; just log it if
+	// the pool was present in the manifest.
+	if c.ConnectionPool == nil {
+		if c.needConnectionPool() {
+			c.logger.Infof("No connection pool to delete")
+		}
+		return nil
+	}
+
+	deployment := c.ConnectionPool.Deployment
+	err = c.KubeClient.
+		Deployments(deployment.Namespace).
+		Delete(deployment.Name, c.deleteOptions)
+
+	if k8sutil.ResourceNotFound(err) {
+		c.logger.Debugf("Connection pool deployment was already deleted")
+	} else if err != nil {
+		return fmt.Errorf("could not delete deployment: %v", err)
+	}
+
+	c.logger.Infof("Connection pool deployment %q has been deleted",
+		util.NameFromMeta(deployment.ObjectMeta))
+
+	service := c.ConnectionPool.Service
+	err = c.KubeClient.
+		Services(service.Namespace).
+		Delete(service.Name, c.deleteOptions)
+
+	if k8sutil.ResourceNotFound(err) {
+		c.logger.Debugf("Connection pool service was already deleted")
+	} else if err != nil {
+		return fmt.Errorf("could not delete service: %v", err)
+	}
+
+	c.logger.Infof("Connection pool service %q has been deleted",
+		util.NameFromMeta(service.ObjectMeta))
+
+	c.ConnectionPool = nil
+	return nil
+}
+
 func getPodIndex(podName string) (int32, error) {
 	parts := strings.Split(podName, "-")
 	if len(parts) == 0 {

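The delete path above treats a missing object as success rather than an error. A minimal sketch of that idiom, assuming k8sutil.ResourceNotFound is a thin wrapper around apierrors.IsNotFound (which is how it is used here):

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// deleteIgnoreNotFound mirrors the pattern in deleteConnectionPool: a
// NotFound error on delete means the object is already gone, which is fine.
func deleteIgnoreNotFound(err error) error {
	if apierrors.IsNotFound(err) {
		fmt.Println("object was already deleted")
		return nil
	}
	return err // nil on success, or a real error
}

func main() {
	notFound := apierrors.NewNotFound(
		schema.GroupResource{Group: "apps", Resource: "deployments"}, "demo")
	fmt.Println(deleteIgnoreNotFound(notFound)) // <nil>
	fmt.Println(deleteIgnoreNotFound(nil))      // <nil>
}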
@@ -456,6 +456,12 @@ func (c *Cluster) syncRoles() (err error) {
 	for _, u := range c.pgUsers {
 		userNames = append(userNames, u.Name)
 	}
+
+	// An exception from the system users: the connection pool user
+	connPoolUser := c.systemUsers[constants.ConnectionPoolUserKeyName]
+	userNames = append(userNames, connPoolUser.Name)
+	c.pgUsers[connPoolUser.Name] = connPoolUser
+
 	dbUsers, err = c.readPgUsersFromDatabase(userNames)
 	if err != nil {
 		return fmt.Errorf("error getting users from the database: %v", err)

@@ -69,3 +69,5 @@ type ClusterStatus struct {
 	Spec  acidv1.PostgresSpec
 	Error error
 }
+
+type TemplateParams map[string]interface{}

@@ -408,7 +408,19 @@ func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set {
 }
 
 func (c *Cluster) labelsSelector() *metav1.LabelSelector {
-	return &metav1.LabelSelector{MatchLabels: c.labelsSet(false), MatchExpressions: nil}
+	return &metav1.LabelSelector{
+		MatchLabels:      c.labelsSet(false),
+		MatchExpressions: nil,
+	}
 }
 
+func (c *Cluster) connPoolLabelsSelector() *metav1.LabelSelector {
+	return &metav1.LabelSelector{
+		MatchLabels: map[string]string{
+			"connection-pool": c.connPoolName(),
+		},
+		MatchExpressions: nil,
+	}
+}
+
 func (c *Cluster) roleLabelsSet(shouldAddExtraLabels bool, role PostgresRole) labels.Set {

@@ -483,3 +495,7 @@ func (c *Cluster) GetSpec() (*acidv1.Postgresql, error) {
 func (c *Cluster) patroniUsesKubernetes() bool {
 	return c.OpConfig.EtcdHost == ""
 }
+
+func (c *Cluster) needConnectionPool() bool {
+	return c.Spec.ConnectionPool != nil
+}

@@ -23,13 +23,15 @@ const fileWithNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespa
 // RoleOrigin contains the code of the origin of a role
 type RoleOrigin int
 
-// The rolesOrigin constant values must be sorted by the role priority for resolveNameConflict(...) to work.
+// The rolesOrigin constant values must be sorted by the role priority for
+// resolveNameConflict(...) to work.
 const (
 	RoleOriginUnknown RoleOrigin = iota
 	RoleOriginManifest
 	RoleOriginInfrastructure
 	RoleOriginTeamsAPI
 	RoleOriginSystem
+	RoleConnectionPool
 )
 
 type syncUserOperation int

@@ -83,6 +83,16 @@ type LogicalBackup struct {
 	LogicalBackupS3SSE string `name:"logical_backup_s3_sse" default:"AES256"`
 }
 
+// Operator options for connection pooler
+type ConnectionPool struct {
+	NumberOfInstances *int32 `name:"connection_pool_instances_number" default:"1"`
+	Schema            string `name:"connection_pool_schema" default:"pooler"`
+	User              string `name:"connection_pool_user" default:"pooler"`
+	Type              string `name:"connection_pool_type" default:"pgbouncer"`
+	Image             string `name:"connection_pool_image" default:"pgbouncer:1.0"`
+	Mode              string `name:"connection_pool_mode" default:"session"`
+}
+
 // Config describes operator config
 type Config struct {
 	CRD

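Per-cluster manifest settings are meant to override these operator-wide defaults, which is why the CRD fields are pointers while the operator options are plain values. A schematic sketch of that precedence (names shortened, not the actual operator code):

package main

import "fmt"

// effective returns the per-cluster value when set, otherwise the operator
// default — the same resolution the generation code above applies to User,
// Mode and NumberOfInstances.
func effective(fromSpec *string, operatorDefault string) string {
	if fromSpec != nil {
		return *fromSpec
	}
	return operatorDefault
}

func main() {
	mode := "transaction"
	fmt.Println(effective(&mode, "session")) // transaction: the manifest wins
	fmt.Println(effective(nil, "session"))   // session: operator default
}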
@@ -90,6 +100,7 @@ type Config struct {
 	Auth
 	Scalyr
 	LogicalBackup
+	ConnectionPool
 
 	WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to'
 	EtcdHost         string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS

@@ -2,15 +2,16 @@ package constants
 
 // Roles specific constants
 const (
-	PasswordLength         = 64
-	SuperuserKeyName       = "superuser"
-	ReplicationUserKeyName = "replication"
-	RoleFlagSuperuser      = "SUPERUSER"
-	RoleFlagInherit        = "INHERIT"
-	RoleFlagLogin          = "LOGIN"
-	RoleFlagNoLogin        = "NOLOGIN"
-	RoleFlagCreateRole     = "CREATEROLE"
-	RoleFlagCreateDB       = "CREATEDB"
-	RoleFlagReplication    = "REPLICATION"
-	RoleFlagByPassRLS      = "BYPASSRLS"
+	PasswordLength            = 64
+	SuperuserKeyName          = "superuser"
+	ConnectionPoolUserKeyName = "pooler"
+	ReplicationUserKeyName    = "replication"
+	RoleFlagSuperuser         = "SUPERUSER"
+	RoleFlagInherit           = "INHERIT"
+	RoleFlagLogin             = "LOGIN"
+	RoleFlagNoLogin           = "NOLOGIN"
+	RoleFlagCreateRole        = "CREATEROLE"
+	RoleFlagCreateDB          = "CREATEDB"
+	RoleFlagReplication       = "REPLICATION"
+	RoleFlagByPassRLS         = "BYPASSRLS"
 )

@@ -39,6 +39,7 @@ type KubernetesClient struct {
 	corev1.NamespacesGetter
 	corev1.ServiceAccountsGetter
 	appsv1.StatefulSetsGetter
+	appsv1.DeploymentsGetter
 	rbacv1beta1.RoleBindingsGetter
 	policyv1beta1.PodDisruptionBudgetsGetter
 	apiextbeta1.CustomResourceDefinitionsGetter

@@ -101,6 +102,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
 	kubeClient.NodesGetter = client.CoreV1()
 	kubeClient.NamespacesGetter = client.CoreV1()
 	kubeClient.StatefulSetsGetter = client.AppsV1()
+	kubeClient.DeploymentsGetter = client.AppsV1()
 	kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1()
 	kubeClient.RESTClient = client.CoreV1().RESTClient()
 	kubeClient.RoleBindingsGetter = client.RbacV1beta1()