fix Postgres config sync

This commit is contained in:
Felix Kunde 2021-08-10 17:29:59 +02:00
parent 14e989ac83
commit 4862b05c69
3 changed files with 58 additions and 33 deletions

View File

@ -470,7 +470,7 @@ spec:
properties:
s3_wal_path:
type: string
streams:
streams:
type: array
nullable: true
items:
@ -479,18 +479,18 @@ spec:
- streamType
properties:
batchSize:
type: integer
database:
type: string
filter:
type: integer
database:
type: string
filter:
type: object
additionalProperties:
type: string
queueName:
type: string
sqsArn:
type: string
tables:
queueName:
type: string
sqsArn:
type: string
tables:
type: object
additionalProperties:
type: string

View File

@ -466,7 +466,7 @@ spec:
properties:
s3_wal_path:
type: string
streams:
streams:
type: array
nullable: true
items:
@ -475,18 +475,18 @@ spec:
- streamType
properties:
batchSize:
type: integer
database:
type: string
filter:
type: integer
database:
type: string
filter:
type: object
additionalProperties:
type: string
queueName:
type: string
sqsArn:
type: string
tables:
queueName:
type: string
sqsArn:
type: string
tables:
type: object
additionalProperties:
type: string

View File

@ -395,15 +395,33 @@ func (c *Cluster) syncStatefulSet() error {
// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
desiredPostgresConfig["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: c.Spec.Parameters}
desiredPostgresConfig["loop_wait"] = c.Spec.Patroni.LoopWait
desiredPostgresConfig["maximum_lag_on_failover"] = c.Spec.Patroni.MaximumLagOnFailover
desiredPostgresConfig["pg_hba"] = c.Spec.Patroni.PgHba
desiredPostgresConfig["retry_timeout"] = c.Spec.Patroni.RetryTimeout
desiredPostgresConfig["slots"] = c.Spec.Patroni.Slots
desiredPostgresConfig["synchronous_mode"] = c.Spec.Patroni.SynchronousMode
desiredPostgresConfig["synchronous_mode_strict"] = c.Spec.Patroni.SynchronousModeStrict
desiredPostgresConfig["ttl"] = c.Spec.Patroni.TTL
if len(c.Spec.Parameters) > 0 {
desiredPostgresConfig["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: c.Spec.Parameters}
}
if c.Spec.Patroni.LoopWait > 0 {
desiredPostgresConfig["loop_wait"] = c.Spec.Patroni.LoopWait
}
if c.Spec.Patroni.MaximumLagOnFailover > 0 {
desiredPostgresConfig["maximum_lag_on_failover"] = c.Spec.Patroni.MaximumLagOnFailover
}
if c.Spec.Patroni.PgHba != nil {
desiredPostgresConfig["pg_hba"] = c.Spec.Patroni.PgHba
}
if c.Spec.Patroni.RetryTimeout > 0 {
desiredPostgresConfig["retry_timeout"] = c.Spec.Patroni.RetryTimeout
}
if c.Spec.Patroni.Slots != nil {
desiredPostgresConfig["slots"] = c.Spec.Patroni.Slots
}
if c.Spec.Patroni.TTL > 0 {
desiredPostgresConfig["ttl"] = c.Spec.Patroni.TTL
}
if effectivePostgresConfig["synchronous_mode"] != desiredPostgresConfig["synchronous_mode"] {
desiredPostgresConfig["synchronous_mode"] = c.Spec.Patroni.SynchronousMode
}
if effectivePostgresConfig["synchronous_mode_strict"] != desiredPostgresConfig["synchronous_mode_strict"] {
desiredPostgresConfig["synchronous_mode_strict"] = c.Spec.Patroni.SynchronousModeStrict
}
for i, pod := range pods {
podName := util.NameFromMeta(pods[i].ObjectMeta)
@ -505,13 +523,20 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri
// checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
// (like max_connections) have changed and if necessary sets it via the Patroni API
func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectivePatroniConfig, desiredPatroniConfig map[string]interface{}) (bool, error) {
func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectivePostgresConfig, desiredPostgresConfig map[string]interface{}) (bool, error) {
if reflect.DeepEqual(effectivePatroniConfig, desiredPatroniConfig) {
configUpdateRequired := false
for parameter, value := range desiredPostgresConfig {
if !reflect.DeepEqual(effectivePostgresConfig[parameter], value) {
configUpdateRequired = true
break
}
}
if !configUpdateRequired {
return false, nil
}
configToSetJson, err := json.Marshal(desiredPatroniConfig)
configToSetJson, err := json.Marshal(desiredPostgresConfig)
if err != nil {
c.logger.Debugf("could not convert config patch to JSON: %v", err)
}
@ -521,7 +546,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv
podName := util.NameFromMeta(pod.ObjectMeta)
c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s",
podName, configToSetJson)
if err = c.patroni.SetConfig(pod, desiredPatroniConfig); err != nil {
if err = c.patroni.SetConfig(pod, desiredPostgresConfig); err != nil {
return true, fmt.Errorf("could not patch postgres parameters within pod %s: %v", podName, err)
}