diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py
index 2300ebfe3..1bf723f12 100644
--- a/e2e/tests/test_e2e.py
+++ b/e2e/tests/test_e2e.py
@@ -926,6 +926,33 @@ class EndToEndTestCase(unittest.TestCase):
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
         self.eventuallyTrue(lambda: k8s.check_statefulset_annotations(cluster_label, annotations), "Annotations missing")
 
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    @unittest.skip("Skipping this test until fixed")
+    def test_zaa_test_major_version_upgrade(self):
+        k8s = self.k8s
+        result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest-12.yaml")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(labels="application=spilo,cluster-name=acid-upgrade-test"), 2, "No 2 pods running")
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+        pg_patch_version = {
+            "spec": {
+                "postgresql": {
+                    "version": "13"
+                }
+            }
+        }
+        k8s.api.custom_objects_api.patch_namespaced_custom_object(
+            "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version)
+
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+        def check_version_13():
+            p = k8s.get_patroni_state("acid-upgrade-test-0")
+            version = str(p["server_version"])[0:2]
+            return version
+
+        self.eventuallyEqual(check_version_13, "13", "Version was not upgraded to 13")
+
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     @unittest.skip("Skipping this test until fixed")
     def test_zzz_taint_based_eviction(self):
diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml
index c35344b4d..8bb9b715b 100644
--- a/manifests/configmap.yaml
+++ b/manifests/configmap.yaml
@@ -74,6 +74,7 @@ data:
   # logical_backup_s3_secret_access_key: ""
   logical_backup_s3_sse: "AES256"
   logical_backup_schedule: "30 00 * * *"
+  major_version_upgrade_mode: "manual"
   master_dns_name_format: "{cluster}.{team}.{hostedzone}"
   # master_pod_move_timeout: 20m
   # max_instances: "-1"
diff --git a/manifests/minimal-postgres-manifest-12.yaml b/manifests/minimal-postgres-manifest-12.yaml
new file mode 100644
index 000000000..3f89b765d
--- /dev/null
+++ b/manifests/minimal-postgres-manifest-12.yaml
@@ -0,0 +1,21 @@
+apiVersion: "acid.zalan.do/v1"
+kind: postgresql
+metadata:
+  name: acid-upgrade-test
+  namespace: default
+spec:
+  teamId: "acid"
+  volume:
+    size: 1Gi
+  numberOfInstances: 2
+  users:
+    zalando:  # database owner
+    - superuser
+    - createdb
+    foo_user: []  # role for application foo
+  databases:
+    foo: zalando  # dbname: owner
+  preparedDatabases:
+    bar: {}
+  postgresql:
+    version: "12"
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 1055b795d..c3e3ec905 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -83,15 +83,16 @@ type Cluster struct {
 	deleteOptions  metav1.DeleteOptions
 	podEventsQueue *cache.FIFO
 
-	teamsAPIClient   teams.Interface
-	oauthTokenGetter OAuthTokenGetter
-	KubeClient       k8sutil.KubernetesClient //TODO: move clients to the better place?
-	currentProcess   Process
-	processMu        sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
-	specMu           sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
-	ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects
-	EBSVolumes       map[string]volumes.VolumeProperties
-	VolumeResizer    volumes.VolumeResizer
+	teamsAPIClient      teams.Interface
+	oauthTokenGetter    OAuthTokenGetter
+	KubeClient          k8sutil.KubernetesClient //TODO: move clients to the better place?
+	currentProcess      Process
+	processMu           sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
+	specMu              sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
+	ConnectionPooler    map[PostgresRole]*ConnectionPoolerObjects
+	EBSVolumes          map[string]volumes.VolumeProperties
+	VolumeResizer       volumes.VolumeResizer
+	currentMajorVersion int
 }
 
 type compareStatefulsetResult struct {
@@ -128,15 +129,16 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
 			Secrets:   make(map[types.UID]*v1.Secret),
 			Services:  make(map[PostgresRole]*v1.Service),
 			Endpoints: make(map[PostgresRole]*v1.Endpoints)},
-		userSyncStrategy: users.DefaultUserSyncStrategy{PasswordEncryption: passwordEncryption},
-		deleteOptions:    metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy},
-		podEventsQueue:   podEventsQueue,
-		KubeClient:       kubeClient,
+		userSyncStrategy:    users.DefaultUserSyncStrategy{PasswordEncryption: passwordEncryption},
+		deleteOptions:       metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy},
+		podEventsQueue:      podEventsQueue,
+		KubeClient:          kubeClient,
+		currentMajorVersion: 0,
 	}
 	cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName())
 	cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
 	cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName)
-	cluster.patroni = patroni.New(cluster.logger)
+	cluster.patroni = patroni.New(cluster.logger, nil)
 	cluster.eventRecorder = eventRecorder
 
 	cluster.EBSVolumes = make(map[string]volumes.VolumeProperties)
@@ -359,7 +361,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 	}
 	if !reflect.DeepEqual(c.Statefulset.Annotations, statefulSet.Annotations) {
 		match = false
-		reasons = append(reasons, "new statefulset's annotations does not match the current one")
+		reasons = append(reasons, "new statefulset's annotations do not match the current one")
 	}
 
 	needsRollUpdate, reasons = c.compareContainers("initContainers", c.Statefulset.Spec.Template.Spec.InitContainers, statefulSet.Spec.Template.Spec.InitContainers, needsRollUpdate, reasons)
@@ -614,17 +616,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 
 	logNiceDiff(c.logger, oldSpec, newSpec)
 
-	if oldSpec.Spec.PostgresqlParam.PgVersion > newSpec.Spec.PostgresqlParam.PgVersion {
-		c.logger.Warningf("postgresql version change(%q -> %q) has no effect",
-			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
-		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "PostgreSQL", "postgresql version change(%q -> %q) has no effect",
-			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
-		// we need that hack to generate statefulset with the old version
-		newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
-	} else if oldSpec.Spec.PostgresqlParam.PgVersion < newSpec.Spec.PostgresqlParam.PgVersion {
-		c.logger.Infof("postgresql version increased (%q -> %q), major version upgrade can be done manually after StatefulSet Sync",
+	if IsBiggerPostgresVersion(oldSpec.Spec.PostgresqlParam.PgVersion, c.GetDesiredMajorVersion()) {
+		c.logger.Infof("postgresql version increased (%s -> %s), a manual upgrade may be needed depending on the configured mode",
 			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
 		syncStatetfulSet = true
+	} else {
+		c.logger.Infof("postgresql major version unchanged or smaller, no changes needed")
+		// sticking with the old version; this will also advance GetDesiredMajorVersion next time
+		newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
 	}
 
 	// Service
@@ -781,6 +780,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 		updateFailed = true
 	}
 
+	if !updateFailed {
+		// the major version upgrade must only fire after the earlier operations have succeeded and should stay last
+		if err := c.majorVersionUpgrade(); err != nil {
+			c.logger.Errorf("major version upgrade failed: %v", err)
+			updateFailed = true
+		}
+	}
+
 	return nil
 }
 
diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go
index 002ea0149..e3ee322d5 100644
--- a/pkg/cluster/k8sres.go
+++ b/pkg/cluster/k8sres.go
@@ -734,7 +734,7 @@ func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration stri
 		},
 	}
 	if c.OpConfig.EnablePgVersionEnvVar {
-		envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.Spec.PgVersion})
+		envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.GetDesiredMajorVersion()})
 	}
 	// Spilo expects cluster labels as JSON
 	if clusterLabels, err := json.Marshal(labels.Set(c.OpConfig.ClusterLabels)); err != nil {
diff --git a/pkg/cluster/majorversionupgrade.go b/pkg/cluster/majorversionupgrade.go
new file mode 100644
index 000000000..351bd26a4
--- /dev/null
+++ b/pkg/cluster/majorversionupgrade.go
@@ -0,0 +1,97 @@
+package cluster
+
+import (
+	"fmt"
+
+	"github.com/zalando/postgres-operator/pkg/spec"
+	v1 "k8s.io/api/core/v1"
+)
+
+// VersionMap maps Postgres major version strings to comparable numeric representations
+var VersionMap = map[string]int{
+	"9.5": 90500,
+	"9.6": 90600,
+	"10":  100000,
+	"11":  110000,
+	"12":  120000,
+	"13":  130000,
+}
+
+// IsBiggerPostgresVersion compares two Postgres major version strings
+func IsBiggerPostgresVersion(old string, new string) bool {
+	oldN := VersionMap[old]
+	newN := VersionMap[new]
+	return newN > oldN
+}
+
+// GetDesiredMajorVersionAsInt returns the desired major version as a comparable integer
+func (c *Cluster) GetDesiredMajorVersionAsInt() int {
+	return VersionMap[c.GetDesiredMajorVersion()]
+}
+
+// GetDesiredMajorVersion returns the major version to use, including a potential auto upgrade
+func (c *Cluster) GetDesiredMajorVersion() string {
+
+	if c.Config.OpConfig.MajorVersionUpgradeMode == "full" {
+		// e.g. with a minimal version of 11, clusters running 11 to 13 stay as configured; everything below is upgraded to the target version
+		if IsBiggerPostgresVersion(c.Spec.PgVersion, c.Config.OpConfig.MinimalMajorVersion) {
+			c.logger.Infof("overwriting configured major version %s to %s", c.Spec.PgVersion, c.Config.OpConfig.TargetMajorVersion)
+			return c.Config.OpConfig.TargetMajorVersion
+		}
+	}
+
+	return c.Spec.PgVersion
+}
+
+func (c *Cluster) majorVersionUpgrade() error {
+
+	if c.OpConfig.MajorVersionUpgradeMode == "off" {
+		return nil
+	}
+
+	desiredVersion := c.GetDesiredMajorVersionAsInt()
+
+	if c.currentMajorVersion >= desiredVersion {
+		c.logger.Infof("cluster version up to date, current: %d, desired: %d", c.currentMajorVersion, desiredVersion)
+		return nil
+	}
+
+	pods, err := c.listPods()
+	if err != nil {
+		return err
+	}
+
+	allRunning := true
+
+	var masterPod *v1.Pod
+
+	for i, pod := range pods {
+		ps, _ := c.patroni.GetMemberData(&pod)
+
+		if ps.State != "running" {
+			allRunning = false
+			c.logger.Infof("identified non-running pod, potentially skipping major version upgrade")
+		}
+
+		if ps.Role == "master" {
+			masterPod = &pods[i]
+			c.currentMajorVersion = ps.ServerVersion
+		}
+	}
+
+	numberOfPods := len(pods)
+	if allRunning && masterPod != nil {
+		c.logger.Infof("healthy cluster ready to upgrade, current: %d, desired: %d", c.currentMajorVersion, desiredVersion)
+		if c.currentMajorVersion < desiredVersion {
+			podName := &spec.NamespacedName{Namespace: masterPod.Namespace, Name: masterPod.Name}
+			c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
+			upgradeCommand := fmt.Sprintf("/usr/bin/python3 /scripts/inplace_upgrade.py %d 2>&1 | tee last_upgrade.log", numberOfPods)
+			_, err := c.ExecCommand(podName, "/bin/su", "postgres", "-c", upgradeCommand)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index cf43de9a7..94e79d186 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
+	"github.com/zalando/postgres-operator/pkg/util/patroni"
 	"github.com/zalando/postgres-operator/pkg/util/retryutil"
 )
 
@@ -312,14 +313,14 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
 
 	for _, pod := range pods.Items {
 
-		var state string
+		var data patroni.MemberData
 
 		err := retryutil.Retry(1*time.Second, 5*time.Second,
 			func() (bool, error) {
 				var err error
 
-				state, err = c.patroni.GetPatroniMemberState(&pod)
+				data, err = c.patroni.GetMemberData(&pod)
 
 				if err != nil {
 					return false, err
@@ -331,7 +332,7 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
 		if err != nil {
 			c.logger.Errorf("failed to get Patroni state for pod: %s", err)
 			return false
-		} else if state == "creating replica" {
+		} else if data.State == "creating replica" {
 			c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
 			return false
 		}
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 0144857b9..d104fd6e0 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -118,6 +118,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 		return fmt.Errorf("could not sync connection pooler: %v", err)
 	}
 
+	// the major version upgrade must only run after all earlier operations have succeeded and must remain the last item in Sync
+	if err := c.majorVersionUpgrade(); err != nil {
+		c.logger.Errorf("major version upgrade failed: %v", err)
+	}
+
 	return err
 }
 
@@ -471,7 +476,7 @@ func (c *Cluster) syncSecrets() error {
 	for secretUsername, secretSpec := range secrets {
 		if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Create(context.TODO(), secretSpec, metav1.CreateOptions{}); err == nil {
 			c.Secrets[secret.UID] = secret
-			c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID)
+			c.logger.Debugf("created new secret %s, uid: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID)
 			continue
 		}
 		if k8sutil.ResourceAlreadyExists(err) {
@@ -480,7 +485,7 @@ func (c *Cluster) syncSecrets() error {
 				return fmt.Errorf("could not get current secret: %v", err)
 			}
 			if secretUsername != string(secret.Data["username"]) {
-				c.logger.Errorf("secret %s does not contain the role %q", secretSpec.Name, secretUsername)
+				c.logger.Errorf("secret %s does not contain the role %s", secretSpec.Name, secretUsername)
 				continue
 			}
 			c.Secrets[secret.UID] = secret
@@ -499,7 +504,7 @@ func (c *Cluster) syncSecrets() error {
 				if pwdUser.Password != string(secret.Data["password"]) &&
 					pwdUser.Origin == spec.RoleOriginInfrastructure {
-					c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name)
+					c.logger.Debugf("updating the secret %s from the infrastructure roles", secretSpec.Name)
 					if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(context.TODO(), secretSpec, metav1.UpdateOptions{}); err != nil {
 						return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err)
 					}
@@ -509,7 +514,7 @@ func (c *Cluster) syncSecrets() error {
 				userMap[secretUsername] = pwdUser
 			}
 		} else {
-			return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err)
+			return fmt.Errorf("could not create secret for user %s: %v", secretUsername, err)
 		}
 	}
 
diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go
index 0fe0c1120..b3c4d8414 100644
--- a/pkg/controller/postgresql.go
+++ b/pkg/controller/postgresql.go
@@ -199,10 +199,10 @@ func (c *Controller) processEvent(event ClusterEvent) {
 	if event.EventType == EventRepair {
 		runRepair, lastOperationStatus := cl.NeedsRepair()
 		if !runRepair {
-			lg.Debugf("Observed cluster status %s, repair is not required", lastOperationStatus)
+			lg.Debugf("observed cluster status %s, repair is not required", lastOperationStatus)
 			return
 		}
-		lg.Debugf("Observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
+		lg.Debugf("observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
 		event.EventType = EventSync
 	}
 
@@ -217,7 +217,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
 		}
 
 		if err := c.submitRBACCredentials(event); err != nil {
-			c.logger.Warnf("Pods and/or Patroni may misfunction due to the lack of permissions: %v", err)
+			c.logger.Warnf("pods and/or Patroni may malfunction due to the lack of permissions: %v", err)
 		}
 	}
 
@@ -225,7 +225,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
 	switch event.EventType {
 	case EventAdd:
 		if clusterFound {
-			lg.Infof("Recieved add event for already existing Postgres cluster")
+			lg.Infof("received add event for already existing Postgres cluster")
 			return
 		}
 
@@ -348,11 +348,11 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
 func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {
 
 	deprecate := func(deprecated, replacement string) {
-		c.logger.Warningf("Parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
+		c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
 	}
 
 	noeffect := func(param string, explanation string) {
-		c.logger.Warningf("Parameter %q takes no effect. %s", param, explanation)
+		c.logger.Warningf("parameter %q takes no effect. %s", param, explanation)
 	}
 
 	if spec.UseLoadBalancer != nil {
@@ -368,7 +368,7 @@ func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.Postg
 
 	if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
 		(spec.EnableReplicaLoadBalancer != nil || spec.EnableMasterLoadBalancer != nil) {
-		c.logger.Warnf("Both old and new load balancer parameters are present in the manifest, ignoring old ones")
+		c.logger.Warnf("both old and new load balancer parameters are present in the manifest, ignoring old ones")
 	}
 }
 
diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go
index edc05d67e..d8e4c3782 100644
--- a/pkg/controller/util_test.go
+++ b/pkg/controller/util_test.go
@@ -480,3 +480,45 @@ func TestInfrastructureRoleDefinitions(t *testing.T) {
 		}
 	}
 }
+
+type SubConfig struct {
+	teammap map[string]string
+}
+
+type SuperConfig struct {
+	sub SubConfig
+}
+
+func TestUnderstandingMapsAndReferences(t *testing.T) {
+	teams := map[string]string{"acid": "Felix"}
+
+	sc := SubConfig{
+		teammap: teams,
+	}
+
+	ssc := SuperConfig{
+		sub: sc,
+	}
+
+	teams["24x7"] = "alex"
+
+	if len(ssc.sub.teammap) != 2 {
+		t.Errorf("Team map does not contain 2 elements")
+	}
+
+	ssc.sub.teammap["teapot"] = "Mikkel"
+
+	if len(teams) != 3 {
+		t.Errorf("Team map does not contain 3 elements")
+	}
+
+	teams = make(map[string]string)
+
+	if len(ssc.sub.teammap) != 3 {
+		t.Errorf("Team map no longer contains 3 elements after reassigning the local variable")
+	}
+
+	if &teams == &(ssc.sub.teammap) {
+		t.Errorf("Identical maps")
+	}
+}
diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go
index 93fceff01..2969441c6 100644
--- a/pkg/util/config/config.go
+++ b/pkg/util/config/config.go
@@ -206,6 +206,10 @@ type Config struct {
 	EnableLazySpiloUpgrade      bool   `name:"enable_lazy_spilo_upgrade" default:"false"`
 	EnablePgVersionEnvVar       bool   `name:"enable_pgversion_env_var" default:"true"`
 	EnableSpiloWalPathCompat    bool   `name:"enable_spilo_wal_path_compat" default:"false"`
+	MajorVersionUpgradeMode     string `name:"major_version_upgrade_mode" default:"off"` // off - no action, manual - the manifest triggers the upgrade, full - manifest changes and minimal version violations trigger the upgrade
+	MinimalMajorVersion         string `name:"minimal_major_version" default:"9.5"`
+	TargetMajorVersion          string `name:"target_major_version" default:"13"`
+	AllowedMajorUpgradeVersions []string `name:"allowed_major_upgrade_versions" default:"12,13"`
 }
 
 // MustMarshal marshals the config or panics
diff --git a/pkg/util/httpclient/httpclient.go b/pkg/util/httpclient/httpclient.go
new file mode 100644
index 000000000..e7022f1a3
--- /dev/null
+++ b/pkg/util/httpclient/httpclient.go
@@ -0,0 +1,11 @@
+package httpclient
+
+//go:generate mockgen -package mocks -destination=$PWD/mocks/$GOFILE -source=$GOFILE -build_flags=-mod=vendor
+
+import "net/http"
+
+// HTTPClient is a minimal HTTP client interface that allows mocking in tests
+type HTTPClient interface {
+	Do(req *http.Request) (*http.Response, error)
+	Get(url string) (resp *http.Response, err error)
+}
diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go
index 53065e599..b63912e55 100644
--- a/pkg/util/patroni/patroni.go
+++ b/pkg/util/patroni/patroni.go
@@ -3,7 +3,6 @@ package patroni
 import (
 	"bytes"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io/ioutil"
 	"net"
@@ -11,6 +10,8 @@ import (
 	"strconv"
 	"time"
 
+	httpclient "github.com/zalando/postgres-operator/pkg/util/httpclient"
+
 	"github.com/sirupsen/logrus"
 	v1 "k8s.io/api/core/v1"
 )
@@ -26,24 +27,28 @@ const (
 type Interface interface {
 	Switchover(master *v1.Pod, candidate string) error
 	SetPostgresParameters(server *v1.Pod, options map[string]string) error
-	GetPatroniMemberState(pod *v1.Pod) (string, error)
+	GetMemberData(server *v1.Pod) (MemberData, error)
 }
 
 // Patroni API client
type Patroni struct {
-	httpClient *http.Client
+	httpClient httpclient.HTTPClient
 	logger     *logrus.Entry
 }
 
// New creates a Patroni API client; a nil client falls back to a default http.Client
-func New(logger *logrus.Entry) *Patroni {
-	cl := http.Client{
-		Timeout: timeout,
+func New(logger *logrus.Entry, client httpclient.HTTPClient) *Patroni {
+	if client == nil {
+		client = &http.Client{
+			Timeout: timeout,
+		}
 	}
 
 	return &Patroni{
 		logger:     logger,
-		httpClient: &cl,
+		httpClient: client,
 	}
 }
 
@@ -68,7 +73,9 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer)
 		return fmt.Errorf("could not create request: %v", err)
 	}
 
-	p.logger.Debugf("making %s http request: %s", method, request.URL.String())
+	if p.logger != nil {
+		p.logger.Debugf("making %s http request: %s", method, request.URL.String())
+	}
 
 	resp, err := p.httpClient.Do(request)
 	if err != nil {
@@ -126,35 +133,45 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
 	return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
 }
 
-//GetPatroniMemberState returns a state of member of a Patroni cluster
-func (p *Patroni) GetPatroniMemberState(server *v1.Pod) (string, error) {
+// MemberDataPatroni is the nested "patroni" element of the member data
+type MemberDataPatroni struct {
+	Version string `json:"version"`
+	Scope   string `json:"scope"`
+}
+
+// MemberData represents the member data returned by the Patroni API
+type MemberData struct {
+	State           string            `json:"state"`
+	Role            string            `json:"role"`
+	ServerVersion   int               `json:"server_version"`
+	PendingRestart  bool              `json:"pending_restart"`
+	ClusterUnlocked bool              `json:"cluster_unlocked"`
+	Patroni         MemberDataPatroni `json:"patroni"`
+}
+
+// GetMemberData reads member data from the Patroni API
+func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) {
 
 	apiURLString, err := apiURL(server)
 	if err != nil {
-		return "", err
+		return MemberData{}, err
 	}
 	response, err := p.httpClient.Get(apiURLString)
 	if err != nil {
-		return "", fmt.Errorf("could not perform Get request: %v", err)
+		return MemberData{}, fmt.Errorf("could not perform Get request: %v", err)
 	}
 	defer response.Body.Close()
 
 	body, err := ioutil.ReadAll(response.Body)
 	if err != nil {
-		return "", fmt.Errorf("could not read response: %v", err)
+		return MemberData{}, fmt.Errorf("could not read response: %v", err)
 	}
 
-	data := make(map[string]interface{})
+	data := MemberData{}
 	err = json.Unmarshal(body, &data)
 	if err != nil {
-		return "", err
+		return MemberData{}, err
 	}
 
-	state, ok := data["state"].(string)
-	if !ok {
-		return "", errors.New("Patroni Get call response contains wrong type for 'state' field")
-	}
-
-	return state, nil
-
+	return data, nil
 }
diff --git a/pkg/util/patroni/patroni_test.go b/pkg/util/patroni/patroni_test.go
index 388120ae5..939270453 100644
--- a/pkg/util/patroni/patroni_test.go
+++ b/pkg/util/patroni/patroni_test.go
@@ -1,10 +1,17 @@
 package patroni
 
 import (
+	"bytes"
 	"errors"
 	"fmt"
-	"k8s.io/api/core/v1"
+	"io/ioutil"
+	"net/http"
 	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/zalando/postgres-operator/mocks"
+
+	v1 "k8s.io/api/core/v1"
 )
 
 func newMockPod(ip string) *v1.Pod {
@@ -72,3 +79,32 @@ func TestApiURL(t *testing.T) {
 		}
 	}
 }
+
+func TestPatroniAPI(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	json := `{"state": "running", "postmaster_start_time": "2021-02-19 14:31:50.053 CET", "role": "master", "server_version": 90621, "cluster_unlocked": false, "xlog": {"location": 55978296057856}, "timeline": 6, "database_system_identifier": "6462555844314089962", "pending_restart": true, "patroni": {"version": "2.0.1", "scope": "acid-rest92-standby"}}`
+	r := ioutil.NopCloser(bytes.NewReader([]byte(json)))
+
+	response := http.Response{
+		Status: "200",
+		Body:   r,
+	}
+
+	mockClient := mocks.NewMockHTTPClient(ctrl)
+	mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil)
+
+	p := New(nil, mockClient)
+
+	pod := v1.Pod{
+		Status: v1.PodStatus{
+			PodIP: "192.168.100.1",
+		},
+	}
+	_, err := p.GetMemberData(&pod)
+
+	if err != nil {
+		t.Errorf("Could not read Patroni data: %v", err)
+	}
+}