Support major version upgrade via manifest and global upgrades via min version (#1372)
Support triggering a major version upgrade via the manifest. The new `major_version_upgrade_mode` option accepts `off`, `manual`, and `full`: `manual` performs the upgrade when the manifest requests a higher version, while `full` additionally auto-upgrades clusters running below a configured minimal version (`minimal_major_version`) to the configured `target_major_version`.
This commit is contained in:
parent ca968ca150
commit 636a9a8191
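The decision these options drive can be summarised in a small stand-alone sketch (illustrative only, not code from this commit; it mirrors the VersionMap and GetDesiredMajorVersion logic introduced in the diff below, and the parameter names are hypothetical):

package main

import "fmt"

// versionMap mirrors the VersionMap introduced in this commit.
var versionMap = map[string]int{
	"9.5": 90500, "9.6": 90600, "10": 100000,
	"11": 110000, "12": 120000, "13": 130000,
}

// desiredVersion sketches Cluster.GetDesiredMajorVersion(): in "full" mode a cluster
// whose manifest version is below minimal_major_version is lifted to target_major_version,
// otherwise the manifest version is used as-is.
func desiredVersion(mode, manifestVersion, minimalVersion, targetVersion string) string {
	if mode == "full" && versionMap[minimalVersion] > versionMap[manifestVersion] {
		return targetVersion
	}
	return manifestVersion
}

func main() {
	// With minimal_major_version "11" and target_major_version "13":
	fmt.Println(desiredVersion("full", "9.6", "11", "13"))   // 13  -> auto upgrade
	fmt.Println(desiredVersion("full", "12", "11", "13"))    // 12  -> manifest version kept
	fmt.Println(desiredVersion("manual", "9.6", "11", "13")) // 9.6 -> only the manifest triggers upgrades
}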
@@ -926,6 +926,33 @@ class EndToEndTestCase(unittest.TestCase):
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
         self.eventuallyTrue(lambda: k8s.check_statefulset_annotations(cluster_label, annotations), "Annotations missing")
 
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    @unittest.skip("Skipping this test until fixed")
+    def test_zaa_test_major_version_upgrade(self):
+        k8s = self.k8s
+        result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest-12.yaml")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(labels="application=spilo,cluster-name=acid-upgrade-test"), 2, "No 2 pods running")
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+        pg_patch_version = {
+            "spec": {
+                "postgres": {
+                    "version": "13"
+                }
+            }
+        }
+        k8s.api.custom_objects_api.patch_namespaced_custom_object(
+            "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version)
+
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+        def check_version_13():
+            p = k8s.get_patroni_state("acid-upgrade-test-0")
+            version = p["server_version"][0:2]
+            return version
+
+        self.eventuallyEqual(check_version_13, "13", "Version was not upgraded to 13")
+
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     @unittest.skip("Skipping this test until fixed")
     def test_zzz_taint_based_eviction(self):
@@ -74,6 +74,7 @@ data:
   # logical_backup_s3_secret_access_key: ""
   logical_backup_s3_sse: "AES256"
   logical_backup_schedule: "30 00 * * *"
+  major_version_upgrade_mode: "manual"
   master_dns_name_format: "{cluster}.{team}.{hostedzone}"
   # master_pod_move_timeout: 20m
   # max_instances: "-1"
@@ -0,0 +1,21 @@
+apiVersion: "acid.zalan.do/v1"
+kind: postgresql
+metadata:
+  name: acid-upgrade-test
+  namespace: default
+spec:
+  teamId: "acid"
+  volume:
+    size: 1Gi
+  numberOfInstances: 2
+  users:
+    zalando:  # database owner
+    - superuser
+    - createdb
+    foo_user: []  # role for application foo
+  databases:
+    foo: zalando  # dbname: owner
+  preparedDatabases:
+    bar: {}
+  postgresql:
+    version: "12"
@@ -83,15 +83,16 @@ type Cluster struct {
 	deleteOptions  metav1.DeleteOptions
 	podEventsQueue *cache.FIFO
 
 	teamsAPIClient      teams.Interface
 	oauthTokenGetter    OAuthTokenGetter
 	KubeClient          k8sutil.KubernetesClient //TODO: move clients to the better place?
 	currentProcess      Process
 	processMu           sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
 	specMu              sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
 	ConnectionPooler    map[PostgresRole]*ConnectionPoolerObjects
 	EBSVolumes          map[string]volumes.VolumeProperties
 	VolumeResizer       volumes.VolumeResizer
+	currentMajorVersion int
 }
 
 type compareStatefulsetResult struct {
@@ -128,15 +129,16 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
 			Secrets:   make(map[types.UID]*v1.Secret),
 			Services:  make(map[PostgresRole]*v1.Service),
 			Endpoints: make(map[PostgresRole]*v1.Endpoints)},
 		userSyncStrategy: users.DefaultUserSyncStrategy{PasswordEncryption: passwordEncryption},
 		deleteOptions:    metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy},
 		podEventsQueue:   podEventsQueue,
 		KubeClient:       kubeClient,
+		currentMajorVersion: 0,
 	}
 	cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName())
 	cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
 	cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName)
-	cluster.patroni = patroni.New(cluster.logger)
+	cluster.patroni = patroni.New(cluster.logger, nil)
 	cluster.eventRecorder = eventRecorder
 
 	cluster.EBSVolumes = make(map[string]volumes.VolumeProperties)
@@ -359,7 +361,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 	}
 	if !reflect.DeepEqual(c.Statefulset.Annotations, statefulSet.Annotations) {
 		match = false
-		reasons = append(reasons, "new statefulset's annotations does not match the current one")
+		reasons = append(reasons, "new statefulset's annotations do not match the current one")
 	}
 
 	needsRollUpdate, reasons = c.compareContainers("initContainers", c.Statefulset.Spec.Template.Spec.InitContainers, statefulSet.Spec.Template.Spec.InitContainers, needsRollUpdate, reasons)
@@ -614,17 +616,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 
 	logNiceDiff(c.logger, oldSpec, newSpec)
 
-	if oldSpec.Spec.PostgresqlParam.PgVersion > newSpec.Spec.PostgresqlParam.PgVersion {
-		c.logger.Warningf("postgresql version change(%q -> %q) has no effect",
-			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
-		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "PostgreSQL", "postgresql version change(%q -> %q) has no effect",
-			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
-		// we need that hack to generate statefulset with the old version
-		newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
-	} else if oldSpec.Spec.PostgresqlParam.PgVersion < newSpec.Spec.PostgresqlParam.PgVersion {
-		c.logger.Infof("postgresql version increased (%q -> %q), major version upgrade can be done manually after StatefulSet Sync",
+	if IsBiggerPostgresVersion(oldSpec.Spec.PostgresqlParam.PgVersion, c.GetDesiredMajorVersion()) {
+		c.logger.Infof("postgresql version increased (%s -> %s), depending on config manual upgrade needed",
 			oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
 		syncStatetfulSet = true
+	} else {
+		c.logger.Infof("postgresql major version unchanged or smaller, no changes needed")
+		// sticking with old version, this will also advance GetDesiredVersion next time.
+		newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
 	}
 
 	// Service
@@ -781,6 +780,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 		updateFailed = true
 	}
 
+	if !updateFailed {
+		// Major version upgrade must only fire after success of earlier operations and should stay last
+		if err := c.majorVersionUpgrade(); err != nil {
+			c.logger.Errorf("major version upgrade failed: %v", err)
+			updateFailed = true
+		}
+	}
+
 	return nil
 }
 
@@ -734,7 +734,7 @@ func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration stri
 		},
 	}
 	if c.OpConfig.EnablePgVersionEnvVar {
-		envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.Spec.PgVersion})
+		envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.GetDesiredMajorVersion()})
 	}
 	// Spilo expects cluster labels as JSON
 	if clusterLabels, err := json.Marshal(labels.Set(c.OpConfig.ClusterLabels)); err != nil {
@@ -0,0 +1,97 @@
+package cluster
+
+import (
+	"fmt"
+
+	"github.com/zalando/postgres-operator/pkg/spec"
+	v1 "k8s.io/api/core/v1"
+)
+
+// VersionMap Map of version numbers
+var VersionMap = map[string]int{
+	"9.5": 90500,
+	"9.6": 90600,
+	"10":  100000,
+	"11":  110000,
+	"12":  120000,
+	"13":  130000,
+}
+
+// IsBiggerPostgresVersion Compare two Postgres version numbers
+func IsBiggerPostgresVersion(old string, new string) bool {
+	oldN, _ := VersionMap[old]
+	newN, _ := VersionMap[new]
+	return newN > oldN
+}
+
+// GetDesiredMajorVersionAsInt Convert string to comparable integer of PG version
+func (c *Cluster) GetDesiredMajorVersionAsInt() int {
+	return VersionMap[c.GetDesiredMajorVersion()]
+}
+
+// GetDesiredMajorVersion returns major version to use, incl. potential auto upgrade
+func (c *Cluster) GetDesiredMajorVersion() string {
+
+	if c.Config.OpConfig.MajorVersionUpgradeMode == "full" {
+		// current is 9.5, minimal is 11 allowing 11 to 13 clusters, everything below is upgraded
+		if IsBiggerPostgresVersion(c.Spec.PgVersion, c.Config.OpConfig.MinimalMajorVersion) {
+			c.logger.Infof("overwriting configured major version %s to %s", c.Spec.PgVersion, c.Config.OpConfig.TargetMajorVersion)
+			return c.Config.OpConfig.TargetMajorVersion
+		}
+	}
+
+	return c.Spec.PgVersion
+}
+
+func (c *Cluster) majorVersionUpgrade() error {
+
+	if c.OpConfig.MajorVersionUpgradeMode == "off" {
+		return nil
+	}
+
+	desiredVersion := c.GetDesiredMajorVersionAsInt()
+
+	if c.currentMajorVersion >= desiredVersion {
+		c.logger.Infof("cluster version up to date. current: %d desired: %d", c.currentMajorVersion, desiredVersion)
+		return nil
+	}
+
+	pods, err := c.listPods()
+	if err != nil {
+		return err
+	}
+
+	allRunning := true
+
+	var masterPod *v1.Pod
+
+	for _, pod := range pods {
+		ps, _ := c.patroni.GetMemberData(&pod)
+
+		if ps.State != "running" {
+			allRunning = false
+			c.logger.Infof("identified non running pod, potentially skipping major version upgrade")
+		}
+
+		if ps.Role == "master" {
+			masterPod = &pod
+			c.currentMajorVersion = ps.ServerVersion
+		}
+	}
+
+	numberOfPods := len(pods)
+	if allRunning && masterPod != nil {
+		c.logger.Infof("healthy cluster ready to upgrade, current: %d desired: %d", c.currentMajorVersion, desiredVersion)
+		if c.currentMajorVersion < desiredVersion {
+			podName := &spec.NamespacedName{Namespace: masterPod.Namespace, Name: masterPod.Name}
+			c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
+			upgradeCommand := fmt.Sprintf("/usr/bin/python3 /scripts/inplace_upgrade.py %d 2>&1 | tee last_upgrade.log", numberOfPods)
+			_, err := c.ExecCommand(podName, "/bin/su", "postgres", "-c", upgradeCommand)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
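One property of the comparison helper above is worth noting: a version missing from VersionMap looks up as 0, so an unknown or future version never counts as "bigger" and will not trigger an upgrade. A minimal stand-alone illustration (not operator code):

package main

import "fmt"

func main() {
	versionMap := map[string]int{"12": 120000, "13": 130000} // excerpt of the full map above
	isBigger := func(old, new string) bool { return versionMap[new] > versionMap[old] }

	fmt.Println(isBigger("12", "13")) // true: known, higher version
	fmt.Println(isBigger("12", "14")) // false: "14" missing -> looks up as 0, never "bigger"
}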
@@ -12,6 +12,7 @@ import (
 
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
+	"github.com/zalando/postgres-operator/pkg/util/patroni"
 	"github.com/zalando/postgres-operator/pkg/util/retryutil"
 )
 
@@ -312,14 +313,14 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
 
 	for _, pod := range pods.Items {
 
-		var state string
+		var data patroni.MemberData
 
 		err := retryutil.Retry(1*time.Second, 5*time.Second,
 			func() (bool, error) {
 
 				var err error
 
-				state, err = c.patroni.GetPatroniMemberState(&pod)
+				data, err = c.patroni.GetMemberData(&pod)
 
 				if err != nil {
 					return false, err
@@ -331,7 +332,7 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
 		if err != nil {
 			c.logger.Errorf("failed to get Patroni state for pod: %s", err)
 			return false
-		} else if state == "creating replica" {
+		} else if data.State == "creating replica" {
 			c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
 			return false
 		}
@@ -118,6 +118,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 		return fmt.Errorf("could not sync connection pooler: %v", err)
 	}
 
+	// Major version upgrade must only run after success of all earlier operations, must remain last item in sync
+	if err := c.majorVersionUpgrade(); err != nil {
+		c.logger.Errorf("major version upgrade failed: %v", err)
+	}
+
 	return err
 }
 
@@ -471,7 +476,7 @@ func (c *Cluster) syncSecrets() error {
 	for secretUsername, secretSpec := range secrets {
 		if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Create(context.TODO(), secretSpec, metav1.CreateOptions{}); err == nil {
 			c.Secrets[secret.UID] = secret
-			c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID)
+			c.logger.Debugf("created new secret %s, uid: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID)
 			continue
 		}
 		if k8sutil.ResourceAlreadyExists(err) {
@@ -480,7 +485,7 @@ func (c *Cluster) syncSecrets() error {
 				return fmt.Errorf("could not get current secret: %v", err)
 			}
 			if secretUsername != string(secret.Data["username"]) {
-				c.logger.Errorf("secret %s does not contain the role %q", secretSpec.Name, secretUsername)
+				c.logger.Errorf("secret %s does not contain the role %s", secretSpec.Name, secretUsername)
 				continue
 			}
 			c.Secrets[secret.UID] = secret
@@ -499,7 +504,7 @@ func (c *Cluster) syncSecrets() error {
 			if pwdUser.Password != string(secret.Data["password"]) &&
 				pwdUser.Origin == spec.RoleOriginInfrastructure {
 
-				c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name)
+				c.logger.Debugf("updating the secret %s from the infrastructure roles", secretSpec.Name)
 				if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(context.TODO(), secretSpec, metav1.UpdateOptions{}); err != nil {
 					return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err)
 				}
@@ -509,7 +514,7 @@ func (c *Cluster) syncSecrets() error {
 				userMap[secretUsername] = pwdUser
 			}
 		} else {
-			return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err)
+			return fmt.Errorf("could not create secret for user %s: %v", secretUsername, err)
 		}
 	}
 
@@ -199,10 +199,10 @@ func (c *Controller) processEvent(event ClusterEvent) {
 		if event.EventType == EventRepair {
 			runRepair, lastOperationStatus := cl.NeedsRepair()
 			if !runRepair {
-				lg.Debugf("Observed cluster status %s, repair is not required", lastOperationStatus)
+				lg.Debugf("observed cluster status %s, repair is not required", lastOperationStatus)
 				return
 			}
-			lg.Debugf("Observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
+			lg.Debugf("observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
 			event.EventType = EventSync
 		}
 
@@ -217,7 +217,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
 		}
 
 		if err := c.submitRBACCredentials(event); err != nil {
-			c.logger.Warnf("Pods and/or Patroni may misfunction due to the lack of permissions: %v", err)
+			c.logger.Warnf("pods and/or Patroni may misfunction due to the lack of permissions: %v", err)
 		}
 
 	}
@@ -225,7 +225,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
 	switch event.EventType {
 	case EventAdd:
 		if clusterFound {
-			lg.Infof("Recieved add event for already existing Postgres cluster")
+			lg.Infof("received add event for already existing Postgres cluster")
 			return
 		}
 
@@ -348,11 +348,11 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
 func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {
 
 	deprecate := func(deprecated, replacement string) {
-		c.logger.Warningf("Parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
+		c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
 	}
 
 	noeffect := func(param string, explanation string) {
-		c.logger.Warningf("Parameter %q takes no effect. %s", param, explanation)
+		c.logger.Warningf("parameter %q takes no effect. %s", param, explanation)
 	}
 
 	if spec.UseLoadBalancer != nil {
@@ -368,7 +368,7 @@ func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.Postg
 
 	if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
 		(spec.EnableReplicaLoadBalancer != nil || spec.EnableMasterLoadBalancer != nil) {
-		c.logger.Warnf("Both old and new load balancer parameters are present in the manifest, ignoring old ones")
+		c.logger.Warnf("both old and new load balancer parameters are present in the manifest, ignoring old ones")
 	}
 }
 
@@ -480,3 +480,45 @@ func TestInfrastructureRoleDefinitions(t *testing.T) {
 		}
 	}
 }
+
+type SubConfig struct {
+	teammap map[string]string
+}
+
+type SuperConfig struct {
+	sub SubConfig
+}
+
+func TestUnderstandingMapsAndReferences(t *testing.T) {
+	teams := map[string]string{"acid": "Felix"}
+
+	sc := SubConfig{
+		teammap: teams,
+	}
+
+	ssc := SuperConfig{
+		sub: sc,
+	}
+
+	teams["24x7"] = "alex"
+
+	if len(ssc.sub.teammap) != 2 {
+		t.Errorf("Team Map does not contain 2 elements")
+	}
+
+	ssc.sub.teammap["teapot"] = "Mikkel"
+
+	if len(teams) != 3 {
+		t.Errorf("Team Map does not contain 3 elements")
+	}
+
+	teams = make(map[string]string)
+
+	if len(ssc.sub.teammap) != 3 {
+		t.Errorf("Team Map does not contain 3 elements")
+	}
+
+	if &teams == &(ssc.sub.teammap) {
+		t.Errorf("Identical maps")
+	}
+}
@@ -206,6 +206,10 @@ type Config struct {
 	EnableLazySpiloUpgrade   bool `name:"enable_lazy_spilo_upgrade" default:"false"`
 	EnablePgVersionEnvVar    bool `name:"enable_pgversion_env_var" default:"true"`
 	EnableSpiloWalPathCompat bool `name:"enable_spilo_wal_path_compat" default:"false"`
+	MajorVersionUpgradeMode     string   `name:"major_version_upgrade_mode" default:"off"` // off - no actions, manual - manifest triggers action, full - manifest and minimal version violation trigger upgrade
+	MinimalMajorVersion         string   `name:"minimal_major_version" default:"9.5"`
+	TargetMajorVersion          string   `name:"target_major_version" default:"13"`
+	AllowedMajorUpgradeVersions []string `name:"allowed_major_upgrade_versions" default:"12,13"`
 }
 
 // MustMarshal marshals the config or panics
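For reference, the `name:` struct tags above are the keys the operator reads from its configuration (the configmap hunk earlier sets one of them). A hypothetical sketch of the corresponding key/value pairs, using the declared defaults except for the mode, which the example configmap sets to "manual":

package main

import "fmt"

func main() {
	// Hypothetical ConfigMap data matching the struct-tag names above; not part of this commit.
	configMapData := map[string]string{
		"major_version_upgrade_mode": "manual", // "off" (default) | "manual" | "full"
		"minimal_major_version":      "9.5",    // "full" mode upgrades clusters below this version...
		"target_major_version":       "13",     // ...to this version
	}
	fmt.Println(configMapData)
}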
@@ -0,0 +1,11 @@
+package httpclient
+
+//go:generate mockgen -package mocks -destination=$PWD/mocks/$GOFILE -source=$GOFILE -build_flags=-mod=vendor
+
+import "net/http"
+
+// HTTPClient interface
+type HTTPClient interface {
+	Do(req *http.Request) (*http.Response, error)
+	Get(url string) (resp *http.Response, err error)
+}
@@ -3,7 +3,6 @@ package patroni
 import (
 	"bytes"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io/ioutil"
 	"net"
@@ -11,6 +10,8 @@ import (
 	"strconv"
 	"time"
 
+	httpclient "github.com/zalando/postgres-operator/pkg/util/httpclient"
+
 	"github.com/sirupsen/logrus"
 	v1 "k8s.io/api/core/v1"
 )
@@ -26,24 +27,28 @@ const (
 type Interface interface {
 	Switchover(master *v1.Pod, candidate string) error
 	SetPostgresParameters(server *v1.Pod, options map[string]string) error
-	GetPatroniMemberState(pod *v1.Pod) (string, error)
+	GetMemberData(server *v1.Pod) (MemberData, error)
 }
 
 // Patroni API client
 type Patroni struct {
-	httpClient *http.Client
+	httpClient httpclient.HTTPClient
 	logger     *logrus.Entry
 }
 
 // New create patroni
-func New(logger *logrus.Entry) *Patroni {
-	cl := http.Client{
-		Timeout: timeout,
+func New(logger *logrus.Entry, client httpclient.HTTPClient) *Patroni {
+	if client == nil {
+
+		client = &http.Client{
+			Timeout: timeout,
+		}
+
 	}
 
 	return &Patroni{
 		logger:     logger,
-		httpClient: &cl,
+		httpClient: client,
 	}
 }
@@ -68,7 +73,9 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer)
 		return fmt.Errorf("could not create request: %v", err)
 	}
 
-	p.logger.Debugf("making %s http request: %s", method, request.URL.String())
+	if p.logger != nil {
+		p.logger.Debugf("making %s http request: %s", method, request.URL.String())
+	}
 
 	resp, err := p.httpClient.Do(request)
 	if err != nil {
@@ -126,35 +133,45 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
 	return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
 }
 
-//GetPatroniMemberState returns a state of member of a Patroni cluster
-func (p *Patroni) GetPatroniMemberState(server *v1.Pod) (string, error) {
+// MemberDataPatroni child element
+type MemberDataPatroni struct {
+	Version string `json:"version"`
+	Scope   string `json:"scope"`
+}
+
+// MemberData Patroni member data from Patroni API
+type MemberData struct {
+	State           string            `json:"state"`
+	Role            string            `json:"role"`
+	ServerVersion   int               `json:"server_version"`
+	PendingRestart  bool              `json:"pending_restart"`
+	ClusterUnlocked bool              `json:"cluster_unlocked"`
+	Patroni         MemberDataPatroni `json:"patroni"`
+}
+
+// GetMemberData read member data from patroni API
+func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) {
 
 	apiURLString, err := apiURL(server)
 	if err != nil {
-		return "", err
+		return MemberData{}, err
 	}
 	response, err := p.httpClient.Get(apiURLString)
 	if err != nil {
-		return "", fmt.Errorf("could not perform Get request: %v", err)
+		return MemberData{}, fmt.Errorf("could not perform Get request: %v", err)
 	}
 	defer response.Body.Close()
 
 	body, err := ioutil.ReadAll(response.Body)
 	if err != nil {
-		return "", fmt.Errorf("could not read response: %v", err)
+		return MemberData{}, fmt.Errorf("could not read response: %v", err)
 	}
 
-	data := make(map[string]interface{})
+	data := MemberData{}
 	err = json.Unmarshal(body, &data)
 	if err != nil {
-		return "", err
+		return MemberData{}, err
 	}
 
-	state, ok := data["state"].(string)
-	if !ok {
-		return "", errors.New("Patroni Get call response contains wrong type for 'state' field")
-	}
-
-	return state, nil
-
+	return data, nil
 }
@@ -1,10 +1,17 @@
 package patroni
 
 import (
+	"bytes"
 	"errors"
 	"fmt"
-	"k8s.io/api/core/v1"
+	"io/ioutil"
+	"net/http"
 	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/zalando/postgres-operator/mocks"
+
+	v1 "k8s.io/api/core/v1"
 )
 
 func newMockPod(ip string) *v1.Pod {
@@ -72,3 +79,32 @@ func TestApiURL(t *testing.T) {
 		}
 	}
 }
+
+func TestPatroniAPI(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	json := `{"state": "running", "postmaster_start_time": "2021-02-19 14:31:50.053 CET", "role": "master", "server_version": 90621, "cluster_unlocked": false, "xlog": {"location": 55978296057856}, "timeline": 6, "database_system_identifier": "6462555844314089962", "pending_restart": true, "patroni": {"version": "2.0.1", "scope": "acid-rest92-standby"}}`
+	r := ioutil.NopCloser(bytes.NewReader([]byte(json)))
+
+	response := http.Response{
+		Status: "200",
+		Body:   r,
+	}
+
+	mockClient := mocks.NewMockHTTPClient(ctrl)
+	mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil)
+
+	p := New(nil, mockClient)
+
+	pod := v1.Pod{
+		Status: v1.PodStatus{
+			PodIP: "192.168.100.1",
+		},
+	}
+	_, err := p.GetMemberData(&pod)
+
+	if err != nil {
+		t.Errorf("Could not read Patroni data: %v", err)
+	}
+}