Add possibility to set max db connections
Since it's an important part of a connection pool configuration, allow to configure max db connections, that pool will open to a target db. From this numbers several others (like default pool size, min pool size, reserve) will be deduced, taking into account desired number of instances.
This commit is contained in:
		
							parent
							
								
									037d7120ef
								
							
						
					
					
						commit
						918df1461b
					
				|  | @ -161,6 +161,7 @@ type ConnectionPoolConfiguration struct { | |||
| 	User                 string `json:"connection_pool_user,omitempty"` | ||||
| 	Image                string `json:"connection_pool_image,omitempty"` | ||||
| 	Mode                 string `json:"connection_pool_mode,omitempty"` | ||||
| 	MaxDBConnections     *int32 `json:"connection_pool_max_db_connections,omitempty"` | ||||
| 	DefaultCPURequest    string `json:"connection_pool_default_cpu_request,omitempty"` | ||||
| 	DefaultMemoryRequest string `json:"connection_pool_default_memory_request,omitempty"` | ||||
| 	DefaultCPULimit      string `json:"connection_pool_default_cpu_limit,omitempty"` | ||||
|  |  | |||
|  | @ -176,6 +176,7 @@ type ConnectionPool struct { | |||
| 	User              string `json:"user,omitempty"` | ||||
| 	Mode              string `json:"mode,omitempty"` | ||||
| 	DockerImage       string `json:"dockerImage,omitempty"` | ||||
| 	MaxDBConnections  *int32 `json:"maxDBConnections,omitempty"` | ||||
| 
 | ||||
| 	Resources `json:"resources,omitempty"` | ||||
| } | ||||
|  |  | |||
|  | @ -20,6 +20,7 @@ import ( | |||
| 	"github.com/zalando/postgres-operator/pkg/util" | ||||
| 	"github.com/zalando/postgres-operator/pkg/util/config" | ||||
| 	"github.com/zalando/postgres-operator/pkg/util/constants" | ||||
| 	"github.com/zalando/postgres-operator/pkg/util/k8sutil" | ||||
| 	batchv1 "k8s.io/api/batch/v1" | ||||
| 	batchv1beta1 "k8s.io/api/batch/v1beta1" | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
|  | @ -1731,6 +1732,82 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) { | |||
| 	return "logical-backup-" + c.clusterName().Name | ||||
| } | ||||
| 
 | ||||
| // Generate pool size related environment variables.
 | ||||
| //
 | ||||
| // MAX_DB_CONN would specify the global maximum for connections to a target
 | ||||
| // 	database.
 | ||||
| //
 | ||||
| // MAX_CLIENT_CONN is not configurable at the moment, just set it high enough.
 | ||||
| //
 | ||||
| // DEFAULT_SIZE is a pool size per db/user (having in mind the use case when
 | ||||
| // 	most of the queries coming through a connection pooler are from the same
 | ||||
| // 	user to the same db). In case if we want to spin up more connection pool
 | ||||
| // 	instances, take this into account and maintain the same number of
 | ||||
| // 	connections.
 | ||||
| //
 | ||||
| // MIN_SIZE is a pool minimal size, to prevent situation when sudden workload
 | ||||
| // 	have to wait for spinning up a new connections.
 | ||||
| //
 | ||||
| // RESERVE_SIZE is how many additional connections to allow for a pool.
 | ||||
| func (c *Cluster) getConnPoolEnvVars(spec *acidv1.PostgresSpec) []v1.EnvVar { | ||||
| 	effectiveMode := util.Coalesce( | ||||
| 		spec.ConnectionPool.Mode, | ||||
| 		c.OpConfig.ConnectionPool.Mode) | ||||
| 
 | ||||
| 	numberOfInstances := spec.ConnectionPool.NumberOfInstances | ||||
| 	if numberOfInstances == nil { | ||||
| 		numberOfInstances = util.CoalesceInt32( | ||||
| 			c.OpConfig.ConnectionPool.NumberOfInstances, | ||||
| 			k8sutil.Int32ToPointer(1)) | ||||
| 	} | ||||
| 
 | ||||
| 	effectiveMaxDBConn := util.CoalesceInt32( | ||||
| 		spec.ConnectionPool.MaxDBConnections, | ||||
| 		c.OpConfig.ConnectionPool.MaxDBConnections) | ||||
| 
 | ||||
| 	if effectiveMaxDBConn == nil { | ||||
| 		effectiveMaxDBConn = k8sutil.Int32ToPointer( | ||||
| 			constants.ConnPoolMaxDBConnections) | ||||
| 	} | ||||
| 
 | ||||
| 	maxDBConn := *effectiveMaxDBConn / *numberOfInstances | ||||
| 
 | ||||
| 	defaultSize := maxDBConn / 2 | ||||
| 	minSize := defaultSize / 2 | ||||
| 	reserveSize := minSize | ||||
| 
 | ||||
| 	return []v1.EnvVar{ | ||||
| 		{ | ||||
| 			Name:  "CONNECTION_POOL_PORT", | ||||
| 			Value: fmt.Sprint(pgPort), | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:  "CONNECTION_POOL_MODE", | ||||
| 			Value: effectiveMode, | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:  "CONNECTION_POOL_DEFAULT_SIZE", | ||||
| 			Value: fmt.Sprint(defaultSize), | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:  "CONNECTION_POOL_MIN_SIZE", | ||||
| 			Value: fmt.Sprint(minSize), | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:  "CONNECTION_POOL_RESERVE_SIZE", | ||||
| 			Value: fmt.Sprint(reserveSize), | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:  "CONNECTION_POOL_MAX_CLIENT_CONN", | ||||
| 			Value: fmt.Sprint(constants.ConnPoolMaxClientConnections), | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:  "CONNECTION_POOL_MAX_DB_CONN", | ||||
| 			Value: fmt.Sprint(effectiveMaxDBConn), | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( | ||||
| 	*v1.PodTemplateSpec, error) { | ||||
| 
 | ||||
|  | @ -1739,10 +1816,6 @@ func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( | |||
| 		spec.ConnectionPool.Resources, | ||||
| 		c.makeDefaultConnPoolResources()) | ||||
| 
 | ||||
| 	effectiveMode := util.Coalesce( | ||||
| 		spec.ConnectionPool.Mode, | ||||
| 		c.OpConfig.ConnectionPool.Mode) | ||||
| 
 | ||||
| 	effectiveDockerImage := util.Coalesce( | ||||
| 		spec.ConnectionPool.DockerImage, | ||||
| 		c.OpConfig.ConnectionPool.Image) | ||||
|  | @ -1789,16 +1862,10 @@ func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( | |||
| 				SecretKeyRef: secretSelector("password"), | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:  "CONNECTION_POOL_MODE", | ||||
| 			Value: effectiveMode, | ||||
| 		}, | ||||
| 		{ | ||||
| 			Name:  "CONNECTION_POOL_PORT", | ||||
| 			Value: fmt.Sprint(pgPort), | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	envVars = append(envVars, c.getConnPoolEnvVars(spec)...) | ||||
| 
 | ||||
| 	poolerContainer := v1.Container{ | ||||
| 		Name:            connectionPoolContainer, | ||||
| 		Image:           effectiveDockerImage, | ||||
|  | @ -1873,7 +1940,9 @@ func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) ( | |||
| 	podTemplate, err := c.generateConnPoolPodTemplate(spec) | ||||
| 	numberOfInstances := spec.ConnectionPool.NumberOfInstances | ||||
| 	if numberOfInstances == nil { | ||||
| 		numberOfInstances = c.OpConfig.ConnectionPool.NumberOfInstances | ||||
| 		numberOfInstances = util.CoalesceInt32( | ||||
| 			c.OpConfig.ConnectionPool.NumberOfInstances, | ||||
| 			k8sutil.Int32ToPointer(1)) | ||||
| 	} | ||||
| 
 | ||||
| 	if err != nil { | ||||
|  |  | |||
|  | @ -539,6 +539,7 @@ func TestConnPoolPodSpec(t *testing.T) { | |||
| 					ReplicationUsername: replicationUserName, | ||||
| 				}, | ||||
| 				ConnectionPool: config.ConnectionPool{ | ||||
| 					MaxDBConnections:             int32ToPointer(60), | ||||
| 					ConnPoolDefaultCPURequest:    "100m", | ||||
| 					ConnPoolDefaultCPULimit:      "100m", | ||||
| 					ConnPoolDefaultMemoryRequest: "100Mi", | ||||
|  |  | |||
|  | @ -22,6 +22,10 @@ func (c *Controller) readOperatorConfigurationFromCRD(configObjectNamespace, con | |||
| 	return config, nil | ||||
| } | ||||
| 
 | ||||
| func int32ToPointer(value int32) *int32 { | ||||
| 	return &value | ||||
| } | ||||
| 
 | ||||
| // importConfigurationFromCRD is a transitional function that converts CRD configuration to the one based on the configmap
 | ||||
| func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigurationData) *config.Config { | ||||
| 	result := &config.Config{} | ||||
|  | @ -146,14 +150,13 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur | |||
| 
 | ||||
| 	// Connection pool. Looks like we can't use defaulting in CRD before 1.17,
 | ||||
| 	// so ensure default values here.
 | ||||
| 	result.ConnectionPool.NumberOfInstances = fromCRD.ConnectionPool.NumberOfInstances | ||||
| 	if result.ConnectionPool.NumberOfInstances == nil || | ||||
| 		*result.ConnectionPool.NumberOfInstances < 1 { | ||||
| 		var value int32 | ||||
| 	result.ConnectionPool.NumberOfInstances = util.CoalesceInt32( | ||||
| 		fromCRD.ConnectionPool.NumberOfInstances, | ||||
| 		int32ToPointer(1)) | ||||
| 
 | ||||
| 		value = 1 | ||||
| 		result.ConnectionPool.NumberOfInstances = &value | ||||
| 	} | ||||
| 	result.ConnectionPool.NumberOfInstances = util.MaxInt32( | ||||
| 		result.ConnectionPool.NumberOfInstances, | ||||
| 		int32ToPointer(1)) | ||||
| 
 | ||||
| 	result.ConnectionPool.Schema = util.Coalesce( | ||||
| 		fromCRD.ConnectionPool.Schema, | ||||
|  | @ -187,5 +190,9 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur | |||
| 		fromCRD.ConnectionPool.DefaultMemoryLimit, | ||||
| 		constants.ConnectionPoolDefaultMemoryLimit) | ||||
| 
 | ||||
| 	result.ConnectionPool.MaxDBConnections = util.CoalesceInt32( | ||||
| 		fromCRD.ConnectionPool.MaxDBConnections, | ||||
| 		int32ToPointer(constants.ConnPoolMaxDBConnections)) | ||||
| 
 | ||||
| 	return result | ||||
| } | ||||
|  |  | |||
|  | @ -94,6 +94,7 @@ type ConnectionPool struct { | |||
| 	ConnPoolDefaultMemoryRequest string `name:"connection_pool_default_memory_request" default:"100Mi"` | ||||
| 	ConnPoolDefaultCPULimit      string `name:"connection_pool_default_cpu_limit" default:"3"` | ||||
| 	ConnPoolDefaultMemoryLimit   string `name:"connection_pool_default_memory_limit" default:"1Gi"` | ||||
| 	MaxDBConnections             *int32 `name:"connection_pool_max_db_connections" default:"60"` | ||||
| } | ||||
| 
 | ||||
| // Config describes operator config
 | ||||
|  |  | |||
|  | @ -11,5 +11,7 @@ const ( | |||
| 	ConnectionPoolDefaultMemoryRequest = "100Mi" | ||||
| 	ConnectionPoolDefaultMemoryLimit   = "100Mi" | ||||
| 
 | ||||
| 	ConnPoolContainer = 0 | ||||
| 	ConnPoolContainer            = 0 | ||||
| 	ConnPoolMaxDBConnections     = 60 | ||||
| 	ConnPoolMaxClientConnections = 10000 | ||||
| ) | ||||
|  |  | |||
|  | @ -28,7 +28,7 @@ import ( | |||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| ) | ||||
| 
 | ||||
| func int32ToPointer(value int32) *int32 { | ||||
| func Int32ToPointer(value int32) *int32 { | ||||
| 	return &value | ||||
| } | ||||
| 
 | ||||
|  | @ -301,7 +301,7 @@ func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1 | |||
| 			Name: "test-deployment", | ||||
| 		}, | ||||
| 		Spec: apiappsv1.DeploymentSpec{ | ||||
| 			Replicas: int32ToPointer(1), | ||||
| 			Replicas: Int32ToPointer(1), | ||||
| 			Template: v1.PodTemplateSpec{ | ||||
| 				Spec: v1.PodSpec{ | ||||
| 					Containers: []v1.Container{ | ||||
|  | @ -318,7 +318,7 @@ func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1 | |||
| func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) { | ||||
| 	return &apiappsv1.Deployment{ | ||||
| 		Spec: apiappsv1.DeploymentSpec{ | ||||
| 			Replicas: int32ToPointer(2), | ||||
| 			Replicas: Int32ToPointer(2), | ||||
| 		}, | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name: "test-deployment", | ||||
|  |  | |||
|  | @ -141,6 +141,22 @@ func Coalesce(val, defaultVal string) string { | |||
| 	return val | ||||
| } | ||||
| 
 | ||||
| // Yeah, golang
 | ||||
| func CoalesceInt32(val, defaultVal *int32) *int32 { | ||||
| 	if val == nil { | ||||
| 		return defaultVal | ||||
| 	} | ||||
| 	return val | ||||
| } | ||||
| 
 | ||||
| func MaxInt32(a, b *int32) *int32 { | ||||
| 	if *a > *b { | ||||
| 		return a | ||||
| 	} | ||||
| 
 | ||||
| 	return b | ||||
| } | ||||
| 
 | ||||
| // IsSmallerQuantity : checks if first resource is of a smaller quantity than the second
 | ||||
| func IsSmallerQuantity(requestStr, limitStr string) (bool, error) { | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue