improve Patroni config sync
This commit is contained in:
		
							parent
							
								
									d3183604a7
								
							
						
					
					
						commit
						4989d3b2dd
					
				|  | @ -1098,19 +1098,19 @@ class EndToEndTestCase(unittest.TestCase): | |||
| 
 | ||||
|             def compare_config(): | ||||
|                 effective_config = k8s.patroni_rest(masterPod.metadata.name, "config") | ||||
|                 desired_patroni = pg_patch_config["spec"]["patroni"] | ||||
|                 desired_config = pg_patch_config["spec"]["patroni"] | ||||
|                 desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"] | ||||
|                 effective_parameters = effective_config["postgresql"]["parameters"] | ||||
|                 self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"], | ||||
|                             "max_connections not updated") | ||||
|                 self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added") | ||||
|                 self.assertEqual(desired_patroni["ttl"], effective_config["ttl"], | ||||
|                 self.assertEqual(desired_config["ttl"], effective_config["ttl"], | ||||
|                             "ttl not updated") | ||||
|                 self.assertEqual(desired_patroni["loop_wait"], effective_config["loop_wait"], | ||||
|                 self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"], | ||||
|                             "loop_wait not updated") | ||||
|                 self.assertEqual(desired_patroni["retry_timeout"], effective_config["retry_timeout"], | ||||
|                 self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"], | ||||
|                             "retry_timeout not updated") | ||||
|                 self.assertEqual(desired_patroni["synchronous_mode"], effective_config["synchronous_mode"], | ||||
|                 self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"], | ||||
|                             "synchronous_mode not updated") | ||||
|                 return True | ||||
| 
 | ||||
|  |  | |||
|  | @ -30,13 +30,12 @@ import ( | |||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	pgBinariesLocationTemplate       = "/usr/lib/postgresql/%v/bin" | ||||
| 	patroniPGBinariesParameterName   = "bin_dir" | ||||
| 	patroniPGParametersParameterName = "parameters" | ||||
| 	patroniPGHBAConfParameterName    = "pg_hba" | ||||
| 	localHost                        = "127.0.0.1/32" | ||||
| 	connectionPoolerContainer        = "connection-pooler" | ||||
| 	pgPort                           = 5432 | ||||
| 	pgBinariesLocationTemplate     = "/usr/lib/postgresql/%v/bin" | ||||
| 	patroniPGBinariesParameterName = "bin_dir" | ||||
| 	patroniPGHBAConfParameterName  = "pg_hba" | ||||
| 	localHost                      = "127.0.0.1/32" | ||||
| 	connectionPoolerContainer      = "connection-pooler" | ||||
| 	pgPort                         = 5432 | ||||
| ) | ||||
| 
 | ||||
| type pgUser struct { | ||||
|  | @ -277,11 +276,11 @@ PatroniInitDBParams: | |||
| 		local, bootstrap := getLocalAndBoostrapPostgreSQLParameters(pg.Parameters) | ||||
| 
 | ||||
| 		if len(local) > 0 { | ||||
| 			config.PgLocalConfiguration[patroniPGParametersParameterName] = local | ||||
| 			config.PgLocalConfiguration[constants.PatroniPGParametersParameterName] = local | ||||
| 		} | ||||
| 		if len(bootstrap) > 0 { | ||||
| 			config.Bootstrap.DCS.PGBootstrapConfiguration = make(map[string]interface{}) | ||||
| 			config.Bootstrap.DCS.PGBootstrapConfiguration[patroniPGParametersParameterName] = bootstrap | ||||
| 			config.Bootstrap.DCS.PGBootstrapConfiguration[constants.PatroniPGParametersParameterName] = bootstrap | ||||
| 		} | ||||
| 	} | ||||
| 	// Patroni gives us a choice of writing pg_hba.conf to either the bootstrap section or to the local postgresql one.
 | ||||
|  |  | |||
|  | @ -395,18 +395,24 @@ func (c *Cluster) syncStatefulSet() error { | |||
| 	// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
 | ||||
| 	// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
 | ||||
| 	for i, pod := range pods { | ||||
| 		emptyPatroniConfig := acidv1.Patroni{} | ||||
| 		podName := util.NameFromMeta(pods[i].ObjectMeta) | ||||
| 		config, err := c.patroni.GetConfig(&pod) | ||||
| 		patroniConfig, pgParameters, err := c.patroni.GetConfig(&pod) | ||||
| 		if err != nil { | ||||
| 			c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err) | ||||
| 			c.logger.Warningf("could not get Postgres config from pod %s: %#v, %v", podName, patroniConfig, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, config) | ||||
| 		if err != nil { | ||||
| 			c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) | ||||
| 			continue | ||||
| 
 | ||||
| 		// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
 | ||||
| 		// to not attempt a restart in such situation
 | ||||
| 		if reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 { | ||||
| 			instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters) | ||||
| 			if err != nil { | ||||
| 				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) | ||||
| 				continue | ||||
| 			} | ||||
| 			break | ||||
| 		} | ||||
| 		break | ||||
| 	} | ||||
| 
 | ||||
| 	// if the config update requires a restart, call Patroni restart for replicas first, then master
 | ||||
|  | @ -493,16 +499,9 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri | |||
| 
 | ||||
| // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
 | ||||
| // (like max_connections) have changed and if necessary sets it via the Patroni API
 | ||||
| func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig map[string]interface{}) (bool, error) { | ||||
| func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig acidv1.Patroni, effectivePgParameters map[string]string) (bool, error) { | ||||
| 	configToSet := make(map[string]interface{}) | ||||
| 	parametersToSet := make(map[string]string) | ||||
| 	effectivePgParameters := make(map[string]interface{}) | ||||
| 
 | ||||
| 	// read effective Patroni config if set
 | ||||
| 	if patroniConfig != nil { | ||||
| 		effectivePostgresql := patroniConfig["postgresql"].(map[string]interface{}) | ||||
| 		effectivePgParameters = effectivePostgresql[patroniPGParametersParameterName].(map[string]interface{}) | ||||
| 	} | ||||
| 
 | ||||
| 	// compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest
 | ||||
| 	desiredPgParameters := c.Spec.Parameters | ||||
|  | @ -514,36 +513,43 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC | |||
| 	} | ||||
| 
 | ||||
| 	if len(parametersToSet) > 0 { | ||||
| 		configToSet["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: parametersToSet} | ||||
| 		configToSet["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: parametersToSet} | ||||
| 	} | ||||
| 
 | ||||
| 	// compare other options from config with c.Spec.Patroni from manifest
 | ||||
| 	desiredPatroniConfig := c.Spec.Patroni | ||||
| 	if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != uint32(patroniConfig["loop_wait"].(float64)) { | ||||
| 	if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != patroniConfig.LoopWait { | ||||
| 		configToSet["loop_wait"] = desiredPatroniConfig.LoopWait | ||||
| 	} | ||||
| 	if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != float32(patroniConfig["maximum_lag_on_failover"].(float64)) { | ||||
| 	if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != patroniConfig.MaximumLagOnFailover { | ||||
| 		configToSet["maximum_lag_on_failover"] = desiredPatroniConfig.MaximumLagOnFailover | ||||
| 	} | ||||
| 	if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, (patroniConfig["pg_hba"])) { | ||||
| 	if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, patroniConfig.PgHba) { | ||||
| 		configToSet["pg_hba"] = desiredPatroniConfig.PgHba | ||||
| 	} | ||||
| 	if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != uint32(patroniConfig["retry_timeout"].(float64)) { | ||||
| 	if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != patroniConfig.RetryTimeout { | ||||
| 		configToSet["retry_timeout"] = desiredPatroniConfig.RetryTimeout | ||||
| 	} | ||||
| 	if desiredPatroniConfig.Slots != nil && !reflect.DeepEqual(desiredPatroniConfig.Slots, patroniConfig["slots"]) { | ||||
| 		configToSet["slots"] = desiredPatroniConfig.Slots | ||||
| 	} | ||||
| 	if desiredPatroniConfig.SynchronousMode != patroniConfig["synchronous_mode"] { | ||||
| 	if desiredPatroniConfig.SynchronousMode != patroniConfig.SynchronousMode { | ||||
| 		configToSet["synchronous_mode"] = desiredPatroniConfig.SynchronousMode | ||||
| 	} | ||||
| 	if desiredPatroniConfig.SynchronousModeStrict != patroniConfig["synchronous_mode_strict"] { | ||||
| 	if desiredPatroniConfig.SynchronousModeStrict != patroniConfig.SynchronousModeStrict { | ||||
| 		configToSet["synchronous_mode_strict"] = desiredPatroniConfig.SynchronousModeStrict | ||||
| 	} | ||||
| 	if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != uint32(patroniConfig["ttl"].(float64)) { | ||||
| 	if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != patroniConfig.TTL { | ||||
| 		configToSet["ttl"] = desiredPatroniConfig.TTL | ||||
| 	} | ||||
| 
 | ||||
| 	// only check if specified slots exist in config and if they differ
 | ||||
| 	for slotName, desiredSlot := range desiredPatroniConfig.Slots { | ||||
| 		if effectiveSlot, exists := patroniConfig.Slots[slotName]; exists { | ||||
| 			if !reflect.DeepEqual(desiredSlot, effectiveSlot) { | ||||
| 				configToSet["slots"] = desiredPatroniConfig.Slots | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if len(configToSet) == 0 { | ||||
| 		return false, nil | ||||
| 	} | ||||
|  |  | |||
|  | @ -8,6 +8,8 @@ const ( | |||
| 	PostgresDataMount = "/home/postgres/pgdata" | ||||
| 	PostgresDataPath  = PostgresDataMount + "/pgroot" | ||||
| 
 | ||||
| 	PatroniPGParametersParameterName = "parameters" | ||||
| 
 | ||||
| 	PostgresConnectRetryTimeout = 2 * time.Minute | ||||
| 	PostgresConnectTimeout      = 15 * time.Second | ||||
| 
 | ||||
|  |  | |||
|  | @ -10,9 +10,11 @@ import ( | |||
| 	"strconv" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/zalando/postgres-operator/pkg/util/constants" | ||||
| 	httpclient "github.com/zalando/postgres-operator/pkg/util/httpclient" | ||||
| 
 | ||||
| 	"github.com/sirupsen/logrus" | ||||
| 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| ) | ||||
| 
 | ||||
|  | @ -31,7 +33,7 @@ type Interface interface { | |||
| 	SetPostgresParameters(server *v1.Pod, options map[string]string) error | ||||
| 	GetMemberData(server *v1.Pod) (MemberData, error) | ||||
| 	Restart(server *v1.Pod) error | ||||
| 	GetConfig(server *v1.Pod) (map[string]interface{}, error) | ||||
| 	GetConfig(server *v1.Pod) (acidv1.Patroni, map[string]string, error) | ||||
| 	SetConfig(server *v1.Pod, config map[string]interface{}) error | ||||
| } | ||||
| 
 | ||||
|  | @ -194,13 +196,18 @@ type MemberData struct { | |||
| 	Patroni         MemberDataPatroni `json:"patroni"` | ||||
| } | ||||
| 
 | ||||
| func (p *Patroni) GetConfigOrStatus(server *v1.Pod, path string) (map[string]interface{}, error) { | ||||
| func (p *Patroni) GetStatus(server *v1.Pod) (map[string]interface{}, error) { | ||||
| 	result := make(map[string]interface{}) | ||||
| 	apiURLString, err := apiURL(server) | ||||
| 	if err != nil { | ||||
| 		return result, err | ||||
| 	} | ||||
| 	body, err := p.httpGet(apiURLString + path) | ||||
| 
 | ||||
| 	body, err := p.httpGet(apiURLString + statusPath) | ||||
| 	if err != nil { | ||||
| 		return result, err | ||||
| 	} | ||||
| 
 | ||||
| 	err = json.Unmarshal([]byte(body), &result) | ||||
| 	if err != nil { | ||||
| 		return result, err | ||||
|  | @ -209,12 +216,42 @@ func (p *Patroni) GetConfigOrStatus(server *v1.Pod, path string) (map[string]int | |||
| 	return result, err | ||||
| } | ||||
| 
 | ||||
| func (p *Patroni) GetStatus(server *v1.Pod) (map[string]interface{}, error) { | ||||
| 	return p.GetConfigOrStatus(server, statusPath) | ||||
| } | ||||
| func (p *Patroni) GetConfig(server *v1.Pod) (acidv1.Patroni, map[string]string, error) { | ||||
| 	var ( | ||||
| 		patroniConfig acidv1.Patroni | ||||
| 		pgConfig      map[string]interface{} | ||||
| 	) | ||||
| 
 | ||||
| func (p *Patroni) GetConfig(server *v1.Pod) (map[string]interface{}, error) { | ||||
| 	return p.GetConfigOrStatus(server, configPath) | ||||
| 	apiURLString, err := apiURL(server) | ||||
| 	if err != nil { | ||||
| 		return patroniConfig, nil, err | ||||
| 	} | ||||
| 	body, err := p.httpGet(apiURLString + configPath) | ||||
| 	if err != nil { | ||||
| 		return patroniConfig, nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	err = json.Unmarshal([]byte(body), &patroniConfig) | ||||
| 	if err != nil { | ||||
| 		return patroniConfig, nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	// unmarshalling postgresql parameters needs a detour
 | ||||
| 	err = json.Unmarshal([]byte(body), &pgConfig) | ||||
| 	if err != nil { | ||||
| 		return patroniConfig, nil, err | ||||
| 	} | ||||
| 	pgParameters := make(map[string]string) | ||||
| 	if _, exists := pgConfig["postgresql"]; exists { | ||||
| 		effectivePostgresql := pgConfig["postgresql"].(map[string]interface{}) | ||||
| 		effectivePgParameters := effectivePostgresql[constants.PatroniPGParametersParameterName].(map[string]interface{}) | ||||
| 		for parameter, value := range effectivePgParameters { | ||||
| 			strValue := fmt.Sprintf("%v", value) | ||||
| 			pgParameters[parameter] = strValue | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return patroniConfig, pgParameters, err | ||||
| } | ||||
| 
 | ||||
| //Restart method restarts instance via Patroni POST API call.
 | ||||
|  | @ -229,6 +266,9 @@ func (p *Patroni) Restart(server *v1.Pod) error { | |||
| 		return err | ||||
| 	} | ||||
| 	status, err := p.GetStatus(server) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	pending_restart, ok := status["pending_restart"] | ||||
| 	if !ok || !pending_restart.(bool) { | ||||
| 		return nil | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue