diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 8252227d1..28d0a970d 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -111,7 +111,7 @@ spec: # permanent_logical_1: # type: logical # database: foo -# plugin: wal2json +# plugin: pgoutput ttl: 30 loop_wait: &loop_wait 10 retry_timeout: 10 diff --git a/pkg/apis/acid.zalan.do/v1/util_test.go b/pkg/apis/acid.zalan.do/v1/util_test.go index 56ede36c4..bf6875a82 100644 --- a/pkg/apis/acid.zalan.do/v1/util_test.go +++ b/pkg/apis/acid.zalan.do/v1/util_test.go @@ -252,7 +252,7 @@ var unmarshalCluster = []struct { "permanent_logical_1" : { "type" : "logical", "database" : "foo", - "plugin" : "wal2json" + "plugin" : "pgoutput" } } }, @@ -298,7 +298,7 @@ var unmarshalCluster = []struct { LoopWait: 10, RetryTimeout: 10, MaximumLagOnFailover: 33554432, - Slots: map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "wal2json"}}, + Slots: map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "pgoutput"}}, }, Resources: Resources{ ResourceRequests: ResourceDescription{CPU: "10m", Memory: "50Mi"}, @@ -334,7 +334,7 @@ var unmarshalCluster = []struct { }, Error: "", }, - marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"9.6","parameters":{"log_statement":"all","max_connections":"10","shared_buffers":"32MB"}},"pod_priority_class_name":"spilo-pod-priority","volume":{"size":"5Gi","storageClass":"SSD", "subPath": "subdir"},"enableShmVolume":false,"patroni":{"initdb":{"data-checksums":"true","encoding":"UTF8","locale":"en_US.UTF-8"},"pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"],"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"slots":{"permanent_logical_1":{"database":"foo","plugin":"wal2json","type":"logical"}}},"resources":{"requests":{"cpu":"10m","memory":"50Mi"},"limits":{"cpu":"300m","memory":"3000Mi"}},"teamId":"acid","allowedSourceRanges":["127.0.0.1/32"],"numberOfInstances":2,"users":{"zalando":["superuser","createdb"]},"maintenanceWindows":["Mon:01:00-06:00","Sat:00:00-04:00","05:00-05:15"],"clone":{"cluster":"acid-batman"}},"status":{"PostgresClusterStatus":""}}`), + marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"9.6","parameters":{"log_statement":"all","max_connections":"10","shared_buffers":"32MB"}},"pod_priority_class_name":"spilo-pod-priority","volume":{"size":"5Gi","storageClass":"SSD", "subPath": "subdir"},"enableShmVolume":false,"patroni":{"initdb":{"data-checksums":"true","encoding":"UTF8","locale":"en_US.UTF-8"},"pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"],"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}},"resources":{"requests":{"cpu":"10m","memory":"50Mi"},"limits":{"cpu":"300m","memory":"3000Mi"}},"teamId":"acid","allowedSourceRanges":["127.0.0.1/32"],"numberOfInstances":2,"users":{"zalando":["superuser","createdb"]},"maintenanceWindows":["Mon:01:00-06:00","Sat:00:00-04:00","05:00-05:15"],"clone":{"cluster":"acid-batman"}},"status":{"PostgresClusterStatus":""}}`), err: nil}, { about: "example with teamId set in input", diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 18563b0a9..e7553754d 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -562,7 +562,7 @@ func (c *Cluster) executeCreatePublication(pubName, tableList string) error { // The caller is responsible for opening and closing the database connection. func (c *Cluster) executeAlterPublication(pubName, tableList string) error { return c.execCreateOrAlterPublication(pubName, tableList, alterPublicationSQL, - "changing table list of publication", "alter publication tables") + "changing publication", "alter publication tables") } func (c *Cluster) execCreateOrAlterPublication(pubName, tableList, statement, doing, operation string) error { diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index fede3fca8..c3c4a56bd 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -91,11 +91,11 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) { MaximumLagOnFailover: 33554432, SynchronousMode: true, SynchronousModeStrict: true, - Slots: map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "wal2json"}}, + Slots: map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "pgoutput"}}, }, role: "zalandos", opConfig: config.Config{}, - result: `{"postgresql":{"bin_dir":"/usr/lib/postgresql/11/bin","pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"]},"bootstrap":{"initdb":[{"auth-host":"md5"},{"auth-local":"trust"},"data-checksums",{"encoding":"UTF8"},{"locale":"en_US.UTF-8"}],"users":{"zalandos":{"password":"","options":["CREATEDB","NOLOGIN"]}},"dcs":{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"synchronous_mode":true,"synchronous_mode_strict":true,"slots":{"permanent_logical_1":{"database":"foo","plugin":"wal2json","type":"logical"}}}}}`, + result: `{"postgresql":{"bin_dir":"/usr/lib/postgresql/11/bin","pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"]},"bootstrap":{"initdb":[{"auth-host":"md5"},{"auth-local":"trust"},"data-checksums",{"encoding":"UTF8"},{"locale":"en_US.UTF-8"}],"users":{"zalandos":{"password":"","options":["CREATEDB","NOLOGIN"]}},"dcs":{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"synchronous_mode":true,"synchronous_mode_strict":true,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}}}}`, }, } for _, tt := range tests { diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 3bcaf6b3e..75675d490 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -217,7 +217,7 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent APIVersion: "zalando.org/v1", }, ObjectMeta: metav1.ObjectMeta{ - Name: c.Name + "-" + appId, + Name: fmt.Sprintf("%s-%s", c.Name, appId), Namespace: c.Namespace, Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)), // make cluster StatefulSet the owner (like with connection pooler objects) @@ -279,7 +279,7 @@ func getOutboxTable(tableName, idColumn string) zalandov1.EventStreamTable { } func getSlotName(dbName, appId string) string { - return constants.EventStreamSourceSlotPrefix + "_" + dbName + "_" + strings.Replace(appId, "-", "_", -1) + return fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)) } func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Connection { @@ -323,7 +323,7 @@ func (c *Cluster) createOrUpdateStreams() error { appIds := gatherApplicationIds(c.Spec.Streams) for _, appId := range appIds { - fesName := c.Name + "-" + appId + fesName := fmt.Sprintf("%s-%s", c.Name, appId) effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) if err != nil { if !k8sutil.ResourceNotFound(err) { diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 0068364c3..283b956e9 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -41,7 +41,7 @@ var ( appId string = "test-app" dbName string = "foo" fesUser string = constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix - fesName string = clusterName + "-" + appId + fesName string = fmt.Sprintf("%s-%s", clusterName, appId) slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)) pg = acidv1.Postgresql{ diff --git a/pkg/util/patroni/patroni_test.go b/pkg/util/patroni/patroni_test.go index 614f77828..60f289c6f 100644 --- a/pkg/util/patroni/patroni_test.go +++ b/pkg/util/patroni/patroni_test.go @@ -185,7 +185,7 @@ func TestGetConfig(t *testing.T) { Slots: map[string]map[string]string{ "cdc": { "database": "foo", - "plugin": "wal2json", + "plugin": "pgoutput", "type": "logical", }, }, @@ -218,7 +218,7 @@ func TestGetConfig(t *testing.T) { "wal_log_hints": "on", } - configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 100, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "hot_standby", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "wal2json", "type": "logical"}}, "ttl": 30}` + configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 100, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "hot_standby", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "pgoutput", "type": "logical"}}, "ttl": 30}` r := ioutil.NopCloser(bytes.NewReader([]byte(configJson))) response := http.Response{ @@ -253,7 +253,7 @@ func TestSetPostgresParameters(t *testing.T) { "wal_level": "logical", } - configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 50, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "logical", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "wal2json", "type": "logical"}}, "ttl": 30}` + configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 50, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "logical", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "pgoutput", "type": "logical"}}, "ttl": 30}` r := ioutil.NopCloser(bytes.NewReader([]byte(configJson))) response := http.Response{