diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index 7604e8d5a..82aba3767 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -470,6 +470,36 @@ spec: properties: s3_wal_path: type: string + streams: + type: array + nullable: true + items: + type: object + required: + - streamType + properties: + batchSize: + type: integer + database: + type: string + filter: + type: object + additionalProperties: + type: string + queueName: + type: string + sqsArn: + type: string + tables: + type: object + additionalProperties: + type: string + streamType: + type: string + enum: + - "nakadi" + - "sqs" + - "wal" teamId: type: string tls: diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 652a66fda..87a03392b 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -466,6 +466,36 @@ spec: properties: s3_wal_path: type: string + streams: + type: array + nullable: true + items: + type: object + required: + - streamType + properties: + batchSize: + type: integer + database: + type: string + filter: + type: object + additionalProperties: + type: string + queueName: + type: string + sqsArn: + type: string + tables: + type: object + additionalProperties: + type: string + streamType: + type: string + enum: + - "nakadi" + - "sqs" + - "wal" teamId: type: string tls: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index 105e14cd7..d0c4fa103 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -662,8 +662,11 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ Items: &apiextv1.JSONSchemaPropsOrArray{ Schema: &apiextv1.JSONSchemaProps{ Type: "object", - Required: []string{"type"}, + Required: []string{"streamType"}, Properties: map[string]apiextv1.JSONSchemaProps{ + "batchSize": { + Type: "integer", + }, "database": { Type: "string", }, @@ -671,10 +674,13 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ Type: "object", AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ Schema: &apiextv1.JSONSchemaProps{ - Type: "string", + Type: "string", }, }, }, + "queueName": { + Type: "string", + }, "sqsArn": { Type: "string", }, @@ -682,11 +688,11 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ Type: "object", AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ Schema: &apiextv1.JSONSchemaProps{ - Type: "string", + Type: "string", }, }, }, - "type": { + "streamType": { Type: "string", Enum: []apiextv1.JSON{ { diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 02eaf6ee6..a4a8f4477 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -227,12 +227,11 @@ type ConnectionPooler struct { } type Stream struct { - Type string `json:"type"` - Database string `json:"database,omitempty"` - Tables map[string]string `json:"tables,omitempty"` - Filter map[string]string `json:"filter,omitempty"` - BatchSize uint32 `json:"batchSize,omitempty"` - SqsArn string `json:"sqsArn,omitempty"` - QueueName string `json:"queueName,omitempty"` - User string `json:"user,omitempty"` + StreamType string `json:"streamType"` + Database string `json:"database,omitempty"` + Tables map[string]string `json:"tables,omitempty"` + Filter map[string]string `json:"filter,omitempty"` + BatchSize uint32 `json:"batchSize,omitempty"` + SqsArn string `json:"sqsArn,omitempty"` + QueueName string `json:"queueName,omitempty"` } diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 3165534e8..977c3c4e5 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -362,7 +362,7 @@ func (c *Cluster) Create() error { c.createConnectionPooler(c.installLookupFunction) if len(c.Spec.Streams) > 0 { - c.createStreams() + c.syncStreams() } return nil @@ -1052,6 +1052,23 @@ func (c *Cluster) initSystemUsers() { c.systemUsers[constants.ConnectionPoolerUserKeyName] = connectionPoolerUser } } + + // replication users for event streams are another exception + // the operator will create one replication user for all streams + if len(c.Spec.Streams) > 0 { + username := constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix + streamUser := spec.PgUser{ + Origin: spec.RoleConnectionPooler, + Name: username, + Namespace: c.Namespace, + Flags: []string{constants.RoleFlagLogin, constants.RoleFlagReplication}, + Password: util.RandomPassword(constants.PasswordLength), + } + + if _, exists := c.pgUsers[username]; !exists { + c.pgUsers[username] = streamUser + } + } } func (c *Cluster) initPreparedDatabaseRoles() error { diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index a2adb1b15..8bc3129d8 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -8,6 +8,7 @@ import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" zalandov1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" + "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" @@ -19,13 +20,8 @@ var outboxTableNameTemplate config.StringTemplate = "{table}_{eventtype}_outbox" func (c *Cluster) createStreams() error { c.setProcessName("creating streams") - err := c.syncLogicalDecoding() - if err != nil { - return fmt.Errorf("logical decoding setup incomplete: %v", err) - } - fes := c.generateFabricEventStream() - _, err = c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}) + _, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("could not create event stream custom resource: %v", err) } @@ -33,27 +29,61 @@ func (c *Cluster) createStreams() error { return nil } -func (c *Cluster) syncLogicalDecoding() error { +func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStream) error { + c.setProcessName("updating event streams") - walLevel := c.Spec.PostgresqlParam.Parameters["wal_level"] - if walLevel == "" || walLevel != "logical" { - c.logger.Debugf("setting wal level to 'logical' in postgres configuration") - pods, err := c.listPods() - if err != nil || len(pods) == 0 { - return err - } - for _, pod := range pods { - if err := c.patroni.SetPostgresParameters(&pod, map[string]string{"wal_level": "logical"}); err == nil { - return fmt.Errorf("could not set wal_level to 'logical' calling Patroni REST API: %v", err) - } - } + _, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("could not update event stream custom resource: %v", err) } + return nil +} + +func (c *Cluster) syncPostgresConfig() error { + + desiredPostgresConfig := make(map[string]interface{}) + slots := make(map[string]map[string]string) + + c.logger.Debugf("setting wal level to 'logical' in postgres configuration") + desiredPostgresConfig["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: map[string]string{"wal_level": "logical"}} + for _, stream := range c.Spec.Streams { slotName := c.getLogicalReplicationSlot(stream.Database) if slotName == "" { - c.logger.Debugf("creating logical replication slot %d in database %d", constants.EventStreamSourceSlotPrefix+stream.Database, stream.Database) + c.logger.Debugf("creating logical replication slot %q in database %q", constants.EventStreamSourceSlotPrefix+stream.Database, stream.Database) + slot := map[string]string{ + "database": stream.Database, + "plugin": "wal2json", + "type": "logical", + } + slots[constants.EventStreamSourceSlotPrefix+stream.Database] = slot + } + } + + if len(slots) > 0 { + desiredPostgresConfig["slots"] = slots + } else { + return nil + } + + pods, err := c.listPods() + if err != nil || len(pods) == 0 { + return err + } + for i, pod := range pods { + podName := util.NameFromMeta(pods[i].ObjectMeta) + effectivePostgresConfig, err := c.patroni.GetConfig(&pod) + if err != nil { + c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err) + continue + } + + _, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePostgresConfig, desiredPostgresConfig) + if err != nil { + c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) + continue } } @@ -64,18 +94,18 @@ func (c *Cluster) syncStreamDbResources() error { for _, stream := range c.Spec.Streams { if err := c.initDbConnWithName(stream.Database); err != nil { - return fmt.Errorf("could not init connection to database %s specified for event stream: %v", stream.Database, err) + return fmt.Errorf("could not init connection to database %q specified for event stream: %v", stream.Database, err) } for table, eventType := range stream.Tables { tableName, schemaName := getTableSchema(table) if exists, err := c.tableExists(tableName, schemaName); !exists { - return fmt.Errorf("could not find table %s specified for event stream: %v", table, err) + return fmt.Errorf("could not find table %q specified for event stream: %v", table, err) } // check if outbox table exists and if not, create it outboxTable := outboxTableNameTemplate.Format("table", tableName, "eventtype", eventType) if exists, err := c.tableExists(outboxTable, schemaName); !exists { - return fmt.Errorf("could not find outbox table %s specified for event stream: %v", outboxTable, err) + return fmt.Errorf("could not find outbox table %q specified for event stream: %v", outboxTable, err) } } } @@ -120,12 +150,12 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType st Schema: schema, EventStreamTable: getOutboxTable(table, eventType), Filter: streamFilter, - Connection: c.getStreamConnection(stream.Database, stream.User), + Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), } } func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow { - switch stream.Type { + switch stream.StreamType { case "nakadi": return zalandov1alpha1.EventStreamFlow{ Type: constants.EventStreamFlowPgNakadiType, @@ -144,7 +174,7 @@ func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow { } func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1alpha1.EventStreamSink { - switch stream.Type { + switch stream.StreamType { case "nakadi": return zalandov1alpha1.EventStreamSink{ Type: constants.EventStreamSinkNakadiType, @@ -204,6 +234,11 @@ func (c *Cluster) syncStreams() error { c.setProcessName("syncing streams") + err := c.syncPostgresConfig() + if err != nil { + return fmt.Errorf("logical decoding setup incomplete: %v", err) + } + effectiveStreams, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Get(context.TODO(), c.Name+constants.FESsuffix, metav1.GetOptions{}) if err != nil { if !k8sutil.ResourceNotFound(err) { @@ -216,7 +251,10 @@ func (c *Cluster) syncStreams() error { return fmt.Errorf("could not create missing streams: %v", err) } } else { - c.syncStreamDbResources() + err := c.syncStreamDbResources() + if err != nil { + return fmt.Warnf("database setup incomplete: %v", err) + } desiredStreams := c.generateFabricEventStream() if reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { c.updateStreams(desiredStreams) @@ -225,14 +263,3 @@ func (c *Cluster) syncStreams() error { return nil } - -func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStream) error { - c.setProcessName("updating event streams") - - _, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("could not update event stream custom resource: %v", err) - } - - return nil -} diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 06f46e8fc..8119168b0 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -23,6 +23,7 @@ func newFakeK8sStreamClient() (k8sutil.KubernetesClient, *fake.Clientset) { return k8sutil.KubernetesClient{ FabricEventStreamsGetter: zalandoClientSet.ZalandoV1alpha1(), + PodsGetter: clientSet.CoreV1(), }, clientSet } @@ -40,50 +41,32 @@ func TestGenerateFabricEventStream(t *testing.T) { Databases: map[string]string{ "foo": "foo_user", }, - Patroni: acidv1.Patroni{ - Slots: map[string]map[string]string{ - "fes": { - "type": "logical", - "database": "foo", - "plugin": "wal2json", - }, - }, - }, - PostgresqlParam: acidv1.PostgresqlParam{ - Parameters: map[string]string{ - "wal_level": "logical", - }, - }, Streams: []acidv1.Stream{ { - Type: "nakadi", - Database: "foo", + StreamType: "nakadi", + Database: "foo", Tables: map[string]string{ "bar": "stream_type_a", }, BatchSize: uint32(100), - User: "foo_user", }, { - Type: "wal", - Database: "foo", + StreamType: "wal", + Database: "foo", Tables: map[string]string{ "bar": "stream_type_a", }, BatchSize: uint32(100), - User: "zalando", }, { - Type: "sqs", - Database: "foo", - SqsArn: "arn:aws:sqs:eu-central-1:111122223333", - QueueName: "foo-queue", - User: "foo_user", + StreamType: "sqs", + Database: "foo", + SqsArn: "arn:aws:sqs:eu-central-1:111122223333", + QueueName: "foo-queue", }, }, Users: map[string]acidv1.UserFlags{ - "foo_user": {}, - "zalando": {}, + "foo_user": []string{"replication"}, }, Volume: acidv1.Volume{ Size: "1Gi", diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 4937a2034..e9cb1abd5 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -265,10 +265,11 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { func (c *Cluster) syncStatefulSet() error { var ( masterPod *v1.Pod - postgresConfig map[string]interface{} + effectivePostgresConfig map[string]interface{} instanceRestartRequired bool ) + desiredPostgresConfig := make(map[string]interface{}) podsToRecreate := make([]v1.Pod, 0) switchoverCandidates := make([]spec.NamespacedName, 0) @@ -394,14 +395,25 @@ func (c *Cluster) syncStatefulSet() error { // get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs // Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used. + desiredPostgresConfig["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: c.Spec.Parameters} + desiredPostgresConfig["loop_wait"] = c.Spec.Patroni.LoopWait + desiredPostgresConfig["maximum_lag_on_failover"] = c.Spec.Patroni.MaximumLagOnFailover + desiredPostgresConfig["pg_hba"] = c.Spec.Patroni.PgHba + desiredPostgresConfig["retry_timeout"] = c.Spec.Patroni.RetryTimeout + desiredPostgresConfig["slots"] = c.Spec.Patroni.Slots + desiredPostgresConfig["synchronous_mode"] = c.Spec.Patroni.SynchronousMode + desiredPostgresConfig["synchronous_mode_strict"] = c.Spec.Patroni.SynchronousModeStrict + desiredPostgresConfig["ttl"] = c.Spec.Patroni.TTL + for i, pod := range pods { podName := util.NameFromMeta(pods[i].ObjectMeta) - config, err := c.patroni.GetConfig(&pod) + effectivePostgresConfig, err := c.patroni.GetConfig(&pod) if err != nil { c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err) continue } - instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, config) + + instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePostgresConfig, desiredPostgresConfig) if err != nil { c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) continue @@ -412,7 +424,7 @@ func (c *Cluster) syncStatefulSet() error { // if the config update requires a restart, call Patroni restart for replicas first, then master if instanceRestartRequired { c.logger.Debug("restarting Postgres server within pods") - ttl, ok := postgresConfig["ttl"].(int32) + ttl, ok := effectivePostgresConfig["ttl"].(int32) if !ok { ttl = 30 } @@ -493,62 +505,13 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters // (like max_connections) have changed and if necessary sets it via the Patroni API -func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig map[string]interface{}) (bool, error) { - configToSet := make(map[string]interface{}) - parametersToSet := make(map[string]string) - effectivePgParameters := make(map[string]interface{}) +func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectivePatroniConfig, desiredPatroniConfig map[string]interface{}) (bool, error) { - // read effective Patroni config if set - if patroniConfig != nil { - effectivePostgresql := patroniConfig["postgresql"].(map[string]interface{}) - effectivePgParameters = effectivePostgresql[patroniPGParametersParameterName].(map[string]interface{}) - } - - // compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest - desiredPgParameters := c.Spec.Parameters - for desiredOption, desiredValue := range desiredPgParameters { - effectiveValue := effectivePgParameters[desiredOption] - if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) { - parametersToSet[desiredOption] = desiredValue - } - } - - if len(parametersToSet) > 0 { - configToSet["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: parametersToSet} - } - - // compare other options from config with c.Spec.Patroni from manifest - desiredPatroniConfig := c.Spec.Patroni - if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != uint32(patroniConfig["loop_wait"].(float64)) { - configToSet["loop_wait"] = desiredPatroniConfig.LoopWait - } - if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != float32(patroniConfig["maximum_lag_on_failover"].(float64)) { - configToSet["maximum_lag_on_failover"] = desiredPatroniConfig.MaximumLagOnFailover - } - if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, (patroniConfig["pg_hba"])) { - configToSet["pg_hba"] = desiredPatroniConfig.PgHba - } - if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != uint32(patroniConfig["retry_timeout"].(float64)) { - configToSet["retry_timeout"] = desiredPatroniConfig.RetryTimeout - } - if desiredPatroniConfig.Slots != nil && !reflect.DeepEqual(desiredPatroniConfig.Slots, patroniConfig["slots"]) { - configToSet["slots"] = desiredPatroniConfig.Slots - } - if desiredPatroniConfig.SynchronousMode != patroniConfig["synchronous_mode"] { - configToSet["synchronous_mode"] = desiredPatroniConfig.SynchronousMode - } - if desiredPatroniConfig.SynchronousModeStrict != patroniConfig["synchronous_mode_strict"] { - configToSet["synchronous_mode_strict"] = desiredPatroniConfig.SynchronousModeStrict - } - if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != uint32(patroniConfig["ttl"].(float64)) { - configToSet["ttl"] = desiredPatroniConfig.TTL - } - - if len(configToSet) == 0 { + if reflect.DeepEqual(effectivePatroniConfig, desiredPatroniConfig) { return false, nil } - configToSetJson, err := json.Marshal(configToSet) + configToSetJson, err := json.Marshal(desiredPatroniConfig) if err != nil { c.logger.Debugf("could not convert config patch to JSON: %v", err) } @@ -558,8 +521,8 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC podName := util.NameFromMeta(pod.ObjectMeta) c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s", podName, configToSetJson) - if err = c.patroni.SetConfig(pod, configToSet); err != nil { - return true, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err) + if err = c.patroni.SetConfig(pod, desiredPatroniConfig); err != nil { + return true, fmt.Errorf("could not patch postgres parameters within pod %s: %v", podName, err) } return true, nil diff --git a/pkg/util/constants/streams.go b/pkg/util/constants/streams.go index 36eb376cf..7a2c9f81f 100644 --- a/pkg/util/constants/streams.go +++ b/pkg/util/constants/streams.go @@ -4,7 +4,7 @@ package constants const ( FESsuffix = "-event-streams" EventStreamSourcePGType = "PostgresLogicalReplication" - EventStreamSourceSlotPrefix = "fes" + EventStreamSourceSlotPrefix = "fes_" EventStreamSourceAuthType = "DatabaseAuthenticationSecret" EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent" EventStreamFlowPgApiType = "PostgresWalToApiCallHomeEvent"