Merge branch 'master' into fes-support
This commit is contained in:
		
						commit
						2fadb740fa
					
				|  | @ -252,6 +252,13 @@ class K8s: | ||||||
|                               stdout=subprocess.PIPE, |                               stdout=subprocess.PIPE, | ||||||
|                               stderr=subprocess.PIPE) |                               stderr=subprocess.PIPE) | ||||||
| 
 | 
 | ||||||
|  |     def patroni_rest(self, pod, path): | ||||||
|  |         r = self.exec_with_kubectl(pod, "curl localhost:8008/" + path) | ||||||
|  |         if not r.returncode == 0 or not r.stdout.decode()[0:1] == "{": | ||||||
|  |             return None | ||||||
|  | 
 | ||||||
|  |         return json.loads(r.stdout.decode()) | ||||||
|  | 
 | ||||||
|     def get_patroni_state(self, pod): |     def get_patroni_state(self, pod): | ||||||
|         r = self.exec_with_kubectl(pod, "patronictl list -f json") |         r = self.exec_with_kubectl(pod, "patronictl list -f json") | ||||||
|         if not r.returncode == 0 or not r.stdout.decode()[0:1] == "[": |         if not r.returncode == 0 or not r.stdout.decode()[0:1] == "[": | ||||||
|  | @ -514,6 +521,13 @@ class K8sBase: | ||||||
|                               stdout=subprocess.PIPE, |                               stdout=subprocess.PIPE, | ||||||
|                               stderr=subprocess.PIPE) |                               stderr=subprocess.PIPE) | ||||||
| 
 | 
 | ||||||
|  |     def patroni_rest(self, pod, path): | ||||||
|  |         r = self.exec_with_kubectl(pod, "curl localhost:8008/" + path) | ||||||
|  |         if not r.returncode == 0 or not r.stdout.decode()[0:1] == "{": | ||||||
|  |             return None | ||||||
|  | 
 | ||||||
|  |         return json.loads(r.stdout.decode()) | ||||||
|  | 
 | ||||||
|     def get_patroni_state(self, pod): |     def get_patroni_state(self, pod): | ||||||
|         r = self.exec_with_kubectl(pod, "patronictl list -f json") |         r = self.exec_with_kubectl(pod, "patronictl list -f json") | ||||||
|         if not r.returncode == 0 or not r.stdout.decode()[0:1] == "[": |         if not r.returncode == 0 or not r.stdout.decode()[0:1] == "[": | ||||||
|  |  | ||||||
|  | @ -324,65 +324,6 @@ class EndToEndTestCase(unittest.TestCase): | ||||||
|         self.eventuallyEqual(lambda: self.k8s.count_secrets_with_label("cluster-name=acid-minimal-cluster,application=spilo", self.test_namespace), |         self.eventuallyEqual(lambda: self.k8s.count_secrets_with_label("cluster-name=acid-minimal-cluster,application=spilo", self.test_namespace), | ||||||
|                              1, "Secret not created for user in namespace") |                              1, "Secret not created for user in namespace") | ||||||
| 
 | 
 | ||||||
|     @timeout_decorator.timeout(TEST_TIMEOUT_SEC) |  | ||||||
|     def test_decrease_max_connections(self): |  | ||||||
|         ''' |  | ||||||
|             Test decreasing max_connections and restarting cluster through rest api |  | ||||||
|         ''' |  | ||||||
|         k8s = self.k8s |  | ||||||
|         cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' |  | ||||||
|         labels = 'spilo-role=master,' + cluster_label |  | ||||||
|         new_max_connections_value = "99" |  | ||||||
|         pods = k8s.api.core_v1.list_namespaced_pod( |  | ||||||
|             'default', label_selector=labels).items |  | ||||||
|         self.assert_master_is_unique() |  | ||||||
|         masterPod = pods[0] |  | ||||||
|         creationTimestamp = masterPod.metadata.creation_timestamp |  | ||||||
| 
 |  | ||||||
|         # adjust max_connection |  | ||||||
|         pg_patch_max_connections = { |  | ||||||
|             "spec": { |  | ||||||
|                 "postgresql": { |  | ||||||
|                     "parameters": { |  | ||||||
|                         "max_connections": new_max_connections_value |  | ||||||
|                      } |  | ||||||
|                  } |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
| 
 |  | ||||||
|         try: |  | ||||||
|             k8s.api.custom_objects_api.patch_namespaced_custom_object( |  | ||||||
|                 "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections) |  | ||||||
| 
 |  | ||||||
|             def get_max_connections(): |  | ||||||
|                 pods = k8s.api.core_v1.list_namespaced_pod( |  | ||||||
|                     'default', label_selector=labels).items |  | ||||||
|                 self.assert_master_is_unique() |  | ||||||
|                 masterPod = pods[0] |  | ||||||
|                 get_max_connections_cmd = '''psql -At -U postgres -c "SELECT setting FROM pg_settings WHERE name = 'max_connections';"''' |  | ||||||
|                 result = k8s.exec_with_kubectl(masterPod.metadata.name, get_max_connections_cmd) |  | ||||||
|                 max_connections_value = int(result.stdout) |  | ||||||
|                 return max_connections_value |  | ||||||
| 
 |  | ||||||
|             #Make sure that max_connections decreased |  | ||||||
|             self.eventuallyEqual(get_max_connections, int(new_max_connections_value), "max_connections didn't decrease") |  | ||||||
|             pods = k8s.api.core_v1.list_namespaced_pod( |  | ||||||
|                 'default', label_selector=labels).items |  | ||||||
|             self.assert_master_is_unique() |  | ||||||
|             masterPod = pods[0] |  | ||||||
|             #Make sure that pod didn't restart |  | ||||||
|             self.assertEqual(creationTimestamp, masterPod.metadata.creation_timestamp, |  | ||||||
|                             "Master pod creation timestamp is updated") |  | ||||||
| 
 |  | ||||||
|         except timeout_decorator.TimeoutError: |  | ||||||
|             print('Operator log: {}'.format(k8s.get_operator_log())) |  | ||||||
|             raise |  | ||||||
| 
 |  | ||||||
|         # make sure cluster is in a good state for further tests |  | ||||||
|         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") |  | ||||||
|         self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, |  | ||||||
|                              "No 2 pods running") |  | ||||||
| 
 |  | ||||||
|     @timeout_decorator.timeout(TEST_TIMEOUT_SEC) |     @timeout_decorator.timeout(TEST_TIMEOUT_SEC) | ||||||
|     def test_enable_disable_connection_pooler(self): |     def test_enable_disable_connection_pooler(self): | ||||||
|         ''' |         ''' | ||||||
|  | @ -1114,6 +1055,88 @@ class EndToEndTestCase(unittest.TestCase): | ||||||
|         self.eventuallyEqual(lambda: self.k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), |         self.eventuallyEqual(lambda: self.k8s.count_running_pods("connection-pooler=acid-minimal-cluster-pooler"), | ||||||
|                              0, "Pooler pods not scaled down") |                              0, "Pooler pods not scaled down") | ||||||
| 
 | 
 | ||||||
|  |     @timeout_decorator.timeout(TEST_TIMEOUT_SEC) | ||||||
|  |     def test_patroni_config_update(self): | ||||||
|  |         ''' | ||||||
|  |             Change Postgres config under Spec.Postgresql.Parameters and Spec.Patroni | ||||||
|  |             and query Patroni config endpoint to check if manifest changes got applied | ||||||
|  |             via restarting cluster through Patroni's rest api | ||||||
|  |         ''' | ||||||
|  |         k8s = self.k8s | ||||||
|  |         masterPod = k8s.get_cluster_leader_pod() | ||||||
|  |         labels = 'application=spilo,cluster-name=acid-minimal-cluster,spilo-role=master' | ||||||
|  |         creationTimestamp = masterPod.metadata.creation_timestamp | ||||||
|  |         new_max_connections_value = "50" | ||||||
|  | 
 | ||||||
|  |         # adjust max_connection | ||||||
|  |         pg_patch_config = { | ||||||
|  |             "spec": { | ||||||
|  |                 "postgresql": { | ||||||
|  |                     "parameters": { | ||||||
|  |                         "max_connections": new_max_connections_value | ||||||
|  |                      } | ||||||
|  |                  }, | ||||||
|  |                  "patroni": { | ||||||
|  |                     "slots": { | ||||||
|  |                         "test_slot": { | ||||||
|  |                             "type": "physical" | ||||||
|  |                         } | ||||||
|  |                     }, | ||||||
|  |                     "ttl": 29, | ||||||
|  |                     "loop_wait": 9, | ||||||
|  |                     "retry_timeout": 9, | ||||||
|  |                     "synchronous_mode": True | ||||||
|  |                  } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         try: | ||||||
|  |             k8s.api.custom_objects_api.patch_namespaced_custom_object( | ||||||
|  |                 "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_config) | ||||||
|  |              | ||||||
|  |             self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") | ||||||
|  | 
 | ||||||
|  |             def compare_config(): | ||||||
|  |                 effective_config = k8s.patroni_rest(masterPod.metadata.name, "config") | ||||||
|  |                 desired_patroni = pg_patch_config["spec"]["patroni"] | ||||||
|  |                 desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"] | ||||||
|  |                 effective_parameters = effective_config["postgresql"]["parameters"] | ||||||
|  |                 self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"], | ||||||
|  |                             "max_connections not updated") | ||||||
|  |                 self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added") | ||||||
|  |                 self.assertEqual(desired_patroni["ttl"], effective_config["ttl"], | ||||||
|  |                             "ttl not updated") | ||||||
|  |                 self.assertEqual(desired_patroni["loop_wait"], effective_config["loop_wait"], | ||||||
|  |                             "loop_wait not updated") | ||||||
|  |                 self.assertEqual(desired_patroni["retry_timeout"], effective_config["retry_timeout"], | ||||||
|  |                             "retry_timeout not updated") | ||||||
|  |                 self.assertEqual(desired_patroni["synchronous_mode"], effective_config["synchronous_mode"], | ||||||
|  |                             "synchronous_mode not updated") | ||||||
|  |                 return True | ||||||
|  | 
 | ||||||
|  |             self.eventuallyTrue(compare_config, "Postgres config not applied") | ||||||
|  | 
 | ||||||
|  |             setting_query = """ | ||||||
|  |                SELECT setting | ||||||
|  |                  FROM pg_settings | ||||||
|  |                 WHERE name = 'max_connections'; | ||||||
|  |             """ | ||||||
|  |             self.eventuallyEqual(lambda: self.query_database(masterPod.metadata.name, "postgres", setting_query)[0], new_max_connections_value, | ||||||
|  |                 "New max_connections setting not applied", 10, 5) | ||||||
|  | 
 | ||||||
|  |             # make sure that pod wasn't recreated | ||||||
|  |             self.assertEqual(creationTimestamp, masterPod.metadata.creation_timestamp, | ||||||
|  |                             "Master pod creation timestamp is updated") | ||||||
|  | 
 | ||||||
|  |         except timeout_decorator.TimeoutError: | ||||||
|  |             print('Operator log: {}'.format(k8s.get_operator_log())) | ||||||
|  |             raise | ||||||
|  | 
 | ||||||
|  |         # make sure cluster is in a good state for further tests | ||||||
|  |         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") | ||||||
|  |         self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, | ||||||
|  |                              "No 2 pods running") | ||||||
|  | 
 | ||||||
|     @timeout_decorator.timeout(TEST_TIMEOUT_SEC) |     @timeout_decorator.timeout(TEST_TIMEOUT_SEC) | ||||||
|     def test_rolling_update_flag(self): |     def test_rolling_update_flag(self): | ||||||
|         ''' |         ''' | ||||||
|  |  | ||||||
|  | @ -81,7 +81,7 @@ func (ps *PostgresStatus) UnmarshalJSON(data []byte) error { | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		metaErr := json.Unmarshal(data, &status) | 		metaErr := json.Unmarshal(data, &status) | ||||||
| 		if metaErr != nil { | 		if metaErr != nil { | ||||||
| 			return fmt.Errorf("Could not parse status: %v; err %v", string(data), metaErr) | 			return fmt.Errorf("could not parse status: %v; err %v", string(data), metaErr) | ||||||
| 		} | 		} | ||||||
| 		tmp.PostgresClusterStatus = status | 		tmp.PostgresClusterStatus = status | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | @ -2,7 +2,9 @@ package cluster | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"reflect" | ||||||
| 	"regexp" | 	"regexp" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" | 	"time" | ||||||
|  | @ -261,14 +263,18 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) syncStatefulSet() error { | func (c *Cluster) syncStatefulSet() error { | ||||||
| 	var instancesRestartRequired bool | 	var ( | ||||||
|  | 		masterPod               *v1.Pod | ||||||
|  | 		postgresConfig          map[string]interface{} | ||||||
|  | 		instanceRestartRequired bool | ||||||
|  | 	) | ||||||
| 
 | 
 | ||||||
| 	podsToRecreate := make([]v1.Pod, 0) | 	podsToRecreate := make([]v1.Pod, 0) | ||||||
| 	switchoverCandidates := make([]spec.NamespacedName, 0) | 	switchoverCandidates := make([]spec.NamespacedName, 0) | ||||||
| 
 | 
 | ||||||
| 	pods, err := c.listPods() | 	pods, err := c.listPods() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		c.logger.Infof("could not list pods of the statefulset: %v", err) | 		c.logger.Warnf("could not list pods of the statefulset: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
 | 	// NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
 | ||||||
|  | @ -381,20 +387,50 @@ func (c *Cluster) syncStatefulSet() error { | ||||||
| 	// Apply special PostgreSQL parameters that can only be set via the Patroni API.
 | 	// Apply special PostgreSQL parameters that can only be set via the Patroni API.
 | ||||||
| 	// it is important to do it after the statefulset pods are there, but before the rolling update
 | 	// it is important to do it after the statefulset pods are there, but before the rolling update
 | ||||||
| 	// since those parameters require PostgreSQL restart.
 | 	// since those parameters require PostgreSQL restart.
 | ||||||
| 	instancesRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration() | 	pods, err = c.listPods() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("could not set cluster-wide PostgreSQL configuration options: %v", err) | 		c.logger.Warnf("could not get list of pods to apply special PostgreSQL parameters only to be set via Patroni API: %v", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if instancesRestartRequired { | 	// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
 | ||||||
| 		c.logger.Debugln("restarting Postgres server within pods") | 	// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
 | ||||||
| 		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "restarting Postgres server within pods") | 	for i, pod := range pods { | ||||||
| 		if err := c.restartInstances(); err != nil { | 		podName := util.NameFromMeta(pods[i].ObjectMeta) | ||||||
| 			c.logger.Warningf("could not restart Postgres server within pods: %v", err) | 		config, err := c.patroni.GetConfig(&pod) | ||||||
|  | 		if err != nil { | ||||||
|  | 			c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err) | ||||||
|  | 			continue | ||||||
| 		} | 		} | ||||||
| 		c.logger.Infof("Postgres server successfuly restarted on all pods") | 		instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, config) | ||||||
| 		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Postgres server restart done - all instances have been restarted") | 		if err != nil { | ||||||
|  | 			c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		break | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	// if the config update requires a restart, call Patroni restart for replicas first, then master
 | ||||||
|  | 	if instanceRestartRequired { | ||||||
|  | 		c.logger.Debug("restarting Postgres server within pods") | ||||||
|  | 		ttl, ok := postgresConfig["ttl"].(int32) | ||||||
|  | 		if !ok { | ||||||
|  | 			ttl = 30 | ||||||
|  | 		} | ||||||
|  | 		for i, pod := range pods { | ||||||
|  | 			role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) | ||||||
|  | 			if role == Master { | ||||||
|  | 				masterPod = &pods[i] | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			c.restartInstance(&pod) | ||||||
|  | 			time.Sleep(time.Duration(ttl) * time.Second) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if masterPod != nil { | ||||||
|  | 			c.restartInstance(masterPod) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	// if we get here we also need to re-create the pods (either leftovers from the old
 | 	// if we get here we also need to re-create the pods (either leftovers from the old
 | ||||||
| 	// statefulset or those that got their configuration from the outdated statefulset)
 | 	// statefulset or those that got their configuration from the outdated statefulset)
 | ||||||
| 	if len(podsToRecreate) > 0 { | 	if len(podsToRecreate) > 0 { | ||||||
|  | @ -408,55 +444,19 @@ func (c *Cluster) syncStatefulSet() error { | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) restartInstances() error { | func (c *Cluster) restartInstance(pod *v1.Pod) { | ||||||
| 	c.setProcessName("starting to restart Postgres servers") | 	podName := util.NameFromMeta(pod.ObjectMeta) | ||||||
| 	ls := c.labelsSet(false) | 	role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) | ||||||
| 	namespace := c.Namespace |  | ||||||
| 
 | 
 | ||||||
| 	listOptions := metav1.ListOptions{ | 	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name)) | ||||||
| 		LabelSelector: ls.String(), | 
 | ||||||
|  | 	if err := c.patroni.Restart(pod); err != nil { | ||||||
|  | 		c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err) | ||||||
|  | 		return | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions) | 	c.logger.Debugf("Postgres server successfuly restarted in %s pod %s", role, podName) | ||||||
| 	if err != nil { | 	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name)) | ||||||
| 		return fmt.Errorf("could not get the list of pods: %v", err) |  | ||||||
| 	} |  | ||||||
| 	c.logger.Infof("there are %d pods in the cluster which resquire Postgres server restart", len(pods.Items)) |  | ||||||
| 
 |  | ||||||
| 	var ( |  | ||||||
| 		masterPod *v1.Pod |  | ||||||
| 	) |  | ||||||
| 	for i, pod := range pods.Items { |  | ||||||
| 		role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) |  | ||||||
| 
 |  | ||||||
| 		if role == Master { |  | ||||||
| 			masterPod = &pods.Items[i] |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		podName := util.NameFromMeta(pods.Items[i].ObjectMeta) |  | ||||||
| 		config, err := c.patroni.GetConfig(&pod) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return fmt.Errorf("could not get config for pod %s: %v", podName, err) |  | ||||||
| 		} |  | ||||||
| 		ttl, ok := config["ttl"].(int32) |  | ||||||
| 		if !ok { |  | ||||||
| 			ttl = 30 |  | ||||||
| 		} |  | ||||||
| 		if err = c.patroni.Restart(&pod); err != nil { |  | ||||||
| 			return fmt.Errorf("could not restart Postgres server on pod %s: %v", podName, err) |  | ||||||
| 		} |  | ||||||
| 		time.Sleep(time.Duration(ttl) * time.Second) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if masterPod != nil { |  | ||||||
| 		podName := util.NameFromMeta(masterPod.ObjectMeta) |  | ||||||
| 		if err = c.patroni.Restart(masterPod); err != nil { |  | ||||||
| 			return fmt.Errorf("could not restart postgres server on masterPod %s: %v", podName, err) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return nil |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // AnnotationsToPropagate get the annotations to update if required
 | // AnnotationsToPropagate get the annotations to update if required
 | ||||||
|  | @ -492,48 +492,77 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
 | // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
 | ||||||
| // (like max_connections) has changed and if necessary sets it via the Patroni API
 | // (like max_connections) have changed and if necessary sets it via the Patroni API
 | ||||||
| func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() (bool, error) { | func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig map[string]interface{}) (bool, error) { | ||||||
| 	var ( | 	configToSet := make(map[string]interface{}) | ||||||
| 		err             error | 	parametersToSet := make(map[string]string) | ||||||
| 		pods            []v1.Pod | 	effectivePgParameters := make(map[string]interface{}) | ||||||
| 		restartRequired bool |  | ||||||
| 	) |  | ||||||
| 
 | 
 | ||||||
| 	// we need to extract those options from the cluster manifest.
 | 	// read effective Patroni config if set
 | ||||||
| 	optionsToSet := make(map[string]string) | 	if patroniConfig != nil { | ||||||
| 	pgOptions := c.Spec.Parameters | 		effectivePostgresql := patroniConfig["postgresql"].(map[string]interface{}) | ||||||
|  | 		effectivePgParameters = effectivePostgresql[patroniPGParametersParameterName].(map[string]interface{}) | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
| 	for k, v := range pgOptions { | 	// compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest
 | ||||||
| 		if isBootstrapOnlyParameter(k) { | 	desiredPgParameters := c.Spec.Parameters | ||||||
| 			optionsToSet[k] = v | 	for desiredOption, desiredValue := range desiredPgParameters { | ||||||
|  | 		effectiveValue := effectivePgParameters[desiredOption] | ||||||
|  | 		if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) { | ||||||
|  | 			parametersToSet[desiredOption] = desiredValue | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if len(optionsToSet) == 0 { | 	if len(parametersToSet) > 0 { | ||||||
| 		return restartRequired, nil | 		configToSet["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: parametersToSet} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if pods, err = c.listPods(); err != nil { | 	// compare other options from config with c.Spec.Patroni from manifest
 | ||||||
| 		return restartRequired, err | 	desiredPatroniConfig := c.Spec.Patroni | ||||||
|  | 	if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != uint32(patroniConfig["loop_wait"].(float64)) { | ||||||
|  | 		configToSet["loop_wait"] = desiredPatroniConfig.LoopWait | ||||||
| 	} | 	} | ||||||
| 	if len(pods) == 0 { | 	if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != float32(patroniConfig["maximum_lag_on_failover"].(float64)) { | ||||||
| 		return restartRequired, fmt.Errorf("could not call Patroni API: cluster has no pods") | 		configToSet["maximum_lag_on_failover"] = desiredPatroniConfig.MaximumLagOnFailover | ||||||
| 	} | 	} | ||||||
|  | 	if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, (patroniConfig["pg_hba"])) { | ||||||
|  | 		configToSet["pg_hba"] = desiredPatroniConfig.PgHba | ||||||
|  | 	} | ||||||
|  | 	if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != uint32(patroniConfig["retry_timeout"].(float64)) { | ||||||
|  | 		configToSet["retry_timeout"] = desiredPatroniConfig.RetryTimeout | ||||||
|  | 	} | ||||||
|  | 	if desiredPatroniConfig.Slots != nil && !reflect.DeepEqual(desiredPatroniConfig.Slots, patroniConfig["slots"]) { | ||||||
|  | 		configToSet["slots"] = desiredPatroniConfig.Slots | ||||||
|  | 	} | ||||||
|  | 	if desiredPatroniConfig.SynchronousMode != patroniConfig["synchronous_mode"] { | ||||||
|  | 		configToSet["synchronous_mode"] = desiredPatroniConfig.SynchronousMode | ||||||
|  | 	} | ||||||
|  | 	if desiredPatroniConfig.SynchronousModeStrict != patroniConfig["synchronous_mode_strict"] { | ||||||
|  | 		configToSet["synchronous_mode_strict"] = desiredPatroniConfig.SynchronousModeStrict | ||||||
|  | 	} | ||||||
|  | 	if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != uint32(patroniConfig["ttl"].(float64)) { | ||||||
|  | 		configToSet["ttl"] = desiredPatroniConfig.TTL | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if len(configToSet) == 0 { | ||||||
|  | 		return false, nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	configToSetJson, err := json.Marshal(configToSet) | ||||||
|  | 	if err != nil { | ||||||
|  | 		c.logger.Debugf("could not convert config patch to JSON: %v", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	// try all pods until the first one that is successful, as it doesn't matter which pod
 | 	// try all pods until the first one that is successful, as it doesn't matter which pod
 | ||||||
| 	// carries the request to change configuration through
 | 	// carries the request to change configuration through
 | ||||||
| 	for _, pod := range pods { | 	podName := util.NameFromMeta(pod.ObjectMeta) | ||||||
| 		podName := util.NameFromMeta(pod.ObjectMeta) | 	c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s", | ||||||
| 		c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v", | 		podName, configToSetJson) | ||||||
| 			podName, optionsToSet) | 	if err = c.patroni.SetConfig(pod, configToSet); err != nil { | ||||||
| 		if err = c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil { | 		return true, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err) | ||||||
| 			restartRequired = true |  | ||||||
| 			return restartRequired, nil |  | ||||||
| 		} |  | ||||||
| 		c.logger.Warningf("could not patch postgres parameters with a pod %s: %v", podName, err) |  | ||||||
| 	} | 	} | ||||||
| 	return restartRequired, fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)", | 
 | ||||||
| 		len(pods)) | 	return true, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) syncSecrets() error { | func (c *Cluster) syncSecrets() error { | ||||||
|  |  | ||||||
|  | @ -32,6 +32,7 @@ type Interface interface { | ||||||
| 	GetMemberData(server *v1.Pod) (MemberData, error) | 	GetMemberData(server *v1.Pod) (MemberData, error) | ||||||
| 	Restart(server *v1.Pod) error | 	Restart(server *v1.Pod) error | ||||||
| 	GetConfig(server *v1.Pod) (map[string]interface{}, error) | 	GetConfig(server *v1.Pod) (map[string]interface{}, error) | ||||||
|  | 	SetConfig(server *v1.Pod, config map[string]interface{}) error | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Patroni API client
 | // Patroni API client
 | ||||||
|  | @ -163,6 +164,20 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st | ||||||
| 	return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf) | 	return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | //SetConfig sets Patroni options via Patroni patch API call.
 | ||||||
|  | func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error { | ||||||
|  | 	buf := &bytes.Buffer{} | ||||||
|  | 	err := json.NewEncoder(buf).Encode(config) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("could not encode json: %v", err) | ||||||
|  | 	} | ||||||
|  | 	apiURLString, err := apiURL(server) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // MemberDataPatroni child element
 | // MemberDataPatroni child element
 | ||||||
| type MemberDataPatroni struct { | type MemberDataPatroni struct { | ||||||
| 	Version string `json:"version"` | 	Version string `json:"version"` | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue