diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go index 0c4ff42b8..4b2a46620 100644 --- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go +++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go @@ -161,6 +161,7 @@ type ConnectionPoolConfiguration struct { User string `json:"connection_pool_user,omitempty"` Image string `json:"connection_pool_image,omitempty"` Mode string `json:"connection_pool_mode,omitempty"` + MaxDBConnections *int32 `json:"connection_pool_max_db_connections,omitempty"` DefaultCPURequest string `json:"connection_pool_default_cpu_request,omitempty"` DefaultMemoryRequest string `json:"connection_pool_default_memory_request,omitempty"` DefaultCPULimit string `json:"connection_pool_default_cpu_limit,omitempty"` diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 315154da2..771924be8 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -176,6 +176,7 @@ type ConnectionPool struct { User string `json:"user,omitempty"` Mode string `json:"mode,omitempty"` DockerImage string `json:"dockerImage,omitempty"` + MaxDBConnections *int32 `json:"maxDBConnections,omitempty"` Resources `json:"resources,omitempty"` } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 53c608e1d..67417f277 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -20,6 +20,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/constants" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" "k8s.io/apimachinery/pkg/labels" @@ -1731,6 +1732,82 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) { return "logical-backup-" + c.clusterName().Name } +// Generate pool size related environment variables. +// +// MAX_DB_CONN would specify the global maximum for connections to a target +// database. +// +// MAX_CLIENT_CONN is not configurable at the moment, just set it high enough. +// +// DEFAULT_SIZE is a pool size per db/user (having in mind the use case when +// most of the queries coming through a connection pooler are from the same +// user to the same db). In case if we want to spin up more connection pool +// instances, take this into account and maintain the same number of +// connections. +// +// MIN_SIZE is a pool minimal size, to prevent situation when sudden workload +// have to wait for spinning up a new connections. +// +// RESERVE_SIZE is how many additional connections to allow for a pool. +func (c *Cluster) getConnPoolEnvVars(spec *acidv1.PostgresSpec) []v1.EnvVar { + effectiveMode := util.Coalesce( + spec.ConnectionPool.Mode, + c.OpConfig.ConnectionPool.Mode) + + numberOfInstances := spec.ConnectionPool.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = util.CoalesceInt32( + c.OpConfig.ConnectionPool.NumberOfInstances, + k8sutil.Int32ToPointer(1)) + } + + effectiveMaxDBConn := util.CoalesceInt32( + spec.ConnectionPool.MaxDBConnections, + c.OpConfig.ConnectionPool.MaxDBConnections) + + if effectiveMaxDBConn == nil { + effectiveMaxDBConn = k8sutil.Int32ToPointer( + constants.ConnPoolMaxDBConnections) + } + + maxDBConn := *effectiveMaxDBConn / *numberOfInstances + + defaultSize := maxDBConn / 2 + minSize := defaultSize / 2 + reserveSize := minSize + + return []v1.EnvVar{ + { + Name: "CONNECTION_POOL_PORT", + Value: fmt.Sprint(pgPort), + }, + { + Name: "CONNECTION_POOL_MODE", + Value: effectiveMode, + }, + { + Name: "CONNECTION_POOL_DEFAULT_SIZE", + Value: fmt.Sprint(defaultSize), + }, + { + Name: "CONNECTION_POOL_MIN_SIZE", + Value: fmt.Sprint(minSize), + }, + { + Name: "CONNECTION_POOL_RESERVE_SIZE", + Value: fmt.Sprint(reserveSize), + }, + { + Name: "CONNECTION_POOL_MAX_CLIENT_CONN", + Value: fmt.Sprint(constants.ConnPoolMaxClientConnections), + }, + { + Name: "CONNECTION_POOL_MAX_DB_CONN", + Value: fmt.Sprint(effectiveMaxDBConn), + }, + } +} + func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( *v1.PodTemplateSpec, error) { @@ -1739,10 +1816,6 @@ func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( spec.ConnectionPool.Resources, c.makeDefaultConnPoolResources()) - effectiveMode := util.Coalesce( - spec.ConnectionPool.Mode, - c.OpConfig.ConnectionPool.Mode) - effectiveDockerImage := util.Coalesce( spec.ConnectionPool.DockerImage, c.OpConfig.ConnectionPool.Image) @@ -1789,16 +1862,10 @@ func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( SecretKeyRef: secretSelector("password"), }, }, - { - Name: "CONNECTION_POOL_MODE", - Value: effectiveMode, - }, - { - Name: "CONNECTION_POOL_PORT", - Value: fmt.Sprint(pgPort), - }, } + envVars = append(envVars, c.getConnPoolEnvVars(spec)...) + poolerContainer := v1.Container{ Name: connectionPoolContainer, Image: effectiveDockerImage, @@ -1873,7 +1940,9 @@ func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) ( podTemplate, err := c.generateConnPoolPodTemplate(spec) numberOfInstances := spec.ConnectionPool.NumberOfInstances if numberOfInstances == nil { - numberOfInstances = c.OpConfig.ConnectionPool.NumberOfInstances + numberOfInstances = util.CoalesceInt32( + c.OpConfig.ConnectionPool.NumberOfInstances, + k8sutil.Int32ToPointer(1)) } if err != nil { diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 4b0a31628..43b2eac8e 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -539,6 +539,7 @@ func TestConnPoolPodSpec(t *testing.T) { ReplicationUsername: replicationUserName, }, ConnectionPool: config.ConnectionPool{ + MaxDBConnections: int32ToPointer(60), ConnPoolDefaultCPURequest: "100m", ConnPoolDefaultCPULimit: "100m", ConnPoolDefaultMemoryRequest: "100Mi", diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index ecec0307f..e775b3336 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -22,6 +22,10 @@ func (c *Controller) readOperatorConfigurationFromCRD(configObjectNamespace, con return config, nil } +func int32ToPointer(value int32) *int32 { + return &value +} + // importConfigurationFromCRD is a transitional function that converts CRD configuration to the one based on the configmap func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigurationData) *config.Config { result := &config.Config{} @@ -146,14 +150,13 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur // Connection pool. Looks like we can't use defaulting in CRD before 1.17, // so ensure default values here. - result.ConnectionPool.NumberOfInstances = fromCRD.ConnectionPool.NumberOfInstances - if result.ConnectionPool.NumberOfInstances == nil || - *result.ConnectionPool.NumberOfInstances < 1 { - var value int32 + result.ConnectionPool.NumberOfInstances = util.CoalesceInt32( + fromCRD.ConnectionPool.NumberOfInstances, + int32ToPointer(1)) - value = 1 - result.ConnectionPool.NumberOfInstances = &value - } + result.ConnectionPool.NumberOfInstances = util.MaxInt32( + result.ConnectionPool.NumberOfInstances, + int32ToPointer(1)) result.ConnectionPool.Schema = util.Coalesce( fromCRD.ConnectionPool.Schema, @@ -187,5 +190,9 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur fromCRD.ConnectionPool.DefaultMemoryLimit, constants.ConnectionPoolDefaultMemoryLimit) + result.ConnectionPool.MaxDBConnections = util.CoalesceInt32( + fromCRD.ConnectionPool.MaxDBConnections, + int32ToPointer(constants.ConnPoolMaxDBConnections)) + return result } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 56455de9a..dae21a4ee 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -94,6 +94,7 @@ type ConnectionPool struct { ConnPoolDefaultMemoryRequest string `name:"connection_pool_default_memory_request" default:"100Mi"` ConnPoolDefaultCPULimit string `name:"connection_pool_default_cpu_limit" default:"3"` ConnPoolDefaultMemoryLimit string `name:"connection_pool_default_memory_limit" default:"1Gi"` + MaxDBConnections *int32 `name:"connection_pool_max_db_connections" default:"60"` } // Config describes operator config diff --git a/pkg/util/constants/pooler.go b/pkg/util/constants/pooler.go index aa8171110..0426dbfe8 100644 --- a/pkg/util/constants/pooler.go +++ b/pkg/util/constants/pooler.go @@ -11,5 +11,7 @@ const ( ConnectionPoolDefaultMemoryRequest = "100Mi" ConnectionPoolDefaultMemoryLimit = "100Mi" - ConnPoolContainer = 0 + ConnPoolContainer = 0 + ConnPoolMaxDBConnections = 60 + ConnPoolMaxClientConnections = 10000 ) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 6e0798312..80d5df0d9 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -28,7 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func int32ToPointer(value int32) *int32 { +func Int32ToPointer(value int32) *int32 { return &value } @@ -301,7 +301,7 @@ func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1 Name: "test-deployment", }, Spec: apiappsv1.DeploymentSpec{ - Replicas: int32ToPointer(1), + Replicas: Int32ToPointer(1), Template: v1.PodTemplateSpec{ Spec: v1.PodSpec{ Containers: []v1.Container{ @@ -318,7 +318,7 @@ func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1 func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) { return &apiappsv1.Deployment{ Spec: apiappsv1.DeploymentSpec{ - Replicas: int32ToPointer(2), + Replicas: Int32ToPointer(2), }, ObjectMeta: metav1.ObjectMeta{ Name: "test-deployment", diff --git a/pkg/util/util.go b/pkg/util/util.go index ad6de14a2..abb10d42e 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -141,6 +141,22 @@ func Coalesce(val, defaultVal string) string { return val } +// Yeah, golang +func CoalesceInt32(val, defaultVal *int32) *int32 { + if val == nil { + return defaultVal + } + return val +} + +func MaxInt32(a, b *int32) *int32 { + if *a > *b { + return a + } + + return b +} + // IsSmallerQuantity : checks if first resource is of a smaller quantity than the second func IsSmallerQuantity(requestStr, limitStr string) (bool, error) {