Various improvements

Add synchronization logic. For now, get rid of the podTemplate and type fields.
Add CRD validation and the configuration part, and put a retry on top of the
lookup function installation.
Dmitrii Dolgov 2020-02-12 17:28:48 +01:00
parent b40ea2c426
commit 6c3752068b
14 changed files with 653 additions and 135 deletions


@@ -106,6 +106,7 @@ rules:
- apps
resources:
- statefulsets
- deployments
verbs:
- create
- delete


@@ -294,6 +294,43 @@ spec:
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
scalyr_server_url:
type: string
connection_pool:
type: object
properties:
connection_pool_schema:
type: string
#default: "pooler"
connection_pool_user:
type: string
#default: "pooler"
connection_pool_instances_number:
type: integer
#default: 1
connection_pool_image:
type: string
#default: "pierone.stups.zalan.do/acid/pgbouncer:0.0.1"
connection_pool_mode:
type: string
enum:
- "session"
- "transaction"
#default: "transaction"
connection_pool_default_cpu_limit:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
#default: "1"
connection_pool_default_cpu_request:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
#default: "1"
connection_pool_default_memory_limit:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
#default: "100m"
connection_pool_default_memory_request:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
#default: "100m"
status:
type: object
additionalProperties:
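As a quick sanity check on the quantity patterns above, here is a small standalone Go sketch (an illustration only, not part of the commit) of which values they accept. A lowercase "m" suffix denotes CPU millicores and is rejected by the memory pattern, which is why the commented memory defaults use an uppercase suffix:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The same patterns as in the CRD validation above.
	cpu := regexp.MustCompile(`^(\d+m|\d+(\.\d{1,3})?)$`)
	memory := regexp.MustCompile(`^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$`)

	fmt.Println(cpu.MatchString("100m"))     // true: millicores
	fmt.Println(cpu.MatchString("1"))        // true: one full core
	fmt.Println(memory.MatchString("100M"))  // true: SI megabytes
	fmt.Println(memory.MatchString("100Mi")) // true: mebibytes
	fmt.Println(memory.MatchString("100m"))  // false: not a valid memory quantity
}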


@@ -27,7 +27,8 @@ type PostgresSpec struct {
Patroni `json:"patroni,omitempty"`
Resources `json:"resources,omitempty"`
ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"`
EnableConnectionPool bool `json:"enable_connection_pool,omitempty"`
ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"`
TeamID string `json:"teamId"`
DockerImage string `json:"dockerImage,omitempty"`
@@ -159,12 +160,15 @@ type PostgresStatus struct {
// Options for connection pooler
type ConnectionPool struct {
NumberOfInstances *int32 `json:"instancesNumber,omitempty"`
Schema *string `json:"schema,omitempty"`
User *string `json:"user,omitempty"`
Type *string `json:"type,omitempty"`
Mode *string `json:"mode,omitempty"`
PodTemplate *v1.PodTemplateSpec `json:"podTemplate,omitempty"`
NumberOfInstances *int32 `json:"instancesNumber,omitempty"`
Schema string `json:"schema,omitempty"`
User string `json:"user,omitempty"`
Mode string `json:"mode,omitempty"`
DockerImage string `json:"dockerImage,omitempty"`
// TODO: prepared snippets of configuration, one can choose via type, e.g.
// pgbouncer-large (with higher resources) or odyssey-small (with smaller
// resources)
// Type string `json:"type,omitempty"`
Resources `json:"resources,omitempty"`
}
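For illustration, a minimal sketch of how the reworked spec is populated (it assumes the import path and field set shown above and is not code from this commit): Schema, User and Mode are now plain strings, and empty values are later backfilled from the operator configuration.

package main

import (
	"fmt"

	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
)

func main() {
	instances := int32(2)

	spec := acidv1.PostgresSpec{
		// The new flag enables the pooler even without an explicit section.
		EnableConnectionPool: true,
		ConnectionPool: &acidv1.ConnectionPool{
			NumberOfInstances: &instances,
			Schema:            "pooler",
			User:              "pooler",
			Mode:              "transaction",
			// DockerImage left empty: the operator-wide default is used.
		},
	}

	fmt.Println(spec.ConnectionPool.Mode)
}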


@@ -11,6 +11,7 @@ import (
"sync"
"time"
"github.com/r3labs/diff"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
@@ -723,6 +724,17 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}
}
// connection pool
if !reflect.DeepEqual(oldSpec.Spec.ConnectionPool,
newSpec.Spec.ConnectionPool) {
c.logger.Debug("syncing connection pool")
if err := c.syncConnectionPool(oldSpec, newSpec); err != nil {
c.logger.Errorf("could not sync connection pool: %v", err)
updateFailed = true
}
}
return nil
}
@@ -852,13 +864,13 @@ func (c *Cluster) initSystemUsers() {
if c.needConnectionPool() {
username := c.Spec.ConnectionPool.User
if username == nil {
username = &c.OpConfig.ConnectionPool.User
if username == "" {
username = c.OpConfig.ConnectionPool.User
}
c.systemUsers[constants.ConnectionPoolUserKeyName] = spec.PgUser{
Origin: spec.RoleConnectionPool,
Name: *username,
Name: username,
Password: util.RandomPassword(constants.PasswordLength),
}
}
@@ -1188,3 +1200,29 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error {
return c.deleteClusterObject(get, deleteConfigMapFn, "configmap")
}
// Test if two connection pool configurations need to be synced. For simplicity,
// compare not the actual K8S objects but the configuration itself, and request a
// sync if there is any difference.
func (c *Cluster) needSyncConnPoolDeployments(oldSpec, newSpec *acidv1.ConnectionPool) (sync bool, reasons []string) {
reasons = []string{}
sync = false
changelog, err := diff.Diff(oldSpec, newSpec)
if err != nil {
c.logger.Infof("Cannot get diff, do not do anything, %+v", err)
return false, reasons
} else {
if len(changelog) > 0 {
sync = true
}
for _, change := range changelog {
msg := fmt.Sprintf("%s %+v from %s to %s",
change.Type, change.Path, change.From, change.To)
reasons = append(reasons, msg)
}
}
return sync, reasons
}
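The changelog returned by diff.Diff is what drives the reasons reported above; a standalone sketch of what it looks like (illustration only, using a simplified local struct instead of the operator types):

package main

import (
	"fmt"

	"github.com/r3labs/diff"
)

type poolConfig struct {
	Mode              string
	NumberOfInstances int32
}

func main() {
	before := poolConfig{Mode: "session", NumberOfInstances: 1}
	after := poolConfig{Mode: "transaction", NumberOfInstances: 2}

	changelog, err := diff.Diff(before, after)
	if err != nil {
		panic(err)
	}

	for _, change := range changelog {
		// prints e.g. "update [Mode] from session to transaction"
		fmt.Printf("%s %v from %v to %v\n",
			change.Type, change.Path, change.From, change.To)
	}
}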


@@ -300,8 +300,26 @@ func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error {
params, err)
}
if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil {
return fmt.Errorf("could not execute sql statement %s: %v",
// golang sql will do retries a couple of times if the pq driver reports
// connection issues (driver.ErrBadConn), but since our query is
// idempotent, we can also retry in view of other errors (e.g. after a
// failover the db is temporarily in read-only mode) to make sure
// it was applied.
execErr := retryutil.Retry(
constants.PostgresConnectTimeout,
constants.PostgresConnectRetryTimeout,
func() (bool, error) {
if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil {
msg := fmt.Errorf("could not execute sql statement %s: %v",
stmtBytes.String(), err)
return false, msg
}
return true, nil
})
if execErr != nil {
return fmt.Errorf("could not execute after retries %s: %v",
stmtBytes.String(), execErr)
}
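The same retry idea in isolation, as a standard-library-only sketch (the execWithRetry helper below is hypothetical; the commit itself uses retryutil.Retry with the operator's timeout constants): since the statement is idempotent, any error can simply be retried until a deadline passes.

package example

import (
	"database/sql"
	"fmt"
	"time"
)

// execWithRetry re-executes an idempotent statement until it succeeds
// or the timeout expires, sleeping for interval between attempts.
func execWithRetry(db *sql.DB, stmt string, interval, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		_, err := db.Exec(stmt)
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("could not execute %q after retries: %v", stmt, err)
		}
		time.Sleep(interval)
	}
}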


@@ -1734,100 +1734,99 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) {
func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) (
*v1.PodTemplateSpec, error) {
podTemplate := spec.ConnectionPool.PodTemplate
gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
resources, err := generateResourceRequirements(
spec.ConnectionPool.Resources,
c.makeDefaultConnPoolResources())
if podTemplate == nil {
gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds())
resources, err := generateResourceRequirements(
spec.ConnectionPool.Resources,
c.makeDefaultConnPoolResources())
effectiveMode := util.Coalesce(
spec.ConnectionPool.Mode,
c.OpConfig.ConnectionPool.Mode)
effectiveMode := spec.ConnectionPool.Mode
if effectiveMode == nil {
effectiveMode = &c.OpConfig.ConnectionPool.Mode
effectiveDockerImage := util.Coalesce(
spec.ConnectionPool.DockerImage,
c.OpConfig.ConnectionPool.Image)
if err != nil {
return nil, fmt.Errorf("could not generate resource requirements: %v", err)
}
secretSelector := func(key string) *v1.SecretKeySelector {
return &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: c.credentialSecretName(c.OpConfig.SuperUsername),
},
Key: key,
}
}
if err != nil {
return nil, fmt.Errorf("could not generate resource requirements: %v", err)
}
envVars := []v1.EnvVar{
{
Name: "PGHOST",
Value: c.serviceAddress(Master),
},
{
Name: "PGPORT",
Value: c.servicePort(Master),
},
{
Name: "PGUSER",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: secretSelector("username"),
},
},
// the convention is to use the same schema name as
// connection pool username
{
Name: "PGSCHEMA",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: secretSelector("username"),
},
},
{
Name: "PGPASSWORD",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: secretSelector("password"),
},
},
{
Name: "CONNECTION_POOL_MODE",
Value: effectiveMode,
},
{
Name: "CONNECTION_POOL_PORT",
Value: fmt.Sprint(pgPort),
},
}
secretSelector := func(key string) *v1.SecretKeySelector {
return &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: c.credentialSecretName(c.OpConfig.SuperUsername),
},
Key: key,
}
}
poolerContainer := v1.Container{
Name: connectionPoolContainer,
Image: effectiveDockerImage,
ImagePullPolicy: v1.PullIfNotPresent,
Resources: *resources,
Ports: []v1.ContainerPort{
{
ContainerPort: pgPort,
Protocol: v1.ProtocolTCP,
},
},
Env: envVars,
}
envVars := []v1.EnvVar{
{
Name: "PGHOST",
Value: c.serviceAddress(Master),
},
{
Name: "PGPORT",
Value: c.servicePort(Master),
},
{
Name: "PGUSER",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: secretSelector("username"),
},
},
// the convention is to use the same schema name as
// connection pool username
{
Name: "PGSCHEMA",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: secretSelector("username"),
},
},
{
Name: "PGPASSWORD",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: secretSelector("password"),
},
},
{
Name: "CONNECTION_POOL_MODE",
Value: *effectiveMode,
},
{
Name: "CONNECTION_POOL_PORT",
Value: fmt.Sprint(pgPort),
},
}
poolerContainer := v1.Container{
Name: connectionPoolContainer,
Image: c.OpConfig.ConnectionPool.Image,
ImagePullPolicy: v1.PullIfNotPresent,
Resources: *resources,
Ports: []v1.ContainerPort{
{
ContainerPort: pgPort,
Protocol: v1.ProtocolTCP,
},
},
Env: envVars,
}
podTemplate = &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: c.connPoolLabelsSelector().MatchLabels,
Namespace: c.Namespace,
Annotations: c.generatePodAnnotations(spec),
},
Spec: v1.PodSpec{
ServiceAccountName: c.OpConfig.PodServiceAccountName,
TerminationGracePeriodSeconds: &gracePeriod,
Containers: []v1.Container{poolerContainer},
// TODO: add tolerations to schedule pooler on the same node
// as the database
//Tolerations: *tolerationsSpec,
},
}
podTemplate := &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: c.connPoolLabelsSelector().MatchLabels,
Namespace: c.Namespace,
Annotations: c.generatePodAnnotations(spec),
},
Spec: v1.PodSpec{
ServiceAccountName: c.OpConfig.PodServiceAccountName,
TerminationGracePeriodSeconds: &gracePeriod,
Containers: []v1.Container{poolerContainer},
// TODO: add tolerations to schedule pooler on the same node
// as the database
//Tolerations: *tolerationsSpec,
},
}
return podTemplate, nil
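Regarding the TODO above: tolerations would mirror the database's toleration spec, while actual co-location on the same nodes would also need pod affinity. A rough sketch of the latter (illustration only; the poolerAffinity helper and its dbPodsSelector argument are hypothetical and not part of the commit), whose result would be assigned to podTemplate.Spec.Affinity:

package cluster

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// poolerAffinity prefers scheduling pooler pods onto nodes that already
// run pods matching the given selector, i.e. the database pods.
func poolerAffinity(dbPodsSelector *metav1.LabelSelector) *v1.Affinity {
	return &v1.Affinity{
		PodAffinity: &v1.PodAffinity{
			PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
				{
					Weight: 100,
					PodAffinityTerm: v1.PodAffinityTerm{
						LabelSelector: dbPodsSelector,
						TopologyKey:   "kubernetes.io/hostname",
					},
				},
			},
		},
	}
}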


@@ -613,21 +613,6 @@ func TestConnPoolPodSpec(t *testing.T) {
cluster: cluster,
check: testEnvs,
},
{
subTest: "custom pod template",
spec: &acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{
PodTemplate: &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-template",
},
},
},
},
expected: nil,
cluster: cluster,
check: testCustomPodTemplate,
},
}
for _, tt := range tests {
podSpec, err := tt.cluster.generateConnPoolPodTemplate(tt.spec)


@@ -101,9 +101,17 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolR
var msg string
c.setProcessName("creating connection pool")
err := lookup(
c.OpConfig.ConnectionPool.Schema,
c.OpConfig.ConnectionPool.User)
schema := c.Spec.ConnectionPool.Schema
if schema == "" {
schema = c.OpConfig.ConnectionPool.Schema
}
user := c.Spec.ConnectionPool.User
if user == "" {
user = c.OpConfig.ConnectionPool.User
}
err := lookup(schema, user)
if err != nil {
msg = "could not prepare database for connection pool: %v"
@@ -116,6 +124,9 @@ func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolR
return nil, fmt.Errorf(msg, err)
}
// client-go does retry 10 times (with NoBackoff by default) when the API
// believes a request can be retried and returns a Retry-After header. This
// should be good enough to not think about it here.
deployment, err := c.KubeClient.
Deployments(deploymentSpec.Namespace).
Create(deploymentSpec)
@@ -154,14 +165,22 @@ func (c *Cluster) deleteConnectionPool() (err error) {
return nil
}
// Clean up the deployment object. If the deployment resource we've
// remembered is somehow empty, try to delete based on what we would generate
deploymentName := c.connPoolName()
deployment := c.ConnectionPool.Deployment
if deployment != nil {
deploymentName = deployment.Name
}
// set the delete propagation policy to foreground, so that the replica
// set will also be deleted.
policy := metav1.DeletePropagationForeground
options := metav1.DeleteOptions{PropagationPolicy: &policy}
deployment := c.ConnectionPool.Deployment
err = c.KubeClient.
Deployments(deployment.Namespace).
Delete(deployment.Name, &options)
Deployments(c.Namespace).
Delete(deploymentName, &options)
if !k8sutil.ResourceNotFound(err) {
c.logger.Debugf("Connection pool deployment was already deleted")
@@ -172,12 +191,19 @@ func (c *Cluster) deleteConnectionPool() (err error) {
c.logger.Infof("Connection pool deployment %q has been deleted",
util.NameFromMeta(deployment.ObjectMeta))
// Repeat the same for the service object
service := c.ConnectionPool.Service
serviceName := c.connPoolName()
if service != nil {
serviceName = service.Name
}
// set the delete propagation policy to foreground, so that all the
// dependent objects will be deleted as well.
service := c.ConnectionPool.Service
err = c.KubeClient.
Services(service.Namespace).
Delete(service.Name, &options)
Services(c.Namespace).
Delete(serviceName, &options)
if !k8sutil.ResourceNotFound(err) {
c.logger.Debugf("Connection pool service was already deleted")
@@ -823,3 +849,34 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet {
func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget {
return c.PodDisruptionBudget
}
// Perform the actual patching of a connection pool deployment, assuming that
// all the checks were already done before.
func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
c.setProcessName("updating connection pool")
if c.ConnectionPool == nil || c.ConnectionPool.Deployment == nil {
return nil, fmt.Errorf("there is no connection pool in the cluster")
}
patchData, err := specPatch(newDeployment.Spec)
if err != nil {
return nil, fmt.Errorf("could not form patch for the deployment: %v", err)
}
// An update probably requires RetryOnConflict, but since only one operator
// worker at a time will try to update it, the chances of conflicts are
// minimal.
deployment, err := c.KubeClient.
Deployments(c.ConnectionPool.Deployment.Namespace).
Patch(
c.ConnectionPool.Deployment.Name,
types.MergePatchType,
patchData, "")
if err != nil {
return nil, fmt.Errorf("could not patch deployment: %v", err)
}
c.ConnectionPool.Deployment = deployment
return deployment, nil
}
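For completeness, a sketch of the RetryOnConflict variant the comment above alludes to (illustration only: the updateConnPoolDeploymentWithRetry name is hypothetical, the flow assumes the same pre-1.18 client-go signatures used elsewhere in this diff, and k8s.io/client-go/util/retry would need to be imported):

// Hypothetical alternative to the merge patch above: re-read the
// deployment, apply the new spec and update it, retrying on conflicts.
func (c *Cluster) updateConnPoolDeploymentWithRetry(newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
	var result *appsv1.Deployment

	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		current, getErr := c.KubeClient.
			Deployments(c.ConnectionPool.Deployment.Namespace).
			Get(c.ConnectionPool.Deployment.Name, metav1.GetOptions{})
		if getErr != nil {
			return getErr
		}

		current.Spec = newDeployment.Spec
		updated, updateErr := c.KubeClient.
			Deployments(current.Namespace).
			Update(current)
		if updateErr == nil {
			result = updated
		}
		return updateErr
	})

	return result, err
}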


@@ -2,6 +2,7 @@ package cluster
import (
"fmt"
"reflect"
batchv1beta1 "k8s.io/api/batch/v1beta1"
v1 "k8s.io/api/core/v1"
@@ -23,6 +24,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
c.mu.Lock()
defer c.mu.Unlock()
oldSpec := c.Postgresql
c.setSpec(newSpec)
defer func() {
@@ -108,6 +110,20 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
}
}
// connection pool
oldPool := oldSpec.Spec.ConnectionPool
newPool := newSpec.Spec.ConnectionPool
if c.needConnectionPool() &&
(c.ConnectionPool == nil || !reflect.DeepEqual(oldPool, newPool)) {
c.logger.Debug("syncing connection pool")
if err := c.syncConnectionPool(&oldSpec, newSpec); err != nil {
c.logger.Errorf("could not sync connection pool: %v", err)
return err
}
}
return err
}
@@ -594,3 +610,98 @@ func (c *Cluster) syncLogicalBackupJob() error {
return nil
}
// Synchronize connection pool resources. Effectively we're interested only in
// synchronizing the corresponding deployment, but in case the deployment or
// service is missing, create it. After checking, also remember the objects for
// future reference.
func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql) error {
if c.ConnectionPool == nil {
c.logger.Warning("Connection pool resources are empty")
c.ConnectionPool = &ConnectionPoolResources{}
}
deployment, err := c.KubeClient.
Deployments(c.Namespace).
Get(c.connPoolName(), metav1.GetOptions{})
if err != nil && k8sutil.ResourceNotFound(err) {
msg := "Deployment %s for connection pool synchronization is not found, create it"
c.logger.Warningf(msg, c.connPoolName())
deploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec)
if err != nil {
msg = "could not generate deployment for connection pool: %v"
return fmt.Errorf(msg, err)
}
deployment, err := c.KubeClient.
Deployments(deploymentSpec.Namespace).
Create(deploymentSpec)
if err != nil {
return err
}
c.ConnectionPool.Deployment = deployment
} else if err != nil {
return fmt.Errorf("could not get connection pool deployment to sync: %v", err)
} else {
c.ConnectionPool.Deployment = deployment
// actual synchronization
oldConnPool := oldSpec.Spec.ConnectionPool
newConnPool := newSpec.Spec.ConnectionPool
sync, reason := c.needSyncConnPoolDeployments(oldConnPool, newConnPool)
if sync {
c.logger.Infof("Update connection pool deployment %s, reason: %s",
c.connPoolName(), reason)
newDeploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec)
if err != nil {
msg := "could not generate deployment for connection pool: %v"
return fmt.Errorf(msg, err)
}
oldDeploymentSpec := c.ConnectionPool.Deployment
deployment, err := c.updateConnPoolDeployment(
oldDeploymentSpec,
newDeploymentSpec)
if err != nil {
return err
}
c.ConnectionPool.Deployment = deployment
return nil
}
}
service, err := c.KubeClient.
Services(c.Namespace).
Get(c.connPoolName(), metav1.GetOptions{})
if err != nil && k8sutil.ResourceNotFound(err) {
msg := "Service %s for connection pool synchronization is not found, create it"
c.logger.Warningf(msg, c.connPoolName())
serviceSpec := c.generateConnPoolService(&newSpec.Spec)
service, err := c.KubeClient.
Services(serviceSpec.Namespace).
Create(serviceSpec)
if err != nil {
return err
}
c.ConnectionPool.Service = service
} else if err != nil {
return fmt.Errorf("could not get connection pool service to sync: %v", err)
} else {
// Service updates are not supported and probably not that useful anyway
c.ConnectionPool.Service = service
}
return nil
}

pkg/cluster/sync_test.go (new file)

@@ -0,0 +1,125 @@
package cluster
import (
"fmt"
"testing"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func int32ToPointer(value int32) *int32 {
return &value
}
func deploymentUpdated(cluster *Cluster, err error) error {
if cluster.ConnectionPool.Deployment.Spec.Replicas == nil ||
*cluster.ConnectionPool.Deployment.Spec.Replicas != 2 {
return fmt.Errorf("Wrong nubmer of instances")
}
return nil
}
func objectsAreSaved(cluster *Cluster, err error) error {
if cluster.ConnectionPool == nil {
return fmt.Errorf("Connection pool resources are empty")
}
if cluster.ConnectionPool.Deployment == nil {
return fmt.Errorf("Deployment was not saved")
}
if cluster.ConnectionPool.Service == nil {
return fmt.Errorf("Service was not saved")
}
return nil
}
func TestConnPoolSynchronization(t *testing.T) {
testName := "Test connection pool synchronization"
var cluster = New(
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
Auth: config.Auth{
SuperUsername: superUserName,
ReplicationUsername: replicationUserName,
},
ConnectionPool: config.ConnectionPool{
ConnPoolDefaultCPURequest: "100m",
ConnPoolDefaultCPULimit: "100m",
ConnPoolDefaultMemoryRequest: "100M",
ConnPoolDefaultMemoryLimit: "100M",
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sts",
},
}
clusterMissingObjects := *cluster
clusterMissingObjects.KubeClient = k8sutil.ClientMissingObjects()
clusterMock := *cluster
clusterMock.KubeClient = k8sutil.NewMockKubernetesClient()
tests := []struct {
subTest string
oldSpec *acidv1.Postgresql
newSpec *acidv1.Postgresql
cluster *Cluster
check func(cluster *Cluster, err error) error
}{
{
subTest: "create if doesn't exist",
oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
},
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{},
},
},
cluster: &clusterMissingObjects,
check: objectsAreSaved,
},
{
subTest: "update deployment",
oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{
NumberOfInstances: int32ToPointer(1),
},
},
},
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
ConnectionPool: &acidv1.ConnectionPool{
NumberOfInstances: int32ToPointer(2),
},
},
},
cluster: &clusterMock,
check: deploymentUpdated,
},
}
for _, tt := range tests {
err := tt.cluster.syncConnectionPool(tt.oldSpec, tt.newSpec)
if err := tt.check(tt.cluster, err); err != nil {
t.Errorf("%s [%s]: Could not synchronize, %+v",
testName, tt.subTest, err)
}
}
}


@@ -497,5 +497,5 @@ func (c *Cluster) patroniUsesKubernetes() bool {
}
func (c *Cluster) needConnectionPool() bool {
return c.Spec.ConnectionPool != nil
return c.Spec.ConnectionPool != nil || c.Spec.EnableConnectionPool
}


@@ -6,7 +6,9 @@ import (
"time"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -142,17 +144,52 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit
result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit
// connection pool
// Connection pool. Looks like we can't use defaulting in CRD before 1.17,
// so ensure default values here.
result.ConnectionPool.NumberOfInstances = fromCRD.ConnectionPool.NumberOfInstances
result.ConnectionPool.Schema = fromCRD.ConnectionPool.Schema
result.ConnectionPool.User = fromCRD.ConnectionPool.User
result.ConnectionPool.Type = fromCRD.ConnectionPool.Type
result.ConnectionPool.Image = fromCRD.ConnectionPool.Image
result.ConnectionPool.Mode = fromCRD.ConnectionPool.Mode
result.ConnectionPool.ConnPoolDefaultCPURequest = fromCRD.ConnectionPool.DefaultCPURequest
result.ConnectionPool.ConnPoolDefaultMemoryRequest = fromCRD.ConnectionPool.DefaultMemoryRequest
result.ConnectionPool.ConnPoolDefaultCPULimit = fromCRD.ConnectionPool.DefaultCPULimit
result.ConnectionPool.ConnPoolDefaultMemoryLimit = fromCRD.ConnectionPool.DefaultMemoryLimit
if result.ConnectionPool.NumberOfInstances == nil ||
*result.ConnectionPool.NumberOfInstances < 1 {
value := int32(1)
result.ConnectionPool.NumberOfInstances = &value
}
result.ConnectionPool.Schema = util.Coalesce(
fromCRD.ConnectionPool.Schema,
constants.ConnectionPoolSchemaName)
result.ConnectionPool.User = util.Coalesce(
fromCRD.ConnectionPool.User,
constants.ConnectionPoolUserName)
result.ConnectionPool.Type = util.Coalesce(
fromCRD.ConnectionPool.Type,
constants.ConnectionPoolDefaultType)
result.ConnectionPool.Image = util.Coalesce(
fromCRD.ConnectionPool.Image,
"pgbouncer:0.0.1")
result.ConnectionPool.Mode = util.Coalesce(
fromCRD.ConnectionPool.Mode,
constants.ConnectionPoolDefaultMode)
result.ConnectionPool.ConnPoolDefaultCPURequest = util.Coalesce(
fromCRD.ConnectionPool.DefaultCPURequest,
constants.ConnectionPoolDefaultCpuRequest)
result.ConnectionPool.ConnPoolDefaultMemoryRequest = util.Coalesce(
fromCRD.ConnectionPool.DefaultMemoryRequest,
constants.ConnectionPoolDefaultMemoryRequest)
result.ConnectionPool.ConnPoolDefaultCPULimit = util.Coalesce(
fromCRD.ConnectionPool.DefaultCPULimit,
constants.ConnectionPoolDefaultCpuLimit)
result.ConnectionPool.ConnPoolDefaultMemoryLimit = util.Coalesce(
fromCRD.ConnectionPool.DefaultMemoryLimit,
constants.ConnectionPoolDefaultMemoryLimit)
return result
}
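The defaulting above leans on a small string helper; a minimal sketch of its behavior (the real implementation lives in the operator's util package and may differ in details): Coalesce prefers the configured value and falls back to the default only when the value is empty.

package util

// Coalesce returns val unless it is empty, in which case it returns
// defaultVal.
func Coalesce(val, defaultVal string) string {
	if val == "" {
		return defaultVal
	}
	return val
}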


@@ -0,0 +1,13 @@
package constants
// Connection pool specific constants
const (
ConnectionPoolUserName = "pooler"
ConnectionPoolSchemaName = "pooler"
ConnectionPoolDefaultType = "pgbouncer"
ConnectionPoolDefaultMode = "transaction"
ConnectionPoolDefaultCpuRequest = "100m"
ConnectionPoolDefaultCpuLimit = "100m"
ConnectionPoolDefaultMemoryRequest = "100M"
ConnectionPoolDefaultMemoryLimit = "100M"
)


@@ -16,6 +16,7 @@ import (
apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -28,6 +29,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func int32ToPointer(value int32) *int32 {
return &value
}
// KubernetesClient describes getters for Kubernetes objects
type KubernetesClient struct {
corev1.SecretsGetter
@@ -62,16 +67,30 @@ type mockDeployment struct {
appsv1.DeploymentInterface
}
type mockDeploymentNotExist struct {
appsv1.DeploymentInterface
}
type MockDeploymentGetter struct {
}
type MockDeploymentNotExistGetter struct {
}
type mockService struct {
corev1.ServiceInterface
}
type mockServiceNotExist struct {
corev1.ServiceInterface
}
type MockServiceGetter struct {
}
type MockServiceNotExistGetter struct {
}
type mockConfigMap struct {
corev1.ConfigMapInterface
}
@@ -245,6 +264,10 @@ func (mock *MockDeploymentGetter) Deployments(namespace string) appsv1.Deploymen
return &mockDeployment{}
}
func (mock *MockDeploymentNotExistGetter) Deployments(namespace string) appsv1.DeploymentInterface {
return &mockDeploymentNotExist{}
}
func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) {
return &apiappsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
@@ -257,10 +280,49 @@ func (mock *mockDeployment) Delete(name string, opts *metav1.DeleteOptions) erro
return nil
}
func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
return &apiappsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
},
}, nil
}
func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) {
return &apiappsv1.Deployment{
Spec: apiappsv1.DeploymentSpec{
Replicas: int32ToPointer(2),
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
},
}, nil
}
func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) {
return nil, &apierrors.StatusError{
ErrStatus: metav1.Status{
Reason: metav1.StatusReasonNotFound,
},
}
}
func (mock *mockDeploymentNotExist) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) {
return &apiappsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
},
}, nil
}
func (mock *MockServiceGetter) Services(namespace string) corev1.ServiceInterface {
return &mockService{}
}
func (mock *MockServiceNotExistGetter) Services(namespace string) corev1.ServiceInterface {
return &mockServiceNotExist{}
}
func (mock *mockService) Create(*v1.Service) (*v1.Service, error) {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
@@ -273,6 +335,30 @@ func (mock *mockService) Delete(name string, opts *metav1.DeleteOptions) error {
return nil
}
func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service, error) {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-service",
},
}, nil
}
func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-service",
},
}, nil
}
func (mock *mockServiceNotExist) Get(name string, opts metav1.GetOptions) (*v1.Service, error) {
return nil, &apierrors.StatusError{
ErrStatus: metav1.Status{
Reason: metav1.StatusReasonNotFound,
},
}
}
// NewMockKubernetesClient for other tests
func NewMockKubernetesClient() KubernetesClient {
return KubernetesClient{
@@ -282,3 +368,10 @@ func NewMockKubernetesClient() KubernetesClient {
ServicesGetter: &MockServiceGetter{},
}
}
func ClientMissingObjects() KubernetesClient {
return KubernetesClient{
DeploymentsGetter: &MockDeploymentNotExistGetter{},
ServicesGetter: &MockServiceNotExistGetter{},
}
}