Various improvements
Add synchronization logic. For now get rid of podTemplate, type fields. Add crd validation & configuration part, put retry on top of lookup function installation.
This commit is contained in:
parent
8bd2086cd2
commit
3ff1147bce
|
|
@ -106,6 +106,7 @@ rules:
|
|||
- apps
|
||||
resources:
|
||||
- statefulsets
|
||||
- deployments
|
||||
verbs:
|
||||
- create
|
||||
- delete
|
||||
|
|
|
|||
|
|
@ -294,6 +294,43 @@ spec:
|
|||
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
|
||||
scalyr_server_url:
|
||||
type: string
|
||||
connection_pool:
|
||||
type: object
|
||||
properties:
|
||||
connection_pool_schema:
|
||||
type: string
|
||||
#default: "pooler"
|
||||
connection_pool_user:
|
||||
type: string
|
||||
#default: "pooler"
|
||||
connection_pool_instances_number:
|
||||
type: integer
|
||||
#default: 1
|
||||
connection_pool_image:
|
||||
type: string
|
||||
#default: "pierone.stups.zalan.do/acid/pgbouncer:0.0.1"
|
||||
connection_pool_mode:
|
||||
type: string
|
||||
enum:
|
||||
- "session"
|
||||
- "transaction"
|
||||
#default: "transaction"
|
||||
connection_pool_default_cpu_limit:
|
||||
type: string
|
||||
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
|
||||
#default: "1"
|
||||
connection_pool_default_cpu_request:
|
||||
type: string
|
||||
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
|
||||
#default: "1"
|
||||
connection_pool_default_memory_limit:
|
||||
type: string
|
||||
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
|
||||
#default: "100m"
|
||||
connection_pool_default_memory_request:
|
||||
type: string
|
||||
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
|
||||
#default: "100m"
|
||||
status:
|
||||
type: object
|
||||
additionalProperties:
|
||||
|
|
|
|||
|
|
@ -27,7 +27,8 @@ type PostgresSpec struct {
|
|||
Patroni `json:"patroni,omitempty"`
|
||||
Resources `json:"resources,omitempty"`
|
||||
|
||||
ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"`
|
||||
EnableConnectionPool bool `json:"enable_connection_pool,omitempty"`
|
||||
ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"`
|
||||
|
||||
TeamID string `json:"teamId"`
|
||||
DockerImage string `json:"dockerImage,omitempty"`
|
||||
|
|
@ -159,12 +160,15 @@ type PostgresStatus struct {
|
|||
|
||||
// Options for connection pooler
|
||||
type ConnectionPool struct {
|
||||
NumberOfInstances *int32 `json:"instancesNumber,omitempty"`
|
||||
Schema *string `json:"schema,omitempty"`
|
||||
User *string `json:"user,omitempty"`
|
||||
Type *string `json:"type,omitempty"`
|
||||
Mode *string `json:"mode,omitempty"`
|
||||
PodTemplate *v1.PodTemplateSpec `json:"podTemplate,omitempty"`
|
||||
NumberOfInstances *int32 `json:"instancesNumber,omitempty"`
|
||||
Schema string `json:"schema,omitempty"`
|
||||
User string `json:"user,omitempty"`
|
||||
Mode string `json:"mode,omitempty"`
|
||||
DockerImage string `json:"dockerImage,omitempty"`
|
||||
// TODO: prepared snippets of configuration, one can choose via type, e.g.
|
||||
// pgbouncer-large (with higher resources) or odyssey-small (with smaller
|
||||
// resources)
|
||||
// Type string `json:"type,omitempty"`
|
||||
|
||||
Resources `json:"resources,omitempty"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/r3labs/diff"
|
||||
"github.com/sirupsen/logrus"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
|
|
@ -723,6 +724,17 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
|||
}
|
||||
}
|
||||
|
||||
// connection pool
|
||||
if !reflect.DeepEqual(oldSpec.Spec.ConnectionPool,
|
||||
newSpec.Spec.ConnectionPool) {
|
||||
c.logger.Debug("syncing connection pool")
|
||||
|
||||
if err := c.syncConnectionPool(oldSpec, newSpec); err != nil {
|
||||
c.logger.Errorf("could not sync connection pool: %v", err)
|
||||
updateFailed = true
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -852,13 +864,13 @@ func (c *Cluster) initSystemUsers() {
|
|||
if c.needConnectionPool() {
|
||||
|
||||
username := c.Spec.ConnectionPool.User
|
||||
if username == nil {
|
||||
username = &c.OpConfig.ConnectionPool.User
|
||||
if username == "" {
|
||||
username = c.OpConfig.ConnectionPool.User
|
||||
}
|
||||
|
||||
c.systemUsers[constants.ConnectionPoolUserKeyName] = spec.PgUser{
|
||||
Origin: spec.RoleConnectionPool,
|
||||
Name: *username,
|
||||
Name: username,
|
||||
Password: util.RandomPassword(constants.PasswordLength),
|
||||
}
|
||||
}
|
||||
|
|
@ -1188,3 +1200,29 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error {
|
|||
|
||||
return c.deleteClusterObject(get, deleteConfigMapFn, "configmap")
|
||||
}
|
||||
|
||||
// Test if two connection pool configuration needs to be synced. For simplicity
|
||||
// compare not the actual K8S objects, but the configuration itself and request
|
||||
// sync if there is any difference.
|
||||
func (c *Cluster) needSyncConnPoolDeployments(oldSpec, newSpec *acidv1.ConnectionPool) (sync bool, reasons []string) {
|
||||
reasons = []string{}
|
||||
sync = false
|
||||
|
||||
changelog, err := diff.Diff(oldSpec, newSpec)
|
||||
if err != nil {
|
||||
c.logger.Infof("Cannot get diff, do not do anything, %+v", err)
|
||||
return false, reasons
|
||||
} else {
|
||||
if len(changelog) > 0 {
|
||||
sync = true
|
||||
}
|
||||
|
||||
for _, change := range changelog {
|
||||
msg := fmt.Sprintf("%s %+v from %s to %s",
|
||||
change.Type, change.Path, change.From, change.To)
|
||||
reasons = append(reasons, msg)
|
||||
}
|
||||
}
|
||||
|
||||
return sync, reasons
|
||||
}
|
||||
|
|
|
|||
|
|
@ -300,8 +300,26 @@ func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
|
|||
params, err)
|
||||
}
|
||||
|
||||
if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil {
|
||||
return fmt.Errorf("could not execute sql statement %s: %v",
|
||||
// golang sql will do retries couple of times if pq driver reports
|
||||
// connections issues (driver.ErrBadConn), but since our query is
|
||||
// idempotent, we can retry in a view of other errors (e.g. due to
|
||||
// failover a db is temporary in a read-only mode or so) to make sure
|
||||
// it was applied.
|
||||
execErr := retryutil.Retry(
|
||||
constants.PostgresConnectTimeout,
|
||||
constants.PostgresConnectRetryTimeout,
|
||||
func() (bool, error) {
|
||||
if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil {
|
||||
msg := fmt.Errorf("could not execute sql statement %s: %v",
|
||||
stmtBytes.String(), err)
|
||||
return false, msg
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
|
||||
if execErr != nil {
|
||||
return fmt.Errorf("could not execute after retries %s: %v",
|
||||
stmtBytes.String(), err)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1734,100 +1734,99 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) {
|
|||
func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) (
|
||||
*v1.PodTemplateSpec, error) {
|
||||
|
||||
podTemplate := spec.ConnectionPool.PodTemplate
|
||||
gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
|
||||
resources, err := generateResourceRequirements(
|
||||
spec.ConnectionPool.Resources,
|
||||
c.makeDefaultConnPoolResources())
|
||||
|
||||
if podTemplate == nil {
|
||||
gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
|
||||
resources, err := generateResourceRequirements(
|
||||
spec.ConnectionPool.Resources,
|
||||
c.makeDefaultConnPoolResources())
|
||||
effectiveMode := util.Coalesce(
|
||||
spec.ConnectionPool.Mode,
|
||||
c.OpConfig.ConnectionPool.Mode)
|
||||
|
||||
effectiveMode := spec.ConnectionPool.Mode
|
||||
if effectiveMode == nil {
|
||||
effectiveMode = &c.OpConfig.ConnectionPool.Mode
|
||||
effectiveDockerImage := util.Coalesce(
|
||||
spec.ConnectionPool.DockerImage,
|
||||
c.OpConfig.ConnectionPool.Image)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate resource requirements: %v", err)
|
||||
}
|
||||
|
||||
secretSelector := func(key string) *v1.SecretKeySelector {
|
||||
return &v1.SecretKeySelector{
|
||||
LocalObjectReference: v1.LocalObjectReference{
|
||||
Name: c.credentialSecretName(c.OpConfig.SuperUsername),
|
||||
},
|
||||
Key: key,
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate resource requirements: %v", err)
|
||||
}
|
||||
envVars := []v1.EnvVar{
|
||||
{
|
||||
Name: "PGHOST",
|
||||
Value: c.serviceAddress(Master),
|
||||
},
|
||||
{
|
||||
Name: "PGPORT",
|
||||
Value: c.servicePort(Master),
|
||||
},
|
||||
{
|
||||
Name: "PGUSER",
|
||||
ValueFrom: &v1.EnvVarSource{
|
||||
SecretKeyRef: secretSelector("username"),
|
||||
},
|
||||
},
|
||||
// the convention is to use the same schema name as
|
||||
// connection pool username
|
||||
{
|
||||
Name: "PGSCHEMA",
|
||||
ValueFrom: &v1.EnvVarSource{
|
||||
SecretKeyRef: secretSelector("username"),
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "PGPASSWORD",
|
||||
ValueFrom: &v1.EnvVarSource{
|
||||
SecretKeyRef: secretSelector("password"),
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "CONNECTION_POOL_MODE",
|
||||
Value: effectiveMode,
|
||||
},
|
||||
{
|
||||
Name: "CONNECTION_POOL_PORT",
|
||||
Value: fmt.Sprint(pgPort),
|
||||
},
|
||||
}
|
||||
|
||||
secretSelector := func(key string) *v1.SecretKeySelector {
|
||||
return &v1.SecretKeySelector{
|
||||
LocalObjectReference: v1.LocalObjectReference{
|
||||
Name: c.credentialSecretName(c.OpConfig.SuperUsername),
|
||||
},
|
||||
Key: key,
|
||||
}
|
||||
}
|
||||
poolerContainer := v1.Container{
|
||||
Name: connectionPoolContainer,
|
||||
Image: effectiveDockerImage,
|
||||
ImagePullPolicy: v1.PullIfNotPresent,
|
||||
Resources: *resources,
|
||||
Ports: []v1.ContainerPort{
|
||||
{
|
||||
ContainerPort: pgPort,
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
Env: envVars,
|
||||
}
|
||||
|
||||
envVars := []v1.EnvVar{
|
||||
{
|
||||
Name: "PGHOST",
|
||||
Value: c.serviceAddress(Master),
|
||||
},
|
||||
{
|
||||
Name: "PGPORT",
|
||||
Value: c.servicePort(Master),
|
||||
},
|
||||
{
|
||||
Name: "PGUSER",
|
||||
ValueFrom: &v1.EnvVarSource{
|
||||
SecretKeyRef: secretSelector("username"),
|
||||
},
|
||||
},
|
||||
// the convention is to use the same schema name as
|
||||
// connection pool username
|
||||
{
|
||||
Name: "PGSCHEMA",
|
||||
ValueFrom: &v1.EnvVarSource{
|
||||
SecretKeyRef: secretSelector("username"),
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "PGPASSWORD",
|
||||
ValueFrom: &v1.EnvVarSource{
|
||||
SecretKeyRef: secretSelector("password"),
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "CONNECTION_POOL_MODE",
|
||||
Value: *effectiveMode,
|
||||
},
|
||||
{
|
||||
Name: "CONNECTION_POOL_PORT",
|
||||
Value: fmt.Sprint(pgPort),
|
||||
},
|
||||
}
|
||||
|
||||
poolerContainer := v1.Container{
|
||||
Name: connectionPoolContainer,
|
||||
Image: c.OpConfig.ConnectionPool.Image,
|
||||
ImagePullPolicy: v1.PullIfNotPresent,
|
||||
Resources: *resources,
|
||||
Ports: []v1.ContainerPort{
|
||||
{
|
||||
ContainerPort: pgPort,
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
Env: envVars,
|
||||
}
|
||||
|
||||
podTemplate = &v1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: c.connPoolLabelsSelector().MatchLabels,
|
||||
Namespace: c.Namespace,
|
||||
Annotations: c.generatePodAnnotations(spec),
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
ServiceAccountName: c.OpConfig.PodServiceAccountName,
|
||||
TerminationGracePeriodSeconds: &gracePeriod,
|
||||
Containers: []v1.Container{poolerContainer},
|
||||
// TODO: add tolerations to scheduler pooler on the same node
|
||||
// as database
|
||||
//Tolerations: *tolerationsSpec,
|
||||
},
|
||||
}
|
||||
podTemplate := &v1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: c.connPoolLabelsSelector().MatchLabels,
|
||||
Namespace: c.Namespace,
|
||||
Annotations: c.generatePodAnnotations(spec),
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
ServiceAccountName: c.OpConfig.PodServiceAccountName,
|
||||
TerminationGracePeriodSeconds: &gracePeriod,
|
||||
Containers: []v1.Container{poolerContainer},
|
||||
// TODO: add tolerations to scheduler pooler on the same node
|
||||
// as database
|
||||
//Tolerations: *tolerationsSpec,
|
||||
},
|
||||
}
|
||||
|
||||
return podTemplate, nil
|
||||
|
|
|
|||
|
|
@ -613,21 +613,6 @@ func TestConnPoolPodSpec(t *testing.T) {
|
|||
cluster: cluster,
|
||||
check: testEnvs,
|
||||
},
|
||||
{
|
||||
subTest: "custom pod template",
|
||||
spec: &acidv1.PostgresSpec{
|
||||
ConnectionPool: &acidv1.ConnectionPool{
|
||||
PodTemplate: &v1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-pod-template",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: nil,
|
||||
cluster: cluster,
|
||||
check: testCustomPodTemplate,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
podSpec, err := tt.cluster.generateConnPoolPodTemplate(tt.spec)
|
||||
|
|
|
|||
|
|
@ -101,9 +101,17 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolR
|
|||
var msg string
|
||||
c.setProcessName("creating connection pool")
|
||||
|
||||
err := lookup(
|
||||
c.OpConfig.ConnectionPool.Schema,
|
||||
c.OpConfig.ConnectionPool.User)
|
||||
schema := c.Spec.ConnectionPool.Schema
|
||||
if schema == "" {
|
||||
schema = c.OpConfig.ConnectionPool.Schema
|
||||
}
|
||||
|
||||
user := c.Spec.ConnectionPool.User
|
||||
if user == "" {
|
||||
user = c.OpConfig.ConnectionPool.User
|
||||
}
|
||||
|
||||
err := lookup(schema, user)
|
||||
|
||||
if err != nil {
|
||||
msg = "could not prepare database for connection pool: %v"
|
||||
|
|
@ -116,6 +124,9 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolR
|
|||
return nil, fmt.Errorf(msg, err)
|
||||
}
|
||||
|
||||
// client-go does retry 10 times (with NoBackoff by default) when the API
|
||||
// believe a request can be retried and returns Retry-After header. This
|
||||
// should be good enough to not think about it here.
|
||||
deployment, err := c.KubeClient.
|
||||
Deployments(deploymentSpec.Namespace).
|
||||
Create(deploymentSpec)
|
||||
|
|
@ -154,14 +165,22 @@ func (c *Cluster) deleteConnectionPool() (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Clean up the deployment object. If deployment resource we've remembered
|
||||
// is somehow empty, try to delete based on what would we generate
|
||||
deploymentName := c.connPoolName()
|
||||
deployment := c.ConnectionPool.Deployment
|
||||
|
||||
if deployment != nil {
|
||||
deploymentName = deployment.Name
|
||||
}
|
||||
|
||||
// set delete propagation policy to foreground, so that replica set will be
|
||||
// also deleted.
|
||||
policy := metav1.DeletePropagationForeground
|
||||
options := metav1.DeleteOptions{PropagationPolicy: &policy}
|
||||
deployment := c.ConnectionPool.Deployment
|
||||
err = c.KubeClient.
|
||||
Deployments(deployment.Namespace).
|
||||
Delete(deployment.Name, &options)
|
||||
Deployments(c.Namespace).
|
||||
Delete(deploymentName, &options)
|
||||
|
||||
if !k8sutil.ResourceNotFound(err) {
|
||||
c.logger.Debugf("Connection pool deployment was already deleted")
|
||||
|
|
@ -172,12 +191,19 @@ func (c *Cluster) deleteConnectionPool() (err error) {
|
|||
c.logger.Infof("Connection pool deployment %q has been deleted",
|
||||
util.NameFromMeta(deployment.ObjectMeta))
|
||||
|
||||
// Repeat the same for the service object
|
||||
service := c.ConnectionPool.Service
|
||||
serviceName := c.connPoolName()
|
||||
|
||||
if service != nil {
|
||||
serviceName = service.Name
|
||||
}
|
||||
|
||||
// set delete propagation policy to foreground, so that all the dependant
|
||||
// will be deleted.
|
||||
service := c.ConnectionPool.Service
|
||||
err = c.KubeClient.
|
||||
Services(service.Namespace).
|
||||
Delete(service.Name, &options)
|
||||
Services(c.Namespace).
|
||||
Delete(serviceName, &options)
|
||||
|
||||
if !k8sutil.ResourceNotFound(err) {
|
||||
c.logger.Debugf("Connection pool service was already deleted")
|
||||
|
|
@ -823,3 +849,34 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet {
|
|||
func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget {
|
||||
return c.PodDisruptionBudget
|
||||
}
|
||||
|
||||
// Perform actual patching of a connection pool deployment, assuming that all
|
||||
// the check were already done before.
|
||||
func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
|
||||
c.setProcessName("updating connection pool")
|
||||
if c.ConnectionPool == nil || c.ConnectionPool.Deployment == nil {
|
||||
return nil, fmt.Errorf("there is no connection pool in the cluster")
|
||||
}
|
||||
|
||||
patchData, err := specPatch(newDeployment.Spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not form patch for the deployment: %v", err)
|
||||
}
|
||||
|
||||
// An update probably requires RetryOnConflict, but since only one operator
|
||||
// worker at one time will try to update it changes of conflicts are
|
||||
// minimal.
|
||||
deployment, err := c.KubeClient.
|
||||
Deployments(c.ConnectionPool.Deployment.Namespace).
|
||||
Patch(
|
||||
c.ConnectionPool.Deployment.Name,
|
||||
types.MergePatchType,
|
||||
patchData, "")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not patch deployment: %v", err)
|
||||
}
|
||||
|
||||
c.ConnectionPool.Deployment = deployment
|
||||
|
||||
return deployment, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package cluster
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
batchv1beta1 "k8s.io/api/batch/v1beta1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
|
|
@ -23,6 +24,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
|
|||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
oldSpec := c.Postgresql
|
||||
c.setSpec(newSpec)
|
||||
|
||||
defer func() {
|
||||
|
|
@ -108,6 +110,20 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
|
|||
}
|
||||
}
|
||||
|
||||
// connection pool
|
||||
oldPool := oldSpec.Spec.ConnectionPool
|
||||
newPool := newSpec.Spec.ConnectionPool
|
||||
if c.needConnectionPool() &&
|
||||
(c.ConnectionPool == nil || !reflect.DeepEqual(oldPool, newPool)) {
|
||||
|
||||
c.logger.Debug("syncing connection pool")
|
||||
|
||||
if err := c.syncConnectionPool(&oldSpec, newSpec); err != nil {
|
||||
c.logger.Errorf("could not sync connection pool: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -594,3 +610,98 @@ func (c *Cluster) syncLogicalBackupJob() error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Synchronize connection pool resources. Effectively we're interested only in
|
||||
// synchronizing the corresponding deployment, but in case of deployment or
|
||||
// service is missing, create it. After checking, also remember an object for
|
||||
// the future references.
|
||||
func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||
if c.ConnectionPool == nil {
|
||||
c.logger.Warning("Connection pool resources are empty")
|
||||
c.ConnectionPool = &ConnectionPoolResources{}
|
||||
}
|
||||
|
||||
deployment, err := c.KubeClient.
|
||||
Deployments(c.Namespace).
|
||||
Get(c.connPoolName(), metav1.GetOptions{})
|
||||
|
||||
if err != nil && k8sutil.ResourceNotFound(err) {
|
||||
msg := "Deployment %s for connection pool synchronization is not found, create it"
|
||||
c.logger.Warningf(msg, c.connPoolName())
|
||||
|
||||
deploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec)
|
||||
if err != nil {
|
||||
msg = "could not generate deployment for connection pool: %v"
|
||||
return fmt.Errorf(msg, err)
|
||||
}
|
||||
|
||||
deployment, err := c.KubeClient.
|
||||
Deployments(deploymentSpec.Namespace).
|
||||
Create(deploymentSpec)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.ConnectionPool.Deployment = deployment
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("could not get connection pool deployment to sync: %v", err)
|
||||
} else {
|
||||
c.ConnectionPool.Deployment = deployment
|
||||
|
||||
// actual synchronization
|
||||
oldConnPool := oldSpec.Spec.ConnectionPool
|
||||
newConnPool := newSpec.Spec.ConnectionPool
|
||||
sync, reason := c.needSyncConnPoolDeployments(oldConnPool, newConnPool)
|
||||
if sync {
|
||||
c.logger.Infof("Update connection pool deployment %s, reason: %s",
|
||||
c.connPoolName(), reason)
|
||||
|
||||
newDeploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec)
|
||||
if err != nil {
|
||||
msg := "could not generate deployment for connection pool: %v"
|
||||
return fmt.Errorf(msg, err)
|
||||
}
|
||||
|
||||
oldDeploymentSpec := c.ConnectionPool.Deployment
|
||||
|
||||
deployment, err := c.updateConnPoolDeployment(
|
||||
oldDeploymentSpec,
|
||||
newDeploymentSpec)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.ConnectionPool.Deployment = deployment
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
service, err := c.KubeClient.
|
||||
Services(c.Namespace).
|
||||
Get(c.connPoolName(), metav1.GetOptions{})
|
||||
|
||||
if err != nil && k8sutil.ResourceNotFound(err) {
|
||||
msg := "Service %s for connection pool synchronization is not found, create it"
|
||||
c.logger.Warningf(msg, c.connPoolName())
|
||||
|
||||
serviceSpec := c.generateConnPoolService(&newSpec.Spec)
|
||||
service, err := c.KubeClient.
|
||||
Services(serviceSpec.Namespace).
|
||||
Create(serviceSpec)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.ConnectionPool.Service = service
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("could not get connection pool service to sync: %v", err)
|
||||
} else {
|
||||
// Service updates are not supported and probably not that useful anyway
|
||||
c.ConnectionPool.Service = service
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,125 @@
|
|||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
||||
"github.com/zalando/postgres-operator/pkg/util/config"
|
||||
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func int32ToPointer(value int32) *int32 {
|
||||
return &value
|
||||
}
|
||||
|
||||
func deploymentUpdated(cluster *Cluster, err error) error {
|
||||
if cluster.ConnectionPool.Deployment.Spec.Replicas == nil ||
|
||||
*cluster.ConnectionPool.Deployment.Spec.Replicas != 2 {
|
||||
return fmt.Errorf("Wrong nubmer of instances")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func objectsAreSaved(cluster *Cluster, err error) error {
|
||||
if cluster.ConnectionPool == nil {
|
||||
return fmt.Errorf("Connection pool resources are empty")
|
||||
}
|
||||
|
||||
if cluster.ConnectionPool.Deployment == nil {
|
||||
return fmt.Errorf("Deployment was not saved")
|
||||
}
|
||||
|
||||
if cluster.ConnectionPool.Service == nil {
|
||||
return fmt.Errorf("Service was not saved")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestConnPoolSynchronization(t *testing.T) {
|
||||
testName := "Test connection pool synchronization"
|
||||
var cluster = New(
|
||||
Config{
|
||||
OpConfig: config.Config{
|
||||
ProtectedRoles: []string{"admin"},
|
||||
Auth: config.Auth{
|
||||
SuperUsername: superUserName,
|
||||
ReplicationUsername: replicationUserName,
|
||||
},
|
||||
ConnectionPool: config.ConnectionPool{
|
||||
ConnPoolDefaultCPURequest: "100m",
|
||||
ConnPoolDefaultCPULimit: "100m",
|
||||
ConnPoolDefaultMemoryRequest: "100M",
|
||||
ConnPoolDefaultMemoryLimit: "100M",
|
||||
},
|
||||
},
|
||||
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
|
||||
|
||||
cluster.Statefulset = &appsv1.StatefulSet{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-sts",
|
||||
},
|
||||
}
|
||||
|
||||
clusterMissingObjects := *cluster
|
||||
clusterMissingObjects.KubeClient = k8sutil.ClientMissingObjects()
|
||||
|
||||
clusterMock := *cluster
|
||||
clusterMock.KubeClient = k8sutil.NewMockKubernetesClient()
|
||||
|
||||
tests := []struct {
|
||||
subTest string
|
||||
oldSpec *acidv1.Postgresql
|
||||
newSpec *acidv1.Postgresql
|
||||
cluster *Cluster
|
||||
check func(cluster *Cluster, err error) error
|
||||
}{
|
||||
{
|
||||
subTest: "create if doesn't exist",
|
||||
oldSpec: &acidv1.Postgresql{
|
||||
Spec: acidv1.PostgresSpec{
|
||||
ConnectionPool: &acidv1.ConnectionPool{},
|
||||
},
|
||||
},
|
||||
newSpec: &acidv1.Postgresql{
|
||||
Spec: acidv1.PostgresSpec{
|
||||
ConnectionPool: &acidv1.ConnectionPool{},
|
||||
},
|
||||
},
|
||||
cluster: &clusterMissingObjects,
|
||||
check: objectsAreSaved,
|
||||
},
|
||||
{
|
||||
subTest: "update deployment",
|
||||
oldSpec: &acidv1.Postgresql{
|
||||
Spec: acidv1.PostgresSpec{
|
||||
ConnectionPool: &acidv1.ConnectionPool{
|
||||
NumberOfInstances: int32ToPointer(1),
|
||||
},
|
||||
},
|
||||
},
|
||||
newSpec: &acidv1.Postgresql{
|
||||
Spec: acidv1.PostgresSpec{
|
||||
ConnectionPool: &acidv1.ConnectionPool{
|
||||
NumberOfInstances: int32ToPointer(2),
|
||||
},
|
||||
},
|
||||
},
|
||||
cluster: &clusterMock,
|
||||
check: deploymentUpdated,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
err := tt.cluster.syncConnectionPool(tt.oldSpec, tt.newSpec)
|
||||
|
||||
if err := tt.check(tt.cluster, err); err != nil {
|
||||
t.Errorf("%s [%s]: Could not synchronize, %+v",
|
||||
testName, tt.subTest, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -497,5 +497,5 @@ func (c *Cluster) patroniUsesKubernetes() bool {
|
|||
}
|
||||
|
||||
func (c *Cluster) needConnectionPool() bool {
|
||||
return c.Spec.ConnectionPool != nil
|
||||
return c.Spec.ConnectionPool != nil || c.Spec.EnableConnectionPool == true
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,9 @@ import (
|
|||
"time"
|
||||
|
||||
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
||||
"github.com/zalando/postgres-operator/pkg/util"
|
||||
"github.com/zalando/postgres-operator/pkg/util/config"
|
||||
"github.com/zalando/postgres-operator/pkg/util/constants"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
|
|
@ -142,17 +144,52 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
|
|||
result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit
|
||||
result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit
|
||||
|
||||
// connection pool
|
||||
// Connection pool. Looks like we can't use defaulting in CRD before 1.17,
|
||||
// so ensure default values here.
|
||||
result.ConnectionPool.NumberOfInstances = fromCRD.ConnectionPool.NumberOfInstances
|
||||
result.ConnectionPool.Schema = fromCRD.ConnectionPool.Schema
|
||||
result.ConnectionPool.User = fromCRD.ConnectionPool.User
|
||||
result.ConnectionPool.Type = fromCRD.ConnectionPool.Type
|
||||
result.ConnectionPool.Image = fromCRD.ConnectionPool.Image
|
||||
result.ConnectionPool.Mode = fromCRD.ConnectionPool.Mode
|
||||
result.ConnectionPool.ConnPoolDefaultCPURequest = fromCRD.ConnectionPool.DefaultCPURequest
|
||||
result.ConnectionPool.ConnPoolDefaultMemoryRequest = fromCRD.ConnectionPool.DefaultMemoryRequest
|
||||
result.ConnectionPool.ConnPoolDefaultCPULimit = fromCRD.ConnectionPool.DefaultCPULimit
|
||||
result.ConnectionPool.ConnPoolDefaultMemoryLimit = fromCRD.ConnectionPool.DefaultMemoryLimit
|
||||
if result.ConnectionPool.NumberOfInstances == nil ||
|
||||
*result.ConnectionPool.NumberOfInstances < 1 {
|
||||
var value int32
|
||||
|
||||
value = 1
|
||||
result.ConnectionPool.NumberOfInstances = &value
|
||||
}
|
||||
|
||||
result.ConnectionPool.Schema = util.Coalesce(
|
||||
fromCRD.ConnectionPool.Schema,
|
||||
constants.ConnectionPoolSchemaName)
|
||||
|
||||
result.ConnectionPool.User = util.Coalesce(
|
||||
fromCRD.ConnectionPool.User,
|
||||
constants.ConnectionPoolUserName)
|
||||
|
||||
result.ConnectionPool.Type = util.Coalesce(
|
||||
fromCRD.ConnectionPool.Type,
|
||||
constants.ConnectionPoolDefaultType)
|
||||
|
||||
result.ConnectionPool.Image = util.Coalesce(
|
||||
fromCRD.ConnectionPool.Image,
|
||||
"pgbouncer:0.0.1")
|
||||
|
||||
result.ConnectionPool.Mode = util.Coalesce(
|
||||
fromCRD.ConnectionPool.Mode,
|
||||
constants.ConnectionPoolDefaultMode)
|
||||
|
||||
result.ConnectionPool.ConnPoolDefaultCPURequest = util.Coalesce(
|
||||
fromCRD.ConnectionPool.DefaultCPURequest,
|
||||
constants.ConnectionPoolDefaultCpuRequest)
|
||||
|
||||
result.ConnectionPool.ConnPoolDefaultMemoryRequest = util.Coalesce(
|
||||
fromCRD.ConnectionPool.DefaultMemoryRequest,
|
||||
constants.ConnectionPoolDefaultMemoryRequest)
|
||||
|
||||
result.ConnectionPool.ConnPoolDefaultCPULimit = util.Coalesce(
|
||||
fromCRD.ConnectionPool.DefaultCPULimit,
|
||||
constants.ConnectionPoolDefaultCpuLimit)
|
||||
|
||||
result.ConnectionPool.ConnPoolDefaultMemoryLimit = util.Coalesce(
|
||||
fromCRD.ConnectionPool.DefaultMemoryLimit,
|
||||
constants.ConnectionPoolDefaultMemoryLimit)
|
||||
|
||||
return result
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,13 @@
|
|||
package constants
|
||||
|
||||
// Connection pool specific constants
|
||||
const (
|
||||
ConnectionPoolUserName = "pooler"
|
||||
ConnectionPoolSchemaName = "pooler"
|
||||
ConnectionPoolDefaultType = "pgbouncer"
|
||||
ConnectionPoolDefaultMode = "transition"
|
||||
ConnectionPoolDefaultCpuRequest = "100m"
|
||||
ConnectionPoolDefaultCpuLimit = "100m"
|
||||
ConnectionPoolDefaultMemoryRequest = "100M"
|
||||
ConnectionPoolDefaultMemoryLimit = "100M"
|
||||
)
|
||||
|
|
@ -15,6 +15,7 @@ import (
|
|||
apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||
apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
|
||||
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
|
|
@ -27,6 +28,10 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func int32ToPointer(value int32) *int32 {
|
||||
return &value
|
||||
}
|
||||
|
||||
// KubernetesClient describes getters for Kubernetes objects
|
||||
type KubernetesClient struct {
|
||||
corev1.SecretsGetter
|
||||
|
|
@ -61,16 +66,30 @@ type mockDeployment struct {
|
|||
appsv1.DeploymentInterface
|
||||
}
|
||||
|
||||
type mockDeploymentNotExist struct {
|
||||
appsv1.DeploymentInterface
|
||||
}
|
||||
|
||||
type MockDeploymentGetter struct {
|
||||
}
|
||||
|
||||
type MockDeploymentNotExistGetter struct {
|
||||
}
|
||||
|
||||
type mockService struct {
|
||||
corev1.ServiceInterface
|
||||
}
|
||||
|
||||
type mockServiceNotExist struct {
|
||||
corev1.ServiceInterface
|
||||
}
|
||||
|
||||
type MockServiceGetter struct {
|
||||
}
|
||||
|
||||
type MockServiceNotExistGetter struct {
|
||||
}
|
||||
|
||||
type mockConfigMap struct {
|
||||
corev1.ConfigMapInterface
|
||||
}
|
||||
|
|
@ -260,6 +279,10 @@ func (mock *MockDeploymentGetter) Deployments(namespace string) appsv1.Deploymen
|
|||
return &mockDeployment{}
|
||||
}
|
||||
|
||||
func (mock *MockDeploymentNotExistGetter) Deployments(namespace string) appsv1.DeploymentInterface {
|
||||
return &mockDeploymentNotExist{}
|
||||
}
|
||||
|
||||
func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) {
|
||||
return &apiappsv1.Deployment{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
|
|
@ -272,10 +295,49 @@ func (mock *mockDeployment) Delete(name string, opts *metav1.DeleteOptions) erro
|
|||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
|
||||
return &apiappsv1.Deployment{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-deployment",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) {
|
||||
return &apiappsv1.Deployment{
|
||||
Spec: apiappsv1.DeploymentSpec{
|
||||
Replicas: int32ToPointer(2),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-deployment",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
|
||||
return nil, &apierrors.StatusError{
|
||||
ErrStatus: metav1.Status{
|
||||
Reason: metav1.StatusReasonNotFound,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (mock *mockDeploymentNotExist) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) {
|
||||
return &apiappsv1.Deployment{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-deployment",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (mock *MockServiceGetter) Services(namespace string) corev1.ServiceInterface {
|
||||
return &mockService{}
|
||||
}
|
||||
|
||||
func (mock *MockServiceNotExistGetter) Services(namespace string) corev1.ServiceInterface {
|
||||
return &mockServiceNotExist{}
|
||||
}
|
||||
|
||||
func (mock *mockService) Create(*v1.Service) (*v1.Service, error) {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
|
|
@ -288,6 +350,30 @@ func (mock *mockService) Delete(name string, opts *metav1.DeleteOptions) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service, error) {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-service",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-service",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (mock *mockServiceNotExist) Get(name string, opts metav1.GetOptions) (*v1.Service, error) {
|
||||
return nil, &apierrors.StatusError{
|
||||
ErrStatus: metav1.Status{
|
||||
Reason: metav1.StatusReasonNotFound,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// NewMockKubernetesClient for other tests
|
||||
func NewMockKubernetesClient() KubernetesClient {
|
||||
return KubernetesClient{
|
||||
|
|
@ -297,3 +383,10 @@ func NewMockKubernetesClient() KubernetesClient {
|
|||
ServicesGetter: &MockServiceGetter{},
|
||||
}
|
||||
}
|
||||
|
||||
func ClientMissingObjects() KubernetesClient {
|
||||
return KubernetesClient{
|
||||
DeploymentsGetter: &MockDeploymentNotExistGetter{},
|
||||
ServicesGetter: &MockServiceNotExistGetter{},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue