fix sync of publications
This commit is contained in:
		
							parent
							
								
									2630d8eff3
								
							
						
					
					
						commit
						3f0ed26828
					
				|  | @ -111,7 +111,7 @@ spec: | |||
| #      permanent_logical_1: | ||||
| #        type: logical | ||||
| #        database: foo | ||||
| #        plugin: pgoutput | ||||
| #        plugin: wal2json | ||||
|     ttl: 30 | ||||
|     loop_wait: &loop_wait 10 | ||||
|     retry_timeout: 10 | ||||
|  |  | |||
|  | @ -252,7 +252,7 @@ var unmarshalCluster = []struct { | |||
| 				  "permanent_logical_1" : { | ||||
| 					  "type"     : "logical", | ||||
| 					  "database" : "foo", | ||||
| 					  "plugin"   : "pgoutput" | ||||
| 					  "plugin"   : "wal2json" | ||||
| 			       } | ||||
| 			  } | ||||
| 	  	}, | ||||
|  | @ -298,7 +298,7 @@ var unmarshalCluster = []struct { | |||
| 					LoopWait:             10, | ||||
| 					RetryTimeout:         10, | ||||
| 					MaximumLagOnFailover: 33554432, | ||||
| 					Slots:                map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "pgoutput"}}, | ||||
| 					Slots:                map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "wal2json"}}, | ||||
| 				}, | ||||
| 				Resources: Resources{ | ||||
| 					ResourceRequests: ResourceDescription{CPU: "10m", Memory: "50Mi"}, | ||||
|  | @ -334,7 +334,7 @@ var unmarshalCluster = []struct { | |||
| 			}, | ||||
| 			Error: "", | ||||
| 		}, | ||||
| 		marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"9.6","parameters":{"log_statement":"all","max_connections":"10","shared_buffers":"32MB"}},"pod_priority_class_name":"spilo-pod-priority","volume":{"size":"5Gi","storageClass":"SSD", "subPath": "subdir"},"enableShmVolume":false,"patroni":{"initdb":{"data-checksums":"true","encoding":"UTF8","locale":"en_US.UTF-8"},"pg_hba":["hostssl all all 0.0.0.0/0 md5","host    all all 0.0.0.0/0 md5"],"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}},"resources":{"requests":{"cpu":"10m","memory":"50Mi"},"limits":{"cpu":"300m","memory":"3000Mi"}},"teamId":"acid","allowedSourceRanges":["127.0.0.1/32"],"numberOfInstances":2,"users":{"zalando":["superuser","createdb"]},"maintenanceWindows":["Mon:01:00-06:00","Sat:00:00-04:00","05:00-05:15"],"clone":{"cluster":"acid-batman"}},"status":{"PostgresClusterStatus":""}}`), | ||||
| 		marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"9.6","parameters":{"log_statement":"all","max_connections":"10","shared_buffers":"32MB"}},"pod_priority_class_name":"spilo-pod-priority","volume":{"size":"5Gi","storageClass":"SSD", "subPath": "subdir"},"enableShmVolume":false,"patroni":{"initdb":{"data-checksums":"true","encoding":"UTF8","locale":"en_US.UTF-8"},"pg_hba":["hostssl all all 0.0.0.0/0 md5","host    all all 0.0.0.0/0 md5"],"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"slots":{"permanent_logical_1":{"database":"foo","plugin":"wal2json","type":"logical"}}},"resources":{"requests":{"cpu":"10m","memory":"50Mi"},"limits":{"cpu":"300m","memory":"3000Mi"}},"teamId":"acid","allowedSourceRanges":["127.0.0.1/32"],"numberOfInstances":2,"users":{"zalando":["superuser","createdb"]},"maintenanceWindows":["Mon:01:00-06:00","Sat:00:00-04:00","05:00-05:15"],"clone":{"cluster":"acid-batman"}},"status":{"PostgresClusterStatus":""}}`), | ||||
| 		err:     nil}, | ||||
| 	{ | ||||
| 		about: "example with teamId set in input", | ||||
|  |  | |||
|  | @ -70,7 +70,8 @@ type EventStreamTable struct { | |||
| type Connection struct { | ||||
| 	Url             string `json:"jdbcUrl"` | ||||
| 	SlotName        string `json:"slotName"` | ||||
| 	PublicationName string `json:"publicationName"` | ||||
| 	PluginType      string `json:"pluginType,omitempty" defaults:"wal2json"` | ||||
| 	PublicationName string `json:"publicationName,omitempty"` | ||||
| 	DBAuth          DBAuth `json:"databaseAuthentication"` | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -517,8 +517,7 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi | |||
| // The caller is responsible for opening and closing the database connection
 | ||||
| func (c *Cluster) getPublications() (publications map[string]string, err error) { | ||||
| 	var ( | ||||
| 		rows           *sql.Rows | ||||
| 		dbPublications map[string]string | ||||
| 		rows *sql.Rows | ||||
| 	) | ||||
| 
 | ||||
| 	if rows, err = c.pgDb.Query(getPublicationsSQL); err != nil { | ||||
|  | @ -535,6 +534,8 @@ func (c *Cluster) getPublications() (publications map[string]string, err error) | |||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	dbPublications := make(map[string]string) | ||||
| 
 | ||||
| 	for rows.Next() { | ||||
| 		var ( | ||||
| 			dbPublication       string | ||||
|  | @ -566,7 +567,7 @@ func (c *Cluster) executeAlterPublication(pubName, tableList string) error { | |||
| 
 | ||||
| func (c *Cluster) execCreateOrAlterPublication(pubName, tableList, statement, doing, operation string) error { | ||||
| 
 | ||||
| 	c.logger.Infof("%s %q table list %q", doing, pubName, tableList) | ||||
| 	c.logger.Infof("%s %q with table list %q", doing, pubName, tableList) | ||||
| 	if _, err := c.pgDb.Exec(fmt.Sprintf(statement, pubName, tableList)); err != nil { | ||||
| 		return fmt.Errorf("could not execute %s: %v", operation, err) | ||||
| 	} | ||||
|  |  | |||
|  | @ -91,11 +91,11 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) { | |||
| 				MaximumLagOnFailover:  33554432, | ||||
| 				SynchronousMode:       true, | ||||
| 				SynchronousModeStrict: true, | ||||
| 				Slots:                 map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "pgoutput"}}, | ||||
| 				Slots:                 map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "wal2json"}}, | ||||
| 			}, | ||||
| 			role:     "zalandos", | ||||
| 			opConfig: config.Config{}, | ||||
| 			result:   `{"postgresql":{"bin_dir":"/usr/lib/postgresql/11/bin","pg_hba":["hostssl all all 0.0.0.0/0 md5","host    all all 0.0.0.0/0 md5"]},"bootstrap":{"initdb":[{"auth-host":"md5"},{"auth-local":"trust"},"data-checksums",{"encoding":"UTF8"},{"locale":"en_US.UTF-8"}],"users":{"zalandos":{"password":"","options":["CREATEDB","NOLOGIN"]}},"dcs":{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"synchronous_mode":true,"synchronous_mode_strict":true,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}}}}`, | ||||
| 			result:   `{"postgresql":{"bin_dir":"/usr/lib/postgresql/11/bin","pg_hba":["hostssl all all 0.0.0.0/0 md5","host    all all 0.0.0.0/0 md5"]},"bootstrap":{"initdb":[{"auth-host":"md5"},{"auth-local":"trust"},"data-checksums",{"encoding":"UTF8"},{"locale":"en_US.UTF-8"}],"users":{"zalandos":{"password":"","options":["CREATEDB","NOLOGIN"]}},"dcs":{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"synchronous_mode":true,"synchronous_mode_strict":true,"slots":{"permanent_logical_1":{"database":"foo","plugin":"wal2json","type":"logical"}}}}}`, | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, tt := range tests { | ||||
|  |  | |||
|  | @ -75,7 +75,6 @@ func gatherApplicationIds(streams []acidv1.Stream) []string { | |||
| } | ||||
| 
 | ||||
| func (c *Cluster) syncPostgresConfig() error { | ||||
| 
 | ||||
| 	slots := make(map[string]map[string]string) | ||||
| 	publications := make(map[string]map[string]acidv1.StreamTable) | ||||
| 	createPublications := make(map[string]string) | ||||
|  | @ -92,10 +91,11 @@ func (c *Cluster) syncPostgresConfig() error { | |||
| 		slots = desiredPatroniConfig.Slots | ||||
| 	} | ||||
| 
 | ||||
| 	// define extra logical slots for Patroni config
 | ||||
| 	for _, stream := range c.Spec.Streams { | ||||
| 		slot := map[string]string{ | ||||
| 			"database": stream.Database, | ||||
| 			"plugin":   "pgoutput", | ||||
| 			"plugin":   constants.EventStreamSourcePluginType, | ||||
| 			"type":     "logical", | ||||
| 		} | ||||
| 		slotName := getSlotName(stream.Database, stream.ApplicationId) | ||||
|  | @ -114,18 +114,15 @@ func (c *Cluster) syncPostgresConfig() error { | |||
| 	} | ||||
| 
 | ||||
| 	if len(slots) > 0 { | ||||
| 		c.logger.Debugf("setting wal level to 'logical' in Postgres configuration to allow for decoding changes") | ||||
| 		for slotName, slot := range slots { | ||||
| 			c.logger.Debugf("creating logical replication slot %q in database %q", slotName, slot["database"]) | ||||
| 		} | ||||
| 		desiredPatroniConfig.Slots = slots | ||||
| 	} else { | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	// if streams are defined wal_level must be switched to logical and slots have to be defined
 | ||||
| 	// if streams are defined wal_level must be switched to logical
 | ||||
| 	desiredPgParameters := map[string]string{"wal_level": "logical"} | ||||
| 
 | ||||
| 	// apply config changes in pods
 | ||||
| 	pods, err := c.listPods() | ||||
| 	if err != nil || len(pods) == 0 { | ||||
| 		c.logger.Warningf("could not list pods of the statefulset: %v", err) | ||||
|  | @ -146,6 +143,7 @@ func (c *Cluster) syncPostgresConfig() error { | |||
| 	} | ||||
| 
 | ||||
| 	// next, create publications to each created slot
 | ||||
| 	c.logger.Debug("syncing database publications") | ||||
| 	for publication, tables := range publications { | ||||
| 		// but first check for existing publications
 | ||||
| 		dbName := slots[publication]["database"] | ||||
|  | @ -162,7 +160,7 @@ func (c *Cluster) syncPostgresConfig() error { | |||
| 		i := 0 | ||||
| 		for t := range tables { | ||||
| 			tableName, schemaName := getTableSchema(t) | ||||
| 			tableNames[i] = fmt.Sprintf("%q.%q", schemaName, tableName) | ||||
| 			tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) | ||||
| 			i++ | ||||
| 		} | ||||
| 		sort.Strings(tableNames) | ||||
|  | @ -286,9 +284,9 @@ func getSlotName(dbName, appId string) string { | |||
| 
 | ||||
| func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Connection { | ||||
| 	return zalandov1.Connection{ | ||||
| 		Url:             fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user), | ||||
| 		SlotName:        getSlotName(database, appId), | ||||
| 		PublicationName: getSlotName(database, appId), | ||||
| 		Url:        fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user), | ||||
| 		SlotName:   getSlotName(database, appId), | ||||
| 		PluginType: constants.EventStreamSourcePluginType, | ||||
| 		DBAuth: zalandov1.DBAuth{ | ||||
| 			Type:        constants.EventStreamSourceAuthType, | ||||
| 			Name:        c.credentialSecretNameForCluster(user, c.Name), | ||||
|  |  | |||
|  | @ -119,9 +119,9 @@ var ( | |||
| 								Type:        constants.EventStreamSourceAuthType, | ||||
| 								UserKey:     "username", | ||||
| 							}, | ||||
| 							Url:             fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser), | ||||
| 							SlotName:        slotName, | ||||
| 							PublicationName: slotName, | ||||
| 							Url:        fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser), | ||||
| 							SlotName:   slotName, | ||||
| 							PluginType: constants.EventStreamSourcePluginType, | ||||
| 						}, | ||||
| 						Schema: "data", | ||||
| 						EventStreamTable: v1.EventStreamTable{ | ||||
|  |  | |||
|  | @ -6,6 +6,7 @@ const ( | |||
| 	EventStreamSourceCRDName     = "fabriceventstreams.zalando.org" | ||||
| 	EventStreamSourcePGType      = "PostgresLogicalReplication" | ||||
| 	EventStreamSourceSlotPrefix  = "fes" | ||||
| 	EventStreamSourcePluginType  = "pgoutput" | ||||
| 	EventStreamSourceAuthType    = "DatabaseAuthenticationSecret" | ||||
| 	EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent" | ||||
| 	EventStreamSinkNakadiType    = "Nakadi" | ||||
|  |  | |||
|  | @ -185,7 +185,7 @@ func TestGetConfig(t *testing.T) { | |||
| 		Slots: map[string]map[string]string{ | ||||
| 			"cdc": { | ||||
| 				"database": "foo", | ||||
| 				"plugin":   "pgoutput", | ||||
| 				"plugin":   "wal2json", | ||||
| 				"type":     "logical", | ||||
| 			}, | ||||
| 		}, | ||||
|  | @ -218,7 +218,7 @@ func TestGetConfig(t *testing.T) { | |||
| 		"wal_log_hints":                   "on", | ||||
| 	} | ||||
| 
 | ||||
| 	configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 100, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "hot_standby", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "pgoutput", "type": "logical"}}, "ttl": 30}` | ||||
| 	configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 100, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "hot_standby", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "wal2json", "type": "logical"}}, "ttl": 30}` | ||||
| 	r := ioutil.NopCloser(bytes.NewReader([]byte(configJson))) | ||||
| 
 | ||||
| 	response := http.Response{ | ||||
|  | @ -253,7 +253,7 @@ func TestSetPostgresParameters(t *testing.T) { | |||
| 		"wal_level":       "logical", | ||||
| 	} | ||||
| 
 | ||||
| 	configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 50, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "logical", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "pgoutput", "type": "logical"}}, "ttl": 30}` | ||||
| 	configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 50, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "logical", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "wal2json", "type": "logical"}}, "ttl": 30}` | ||||
| 	r := ioutil.NopCloser(bytes.NewReader([]byte(configJson))) | ||||
| 
 | ||||
| 	response := http.Response{ | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue