improve Patroni config sync (#1635)
* improve Patroni config sync * collect new and updated slots to patch patroni * refactor httpGet in Patroni and extend unit tests * GetMemberData should call the patroni endpoint * add PATCH test
This commit is contained in:
		
							parent
							
								
									6dc239aa32
								
							
						
					
					
						commit
						2a33bf3313
					
				|  | @ -1099,19 +1099,19 @@ class EndToEndTestCase(unittest.TestCase): | ||||||
| 
 | 
 | ||||||
|             def compare_config(): |             def compare_config(): | ||||||
|                 effective_config = k8s.patroni_rest(masterPod.metadata.name, "config") |                 effective_config = k8s.patroni_rest(masterPod.metadata.name, "config") | ||||||
|                 desired_patroni = pg_patch_config["spec"]["patroni"] |                 desired_config = pg_patch_config["spec"]["patroni"] | ||||||
|                 desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"] |                 desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"] | ||||||
|                 effective_parameters = effective_config["postgresql"]["parameters"] |                 effective_parameters = effective_config["postgresql"]["parameters"] | ||||||
|                 self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"], |                 self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"], | ||||||
|                             "max_connections not updated") |                             "max_connections not updated") | ||||||
|                 self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added") |                 self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added") | ||||||
|                 self.assertEqual(desired_patroni["ttl"], effective_config["ttl"], |                 self.assertEqual(desired_config["ttl"], effective_config["ttl"], | ||||||
|                             "ttl not updated") |                             "ttl not updated") | ||||||
|                 self.assertEqual(desired_patroni["loop_wait"], effective_config["loop_wait"], |                 self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"], | ||||||
|                             "loop_wait not updated") |                             "loop_wait not updated") | ||||||
|                 self.assertEqual(desired_patroni["retry_timeout"], effective_config["retry_timeout"], |                 self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"], | ||||||
|                             "retry_timeout not updated") |                             "retry_timeout not updated") | ||||||
|                 self.assertEqual(desired_patroni["synchronous_mode"], effective_config["synchronous_mode"], |                 self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"], | ||||||
|                             "synchronous_mode not updated") |                             "synchronous_mode not updated") | ||||||
|                 return True |                 return True | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -30,13 +30,12 @@ import ( | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
| 	pgBinariesLocationTemplate       = "/usr/lib/postgresql/%v/bin" | 	pgBinariesLocationTemplate     = "/usr/lib/postgresql/%v/bin" | ||||||
| 	patroniPGBinariesParameterName   = "bin_dir" | 	patroniPGBinariesParameterName = "bin_dir" | ||||||
| 	patroniPGParametersParameterName = "parameters" | 	patroniPGHBAConfParameterName  = "pg_hba" | ||||||
| 	patroniPGHBAConfParameterName    = "pg_hba" | 	localHost                      = "127.0.0.1/32" | ||||||
| 	localHost                        = "127.0.0.1/32" | 	connectionPoolerContainer      = "connection-pooler" | ||||||
| 	connectionPoolerContainer        = "connection-pooler" | 	pgPort                         = 5432 | ||||||
| 	pgPort                           = 5432 |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type pgUser struct { | type pgUser struct { | ||||||
|  | @ -277,11 +276,11 @@ PatroniInitDBParams: | ||||||
| 		local, bootstrap := getLocalAndBoostrapPostgreSQLParameters(pg.Parameters) | 		local, bootstrap := getLocalAndBoostrapPostgreSQLParameters(pg.Parameters) | ||||||
| 
 | 
 | ||||||
| 		if len(local) > 0 { | 		if len(local) > 0 { | ||||||
| 			config.PgLocalConfiguration[patroniPGParametersParameterName] = local | 			config.PgLocalConfiguration[constants.PatroniPGParametersParameterName] = local | ||||||
| 		} | 		} | ||||||
| 		if len(bootstrap) > 0 { | 		if len(bootstrap) > 0 { | ||||||
| 			config.Bootstrap.DCS.PGBootstrapConfiguration = make(map[string]interface{}) | 			config.Bootstrap.DCS.PGBootstrapConfiguration = make(map[string]interface{}) | ||||||
| 			config.Bootstrap.DCS.PGBootstrapConfiguration[patroniPGParametersParameterName] = bootstrap | 			config.Bootstrap.DCS.PGBootstrapConfiguration[constants.PatroniPGParametersParameterName] = bootstrap | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	// Patroni gives us a choice of writing pg_hba.conf to either the bootstrap section or to the local postgresql one.
 | 	// Patroni gives us a choice of writing pg_hba.conf to either the bootstrap section or to the local postgresql one.
 | ||||||
|  |  | ||||||
|  | @ -395,18 +395,24 @@ func (c *Cluster) syncStatefulSet() error { | ||||||
| 	// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
 | 	// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
 | ||||||
| 	// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
 | 	// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
 | ||||||
| 	for i, pod := range pods { | 	for i, pod := range pods { | ||||||
|  | 		emptyPatroniConfig := acidv1.Patroni{} | ||||||
| 		podName := util.NameFromMeta(pods[i].ObjectMeta) | 		podName := util.NameFromMeta(pods[i].ObjectMeta) | ||||||
| 		config, err := c.patroni.GetConfig(&pod) | 		patroniConfig, pgParameters, err := c.patroni.GetConfig(&pod) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err) | 			c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, config) | 
 | ||||||
| 		if err != nil { | 		// empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
 | ||||||
| 			c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) | 		// do not attempt a restart
 | ||||||
| 			continue | 		if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 { | ||||||
|  | 			instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters) | ||||||
|  | 			if err != nil { | ||||||
|  | 				c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			break | ||||||
| 		} | 		} | ||||||
| 		break |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// if the config update requires a restart, call Patroni restart for replicas first, then master
 | 	// if the config update requires a restart, call Patroni restart for replicas first, then master
 | ||||||
|  | @ -493,16 +499,9 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri | ||||||
| 
 | 
 | ||||||
| // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
 | // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
 | ||||||
| // (like max_connections) have changed and if necessary sets it via the Patroni API
 | // (like max_connections) have changed and if necessary sets it via the Patroni API
 | ||||||
| func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig map[string]interface{}) (bool, error) { | func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig acidv1.Patroni, effectivePgParameters map[string]string) (bool, error) { | ||||||
| 	configToSet := make(map[string]interface{}) | 	configToSet := make(map[string]interface{}) | ||||||
| 	parametersToSet := make(map[string]string) | 	parametersToSet := make(map[string]string) | ||||||
| 	effectivePgParameters := make(map[string]interface{}) |  | ||||||
| 
 |  | ||||||
| 	// read effective Patroni config if set
 |  | ||||||
| 	if patroniConfig != nil { |  | ||||||
| 		effectivePostgresql := patroniConfig["postgresql"].(map[string]interface{}) |  | ||||||
| 		effectivePgParameters = effectivePostgresql[patroniPGParametersParameterName].(map[string]interface{}) |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	// compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest
 | 	// compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest
 | ||||||
| 	desiredPgParameters := c.Spec.Parameters | 	desiredPgParameters := c.Spec.Parameters | ||||||
|  | @ -514,36 +513,47 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if len(parametersToSet) > 0 { | 	if len(parametersToSet) > 0 { | ||||||
| 		configToSet["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: parametersToSet} | 		configToSet["postgresql"] = map[string]interface{}{constants.PatroniPGParametersParameterName: parametersToSet} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// compare other options from config with c.Spec.Patroni from manifest
 | 	// compare other options from config with c.Spec.Patroni from manifest
 | ||||||
| 	desiredPatroniConfig := c.Spec.Patroni | 	desiredPatroniConfig := c.Spec.Patroni | ||||||
| 	if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != uint32(patroniConfig["loop_wait"].(float64)) { | 	if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != patroniConfig.LoopWait { | ||||||
| 		configToSet["loop_wait"] = desiredPatroniConfig.LoopWait | 		configToSet["loop_wait"] = desiredPatroniConfig.LoopWait | ||||||
| 	} | 	} | ||||||
| 	if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != float32(patroniConfig["maximum_lag_on_failover"].(float64)) { | 	if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != patroniConfig.MaximumLagOnFailover { | ||||||
| 		configToSet["maximum_lag_on_failover"] = desiredPatroniConfig.MaximumLagOnFailover | 		configToSet["maximum_lag_on_failover"] = desiredPatroniConfig.MaximumLagOnFailover | ||||||
| 	} | 	} | ||||||
| 	if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, (patroniConfig["pg_hba"])) { | 	if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, patroniConfig.PgHba) { | ||||||
| 		configToSet["pg_hba"] = desiredPatroniConfig.PgHba | 		configToSet["pg_hba"] = desiredPatroniConfig.PgHba | ||||||
| 	} | 	} | ||||||
| 	if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != uint32(patroniConfig["retry_timeout"].(float64)) { | 	if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != patroniConfig.RetryTimeout { | ||||||
| 		configToSet["retry_timeout"] = desiredPatroniConfig.RetryTimeout | 		configToSet["retry_timeout"] = desiredPatroniConfig.RetryTimeout | ||||||
| 	} | 	} | ||||||
| 	if desiredPatroniConfig.Slots != nil && !reflect.DeepEqual(desiredPatroniConfig.Slots, patroniConfig["slots"]) { | 	if desiredPatroniConfig.SynchronousMode != patroniConfig.SynchronousMode { | ||||||
| 		configToSet["slots"] = desiredPatroniConfig.Slots |  | ||||||
| 	} |  | ||||||
| 	if desiredPatroniConfig.SynchronousMode != patroniConfig["synchronous_mode"] { |  | ||||||
| 		configToSet["synchronous_mode"] = desiredPatroniConfig.SynchronousMode | 		configToSet["synchronous_mode"] = desiredPatroniConfig.SynchronousMode | ||||||
| 	} | 	} | ||||||
| 	if desiredPatroniConfig.SynchronousModeStrict != patroniConfig["synchronous_mode_strict"] { | 	if desiredPatroniConfig.SynchronousModeStrict != patroniConfig.SynchronousModeStrict { | ||||||
| 		configToSet["synchronous_mode_strict"] = desiredPatroniConfig.SynchronousModeStrict | 		configToSet["synchronous_mode_strict"] = desiredPatroniConfig.SynchronousModeStrict | ||||||
| 	} | 	} | ||||||
| 	if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != uint32(patroniConfig["ttl"].(float64)) { | 	if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != patroniConfig.TTL { | ||||||
| 		configToSet["ttl"] = desiredPatroniConfig.TTL | 		configToSet["ttl"] = desiredPatroniConfig.TTL | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	// check if specified slots exist in config and if they differ
 | ||||||
|  | 	slotsToSet := make(map[string]map[string]string) | ||||||
|  | 	for slotName, desiredSlot := range desiredPatroniConfig.Slots { | ||||||
|  | 		if effectiveSlot, exists := patroniConfig.Slots[slotName]; exists { | ||||||
|  | 			if reflect.DeepEqual(desiredSlot, effectiveSlot) { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		slotsToSet[slotName] = desiredSlot | ||||||
|  | 	} | ||||||
|  | 	if len(slotsToSet) > 0 { | ||||||
|  | 		configToSet["slots"] = slotsToSet | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	if len(configToSet) == 0 { | 	if len(configToSet) == 0 { | ||||||
| 		return false, nil | 		return false, nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -8,6 +8,8 @@ const ( | ||||||
| 	PostgresDataMount = "/home/postgres/pgdata" | 	PostgresDataMount = "/home/postgres/pgdata" | ||||||
| 	PostgresDataPath  = PostgresDataMount + "/pgroot" | 	PostgresDataPath  = PostgresDataMount + "/pgroot" | ||||||
| 
 | 
 | ||||||
|  | 	PatroniPGParametersParameterName = "parameters" | ||||||
|  | 
 | ||||||
| 	PostgresConnectRetryTimeout = 2 * time.Minute | 	PostgresConnectRetryTimeout = 2 * time.Minute | ||||||
| 	PostgresConnectTimeout      = 15 * time.Second | 	PostgresConnectTimeout      = 15 * time.Second | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -10,9 +10,11 @@ import ( | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/zalando/postgres-operator/pkg/util/constants" | ||||||
| 	httpclient "github.com/zalando/postgres-operator/pkg/util/httpclient" | 	httpclient "github.com/zalando/postgres-operator/pkg/util/httpclient" | ||||||
| 
 | 
 | ||||||
| 	"github.com/sirupsen/logrus" | 	"github.com/sirupsen/logrus" | ||||||
|  | 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" | ||||||
| 	v1 "k8s.io/api/core/v1" | 	v1 "k8s.io/api/core/v1" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | @ -31,7 +33,7 @@ type Interface interface { | ||||||
| 	SetPostgresParameters(server *v1.Pod, options map[string]string) error | 	SetPostgresParameters(server *v1.Pod, options map[string]string) error | ||||||
| 	GetMemberData(server *v1.Pod) (MemberData, error) | 	GetMemberData(server *v1.Pod) (MemberData, error) | ||||||
| 	Restart(server *v1.Pod) error | 	Restart(server *v1.Pod) error | ||||||
| 	GetConfig(server *v1.Pod) (map[string]interface{}, error) | 	GetConfig(server *v1.Pod) (acidv1.Patroni, map[string]string, error) | ||||||
| 	SetConfig(server *v1.Pod, config map[string]interface{}) error | 	SetConfig(server *v1.Pod, config map[string]interface{}) error | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -109,28 +111,23 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (p *Patroni) httpGet(url string) (string, error) { | func (p *Patroni) httpGet(url string) (string, error) { | ||||||
| 	request, err := http.NewRequest("GET", url, nil) | 	p.logger.Debugf("making GET http request: %s", url) | ||||||
| 	if err != nil { |  | ||||||
| 		return "", fmt.Errorf("could not create request: %v", err) |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	p.logger.Debugf("making GET http request: %s", request.URL.String()) | 	response, err := p.httpClient.Get(url) | ||||||
| 
 |  | ||||||
| 	resp, err := p.httpClient.Do(request) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return "", fmt.Errorf("could not make request: %v", err) | 		return "", fmt.Errorf("could not make request: %v", err) | ||||||
| 	} | 	} | ||||||
| 	bodyBytes, err := ioutil.ReadAll(resp.Body) | 	defer response.Body.Close() | ||||||
|  | 
 | ||||||
|  | 	bodyBytes, err := ioutil.ReadAll(response.Body) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return "", fmt.Errorf("could not read response: %v", err) | 		return "", fmt.Errorf("could not read response: %v", err) | ||||||
| 	} | 	} | ||||||
| 	if err := resp.Body.Close(); err != nil { | 
 | ||||||
| 		return "", fmt.Errorf("could not close request: %v", err) | 	if response.StatusCode != http.StatusOK { | ||||||
|  | 		return string(bodyBytes), fmt.Errorf("patroni returned '%d'", response.StatusCode) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if resp.StatusCode != http.StatusOK { |  | ||||||
| 		return string(bodyBytes), fmt.Errorf("patroni returned '%d'", resp.StatusCode) |  | ||||||
| 	} |  | ||||||
| 	return string(bodyBytes), nil | 	return string(bodyBytes), nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -194,30 +191,43 @@ type MemberData struct { | ||||||
| 	Patroni         MemberDataPatroni `json:"patroni"` | 	Patroni         MemberDataPatroni `json:"patroni"` | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (p *Patroni) GetConfigOrStatus(server *v1.Pod, path string) (map[string]interface{}, error) { | func (p *Patroni) GetConfig(server *v1.Pod) (acidv1.Patroni, map[string]string, error) { | ||||||
| 	result := make(map[string]interface{}) | 	var ( | ||||||
|  | 		patroniConfig acidv1.Patroni | ||||||
|  | 		pgConfig      map[string]interface{} | ||||||
|  | 	) | ||||||
| 	apiURLString, err := apiURL(server) | 	apiURLString, err := apiURL(server) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return result, err | 		return patroniConfig, nil, err | ||||||
| 	} | 	} | ||||||
| 	body, err := p.httpGet(apiURLString + path) | 	body, err := p.httpGet(apiURLString + configPath) | ||||||
| 	err = json.Unmarshal([]byte(body), &result) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return result, err | 		return patroniConfig, nil, err | ||||||
|  | 	} | ||||||
|  | 	err = json.Unmarshal([]byte(body), &patroniConfig) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return patroniConfig, nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return result, err | 	// unmarshalling postgresql parameters needs a detour
 | ||||||
|  | 	err = json.Unmarshal([]byte(body), &pgConfig) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return patroniConfig, nil, err | ||||||
|  | 	} | ||||||
|  | 	pgParameters := make(map[string]string) | ||||||
|  | 	if _, exists := pgConfig["postgresql"]; exists { | ||||||
|  | 		effectivePostgresql := pgConfig["postgresql"].(map[string]interface{}) | ||||||
|  | 		effectivePgParameters := effectivePostgresql[constants.PatroniPGParametersParameterName].(map[string]interface{}) | ||||||
|  | 		for parameter, value := range effectivePgParameters { | ||||||
|  | 			strValue := fmt.Sprintf("%v", value) | ||||||
|  | 			pgParameters[parameter] = strValue | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return patroniConfig, pgParameters, err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (p *Patroni) GetStatus(server *v1.Pod) (map[string]interface{}, error) { | // Restart method restarts instance via Patroni POST API call.
 | ||||||
| 	return p.GetConfigOrStatus(server, statusPath) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (p *Patroni) GetConfig(server *v1.Pod) (map[string]interface{}, error) { |  | ||||||
| 	return p.GetConfigOrStatus(server, configPath) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| //Restart method restarts instance via Patroni POST API call.
 |  | ||||||
| func (p *Patroni) Restart(server *v1.Pod) error { | func (p *Patroni) Restart(server *v1.Pod) error { | ||||||
| 	buf := &bytes.Buffer{} | 	buf := &bytes.Buffer{} | ||||||
| 	err := json.NewEncoder(buf).Encode(map[string]interface{}{"restart_pending": true}) | 	err := json.NewEncoder(buf).Encode(map[string]interface{}{"restart_pending": true}) | ||||||
|  | @ -228,9 +238,13 @@ func (p *Patroni) Restart(server *v1.Pod) error { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	status, err := p.GetStatus(server) | 	memberData, err := p.GetMemberData(server) | ||||||
| 	pending_restart, ok := status["pending_restart"] | 	if err != nil { | ||||||
| 	if !ok || !pending_restart.(bool) { | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// do restart only when it is pending
 | ||||||
|  | 	if !memberData.PendingRestart { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	return p.httpPostOrPatch(http.MethodPost, apiURLString+restartPath, buf) | 	return p.httpPostOrPatch(http.MethodPost, apiURLString+restartPath, buf) | ||||||
|  | @ -243,19 +257,13 @@ func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return MemberData{}, err | 		return MemberData{}, err | ||||||
| 	} | 	} | ||||||
| 	response, err := p.httpClient.Get(apiURLString) | 	body, err := p.httpGet(apiURLString + statusPath) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return MemberData{}, fmt.Errorf("could not perform Get request: %v", err) | 		return MemberData{}, err | ||||||
| 	} |  | ||||||
| 	defer response.Body.Close() |  | ||||||
| 
 |  | ||||||
| 	body, err := ioutil.ReadAll(response.Body) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return MemberData{}, fmt.Errorf("could not read response: %v", err) |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	data := MemberData{} | 	data := MemberData{} | ||||||
| 	err = json.Unmarshal(body, &data) | 	err = json.Unmarshal([]byte(body), &data) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return MemberData{}, err | 		return MemberData{}, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -6,14 +6,19 @@ import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | 	"reflect" | ||||||
| 	"testing" | 	"testing" | ||||||
| 
 | 
 | ||||||
| 	"github.com/golang/mock/gomock" | 	"github.com/golang/mock/gomock" | ||||||
|  | 	"github.com/sirupsen/logrus" | ||||||
| 	"github.com/zalando/postgres-operator/mocks" | 	"github.com/zalando/postgres-operator/mocks" | ||||||
| 
 | 
 | ||||||
|  | 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" | ||||||
| 	v1 "k8s.io/api/core/v1" | 	v1 "k8s.io/api/core/v1" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | var logger = logrus.New().WithField("test", "patroni") | ||||||
|  | 
 | ||||||
| func newMockPod(ip string) *v1.Pod { | func newMockPod(ip string) *v1.Pod { | ||||||
| 	return &v1.Pod{ | 	return &v1.Pod{ | ||||||
| 		Status: v1.PodStatus{ | 		Status: v1.PodStatus{ | ||||||
|  | @ -80,31 +85,141 @@ func TestApiURL(t *testing.T) { | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func TestPatroniAPI(t *testing.T) { | func TestGetMemberData(t *testing.T) { | ||||||
| 	ctrl := gomock.NewController(t) | 	ctrl := gomock.NewController(t) | ||||||
| 	defer ctrl.Finish() | 	defer ctrl.Finish() | ||||||
| 
 | 
 | ||||||
| 	json := `{"state": "running", "postmaster_start_time": "2021-02-19 14:31:50.053 CET", "role": "master", "server_version": 90621, "cluster_unlocked": false, "xlog": {"location": 55978296057856}, "timeline": 6, "database_system_identifier": "6462555844314089962", "pending_restart": true, "patroni": {"version": "2.0.1", "scope": "acid-rest92-standby"}}` | 	expectedMemberData := MemberData{ | ||||||
|  | 		State:          "running", | ||||||
|  | 		Role:           "master", | ||||||
|  | 		ServerVersion:  130004, | ||||||
|  | 		PendingRestart: true, | ||||||
|  | 		Patroni: MemberDataPatroni{ | ||||||
|  | 			Version: "2.1.1", | ||||||
|  | 			Scope:   "acid-test-cluster", | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	json := `{"state": "running", "postmaster_start_time": "2021-02-19 14:31:50.053 CET", "role": "master", "server_version": 130004, "cluster_unlocked": false, "xlog": {"location": 123456789}, "timeline": 1, "database_system_identifier": "6462555844314089962", "pending_restart": true, "patroni": {"version": "2.1.1", "scope": "acid-test-cluster"}}` | ||||||
| 	r := ioutil.NopCloser(bytes.NewReader([]byte(json))) | 	r := ioutil.NopCloser(bytes.NewReader([]byte(json))) | ||||||
| 
 | 
 | ||||||
| 	response := http.Response{ | 	response := http.Response{ | ||||||
| 		Status: "200", | 		StatusCode: 200, | ||||||
| 		Body:   r, | 		Body:       r, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	mockClient := mocks.NewMockHTTPClient(ctrl) | 	mockClient := mocks.NewMockHTTPClient(ctrl) | ||||||
| 	mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil) | 	mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil) | ||||||
| 
 | 
 | ||||||
| 	p := New(nil, mockClient) | 	p := New(logger, mockClient) | ||||||
| 
 | 
 | ||||||
| 	pod := v1.Pod{ | 	memberData, err := p.GetMemberData(newMockPod("192.168.100.1")) | ||||||
| 		Status: v1.PodStatus{ | 
 | ||||||
| 			PodIP: "192.168.100.1", | 	if !reflect.DeepEqual(expectedMemberData, memberData) { | ||||||
| 		}, | 		t.Errorf("Patroni member data differs: expected: %#v, got: %#v", expectedMemberData, memberData) | ||||||
| 	} | 	} | ||||||
| 	_, err := p.GetMemberData(&pod) |  | ||||||
| 
 | 
 | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Errorf("Could not read Patroni data: %v", err) | 		t.Errorf("Could not read Patroni data: %v", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func TestGetConfig(t *testing.T) { | ||||||
|  | 	ctrl := gomock.NewController(t) | ||||||
|  | 	defer ctrl.Finish() | ||||||
|  | 
 | ||||||
|  | 	expectedPatroniConfig := acidv1.Patroni{ | ||||||
|  | 		TTL:                  30, | ||||||
|  | 		LoopWait:             10, | ||||||
|  | 		RetryTimeout:         10, | ||||||
|  | 		MaximumLagOnFailover: 33554432, | ||||||
|  | 		Slots: map[string]map[string]string{ | ||||||
|  | 			"cdc": { | ||||||
|  | 				"database": "foo", | ||||||
|  | 				"plugin":   "wal2json", | ||||||
|  | 				"type":     "logical", | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	expectedPgParameters := map[string]string{ | ||||||
|  | 		"archive_mode":                    "on", | ||||||
|  | 		"archive_timeout":                 "1800s", | ||||||
|  | 		"autovacuum_analyze_scale_factor": "0.02", | ||||||
|  | 		"autovacuum_max_workers":          "5", | ||||||
|  | 		"autovacuum_vacuum_scale_factor":  "0.05", | ||||||
|  | 		"checkpoint_completion_target":    "0.9", | ||||||
|  | 		"hot_standby":                     "on", | ||||||
|  | 		"log_autovacuum_min_duration":     "0", | ||||||
|  | 		"log_checkpoints":                 "on", | ||||||
|  | 		"log_connections":                 "on", | ||||||
|  | 		"log_disconnections":              "on", | ||||||
|  | 		"log_line_prefix":                 "%t [%p]: [%l-1] %c %x %d %u %a %h ", | ||||||
|  | 		"log_lock_waits":                  "on", | ||||||
|  | 		"log_min_duration_statement":      "500", | ||||||
|  | 		"log_statement":                   "ddl", | ||||||
|  | 		"log_temp_files":                  "0", | ||||||
|  | 		"max_connections":                 "100", | ||||||
|  | 		"max_replication_slots":           "10", | ||||||
|  | 		"max_wal_senders":                 "10", | ||||||
|  | 		"tcp_keepalives_idle":             "900", | ||||||
|  | 		"tcp_keepalives_interval":         "100", | ||||||
|  | 		"track_functions":                 "all", | ||||||
|  | 		"wal_level":                       "hot_standby", | ||||||
|  | 		"wal_log_hints":                   "on", | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 100, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "hot_standby", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "wal2json", "type": "logical"}}, "ttl": 30}` | ||||||
|  | 	r := ioutil.NopCloser(bytes.NewReader([]byte(configJson))) | ||||||
|  | 
 | ||||||
|  | 	response := http.Response{ | ||||||
|  | 		StatusCode: 200, | ||||||
|  | 		Body:       r, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	mockClient := mocks.NewMockHTTPClient(ctrl) | ||||||
|  | 	mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil) | ||||||
|  | 
 | ||||||
|  | 	p := New(logger, mockClient) | ||||||
|  | 
 | ||||||
|  | 	patroniConfig, pgParameters, err := p.GetConfig(newMockPod("192.168.100.1")) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Errorf("Could not read Patroni config endpoint: %v", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if !reflect.DeepEqual(expectedPatroniConfig, patroniConfig) { | ||||||
|  | 		t.Errorf("Patroni config differs: expected: %#v, got: %#v", expectedPatroniConfig, patroniConfig) | ||||||
|  | 	} | ||||||
|  | 	if !reflect.DeepEqual(expectedPgParameters, pgParameters) { | ||||||
|  | 		t.Errorf("Postgre parameters differ: expected: %#v, got: %#v", expectedPgParameters, pgParameters) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestSetPostgresParameters(t *testing.T) { | ||||||
|  | 	ctrl := gomock.NewController(t) | ||||||
|  | 	defer ctrl.Finish() | ||||||
|  | 
 | ||||||
|  | 	parametersToSet := map[string]string{ | ||||||
|  | 		"max_connections": "50", | ||||||
|  | 		"wal_level":       "logical", | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 50, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "logical", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "wal2json", "type": "logical"}}, "ttl": 30}` | ||||||
|  | 	r := ioutil.NopCloser(bytes.NewReader([]byte(configJson))) | ||||||
|  | 
 | ||||||
|  | 	response := http.Response{ | ||||||
|  | 		StatusCode: 200, | ||||||
|  | 		Body:       r, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	mockClient := mocks.NewMockHTTPClient(ctrl) | ||||||
|  | 	mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil) | ||||||
|  | 
 | ||||||
|  | 	p := New(logger, mockClient) | ||||||
|  | 
 | ||||||
|  | 	err := p.SetPostgresParameters(newMockPod("192.168.100.1"), parametersToSet) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Errorf("could not call patch Patroni config: %v", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue