Merge branch 'master' of github.com:zalando/postgres-operator
commit ef264ac792
@ -4,7 +4,7 @@ watch -c "
|
|||
kubectl get postgresql --all-namespaces
|
||||
echo
|
||||
echo -n 'Rolling upgrade pending: '
|
||||
kubectl get statefulset -o jsonpath='{.items..metadata.annotations.zalando-postgres-operator-rolling-update-required}'
|
||||
kubectl get pods -o jsonpath='{.items[].metadata.annotations.zalando-postgres-operator-rolling-update-required}'
|
||||
echo
|
||||
echo
|
||||
echo 'Pods'
|
||||
|
|
|
|||
|
|
@ -211,7 +211,7 @@ class K8s:
|
|||
self.wait_for_logical_backup_job(expected_num_of_jobs=1)
|
||||
|
||||
def delete_operator_pod(self, step="Delete operator pod"):
|
||||
# patching the pod template in the deployment restarts the operator pod
|
||||
# patching the pod template in the deployment restarts the operator pod
|
||||
self.api.apps_v1.patch_namespaced_deployment("postgres-operator", "default", {"spec": {"template": {"metadata": {"annotations": {"step": "{}-{}".format(step, time.time())}}}}})
|
||||
self.wait_for_operator_pod_start()
|
||||
|
||||
|
|
@ -219,8 +219,8 @@ class K8s:
|
|||
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
|
||||
self.delete_operator_pod(step=step)
|
||||
|
||||
def patch_statefulset(self, data, name="acid-minimal-cluster", namespace="default"):
|
||||
self.api.apps_v1.patch_namespaced_stateful_set(name, namespace, data)
|
||||
def patch_pod(self, data, pod_name, namespace="default"):
|
||||
self.api.core_v1.patch_namespaced_pod(pod_name, namespace, data)
|
||||
|
||||
def create_with_kubectl(self, path):
|
||||
return subprocess.run(
|
||||
|
|
@ -280,19 +280,21 @@ class K8s:
|
|||
return None
|
||||
return pod.items[0].spec.containers[0].image
|
||||
|
||||
def get_cluster_leader_pod(self, pg_cluster_name, namespace='default'):
|
||||
labels = {
|
||||
'application': 'spilo',
|
||||
'cluster-name': pg_cluster_name,
|
||||
'spilo-role': 'master',
|
||||
}
|
||||
def get_cluster_pod(self, role, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
|
||||
labels = labels + ',spilo-role=' + role
|
||||
|
||||
pods = self.api.core_v1.list_namespaced_pod(
|
||||
namespace, label_selector=to_selector(labels)).items
|
||||
namespace, label_selector=labels).items
|
||||
|
||||
if pods:
|
||||
return pods[0]
|
||||
|
||||
def get_cluster_leader_pod(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
|
||||
return self.get_cluster_pod('master', labels, namespace)
|
||||
|
||||
def get_cluster_replica_pod(self, labels='application=spilo,cluster-name=acid-minimal-cluster', namespace='default'):
|
||||
return self.get_cluster_pod('replica', labels, namespace)
|
||||
|
||||
|
||||
class K8sBase:
|
||||
'''
|
||||
|
|
|
|||
|
|
@ -168,12 +168,25 @@ class EndToEndTestCase(unittest.TestCase):
|
|||
"additional_pod_capabilities": ','.join(capabilities),
|
||||
},
|
||||
}
|
||||
self.k8s.update_config(patch_capabilities)
|
||||
self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"},
|
||||
"Operator does not get in sync")
|
||||
|
||||
self.eventuallyEqual(lambda: self.k8s.count_pods_with_container_capabilities(capabilities, cluster_label),
|
||||
2, "Container capabilities not updated")
|
||||
|
||||
# get node and replica (expected target of new master)
|
||||
_, replica_nodes = self.k8s.get_pg_nodes(cluster_label)
|
||||
|
||||
try:
|
||||
self.k8s.update_config(patch_capabilities)
|
||||
self.eventuallyEqual(lambda: self.k8s.get_operator_state(), {"0": "idle"},
|
||||
"Operator does not get in sync")
|
||||
|
||||
# changed security context of postgres container should trigger a rolling update
|
||||
self.k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
|
||||
self.k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
|
||||
|
||||
self.eventuallyEqual(lambda: self.k8s.count_pods_with_container_capabilities(capabilities, cluster_label),
|
||||
2, "Container capabilities not updated")
|
||||
|
||||
except timeout_decorator.TimeoutError:
|
||||
print('Operator log: {}'.format(k8s.get_operator_log()))
|
||||
raise
|
||||
|
||||
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
|
||||
def test_additional_teams_and_members(self):
|
||||
|
|
@ -212,7 +225,7 @@ class EndToEndTestCase(unittest.TestCase):
|
|||
# make sure we let one sync pass and the new user being added
|
||||
time.sleep(15)
|
||||
|
||||
leader = self.k8s.get_cluster_leader_pod('acid-minimal-cluster')
|
||||
leader = self.k8s.get_cluster_leader_pod()
|
||||
user_query = """
|
||||
SELECT usename
|
||||
FROM pg_catalog.pg_user
|
||||
|
|
@ -392,7 +405,7 @@ class EndToEndTestCase(unittest.TestCase):
|
|||
# credentials.
|
||||
db_list = []
|
||||
|
||||
leader = k8s.get_cluster_leader_pod('acid-minimal-cluster')
|
||||
leader = k8s.get_cluster_leader_pod()
|
||||
schemas_query = """
|
||||
select schema_name
|
||||
from information_schema.schemata
|
||||
|
|
@ -611,7 +624,7 @@ class EndToEndTestCase(unittest.TestCase):
|
|||
k8s.update_config(unpatch_lazy_spilo_upgrade, step="patch lazy upgrade")
|
||||
|
||||
# at this point operator will complete the normal rolling upgrade
|
||||
# so we additonally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works
|
||||
# so we additionally test if disabling the lazy upgrade - forcing the normal rolling upgrade - works
|
||||
self.eventuallyEqual(lambda: k8s.get_effective_pod_image(pod0),
|
||||
conf_image, "Rolling upgrade was not executed",
|
||||
50, 3)
|
||||
|
|
@ -750,12 +763,6 @@ class EndToEndTestCase(unittest.TestCase):
|
|||
|
||||
self.eventuallyTrue(verify_pod_limits, "Pod limits were not adjusted")
|
||||
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
# cls.k8s.update_config({}, step="Setup")
|
||||
cls.k8s.patch_statefulset({"metadata": {"annotations": {"zalando-postgres-operator-rolling-update-required": "false"}}})
|
||||
pass
|
||||
|
||||
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
|
||||
def test_multi_namespace_support(self):
|
||||
'''
|
||||
|
|
@ -784,6 +791,139 @@ class EndToEndTestCase(unittest.TestCase):
|
|||
"acid.zalan.do", "v1", self.test_namespace, "postgresqls", "acid-test-cluster")
|
||||
time.sleep(5)
|
||||
|
||||
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
|
||||
def test_rolling_update_flag(self):
|
||||
'''
|
||||
Add rolling update flag to only the master and see it failing over
|
||||
'''
|
||||
k8s = self.k8s
|
||||
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
|
||||
|
||||
# verify we are in good state from potential previous tests
|
||||
self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
|
||||
|
||||
# get node and replica (expected target of new master)
|
||||
_, replica_nodes = k8s.get_pg_nodes(cluster_label)
|
||||
|
||||
# rolling update annotation
|
||||
flag = {
|
||||
"metadata": {
|
||||
"annotations": {
|
||||
"zalando-postgres-operator-rolling-update-required": "true",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try:
|
||||
podsList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label)
|
||||
for pod in podsList.items:
|
||||
# add flag only to the master to make it appear to the operator as a leftover from a rolling update
|
||||
if pod.metadata.labels.get('spilo-role') == 'master':
|
||||
old_creation_timestamp = pod.metadata.creation_timestamp
|
||||
k8s.patch_pod(flag, pod.metadata.name, pod.metadata.namespace)
|
||||
else:
|
||||
# remember replica name to check if operator does a switchover
|
||||
switchover_target = pod.metadata.name
|
||||
|
||||
# do not wait until the next sync
|
||||
k8s.delete_operator_pod()
|
||||
|
||||
# operator should now do a switchover before recreating the master pod
|
||||
k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
|
||||
|
||||
# check if the former replica is now the new master
|
||||
leader = k8s.get_cluster_leader_pod()
|
||||
self.eventuallyEqual(lambda: leader.metadata.name, switchover_target, "Rolling update flag did not trigger switchover")
|
||||
|
||||
# check that the old master has been recreated
|
||||
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
|
||||
replica = k8s.get_cluster_replica_pod()
|
||||
self.assertTrue(replica.metadata.creation_timestamp > old_creation_timestamp, "Old master pod was not recreated")
|
||||
|
||||
|
||||
except timeout_decorator.TimeoutError:
|
||||
print('Operator log: {}'.format(k8s.get_operator_log()))
|
||||
raise
|
||||
|
||||
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
|
||||
def test_rolling_update_label_timeout(self):
|
||||
'''
|
||||
Simulate case when replica does not receive label in time and rolling update does not finish
|
||||
'''
|
||||
k8s = self.k8s
|
||||
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
|
||||
flag = "zalando-postgres-operator-rolling-update-required"
|
||||
|
||||
# verify we are in good state from potential previous tests
|
||||
self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No 2 pods running")
|
||||
|
||||
# get node and replica (expected target of new master)
|
||||
_, replica_nodes = k8s.get_pg_nodes(cluster_label)
|
||||
|
||||
# rolling update annotation
|
||||
rolling_update_patch = {
|
||||
"metadata": {
|
||||
"annotations": {
|
||||
flag: "true",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# make pod_label_wait_timeout so short that rolling update fails on first try
|
||||
# temporarily lower resync interval to reduce waiting for further tests
|
||||
# pods should get healthy in the meantime
|
||||
patch_resync_config = {
|
||||
"data": {
|
||||
"pod_label_wait_timeout": "2s",
|
||||
"resync_period": "20s",
|
||||
}
|
||||
}
|
||||
|
||||
try:
|
||||
# patch both pods for rolling update
|
||||
podList = k8s.api.core_v1.list_namespaced_pod('default', label_selector=cluster_label)
|
||||
for pod in podList.items:
|
||||
k8s.patch_pod(rolling_update_patch, pod.metadata.name, pod.metadata.namespace)
|
||||
if pod.metadata.labels.get('spilo-role') == 'replica':
|
||||
switchover_target = pod.metadata.name
|
||||
|
||||
# update config and restart operator
|
||||
k8s.update_config(patch_resync_config, "update resync interval and pod_label_wait_timeout")
|
||||
|
||||
# operator should now recreate the replica pod first and do a switchover after
|
||||
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
|
||||
|
||||
# pod_label_wait_timeout should have been exceeded hence the rolling update is continued on next sync
|
||||
# check if the cluster state is "SyncFailed"
|
||||
self.eventuallyEqual(lambda: k8s.pg_get_status(), "SyncFailed", "Expected SYNC event to fail")
|
||||
|
||||
# wait for next sync, replica should be running normally by now and be ready for switchover
|
||||
k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
|
||||
|
||||
# check if the former replica is now the new master
|
||||
leader = k8s.get_cluster_leader_pod()
|
||||
self.eventuallyEqual(lambda: leader.metadata.name, switchover_target, "Rolling update flag did not trigger switchover")
|
||||
|
||||
# wait for the old master to get restarted
|
||||
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
|
||||
|
||||
# status should again be "SyncFailed" but turn into "Running" on the next sync
|
||||
time.sleep(10)
|
||||
self.eventuallyEqual(lambda: k8s.pg_get_status(), "Running", "Expected running cluster after two syncs")
|
||||
|
||||
# revert config changes
|
||||
patch_resync_config = {
|
||||
"data": {
|
||||
"pod_label_wait_timeout": "10m",
|
||||
"resync_period": "30m",
|
||||
}
|
||||
}
|
||||
k8s.update_config(patch_resync_config, "revert resync interval and pod_label_wait_timeout")
|
||||
|
||||
|
||||
except timeout_decorator.TimeoutError:
|
||||
print('Operator log: {}'.format(k8s.get_operator_log()))
|
||||
raise
|
||||
|
||||
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
|
||||
def test_zz_node_readiness_label(self):
|
||||
|
|
@ -926,6 +1066,33 @@ class EndToEndTestCase(unittest.TestCase):
|
|||
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
|
||||
self.eventuallyTrue(lambda: k8s.check_statefulset_annotations(cluster_label, annotations), "Annotations missing")
|
||||
|
||||
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
|
||||
@unittest.skip("Skipping this test until fixed")
|
||||
def test_zaa_test_major_version_upgrade(self):
|
||||
k8s = self.k8s
|
||||
result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest-12.yaml")
|
||||
self.eventuallyEqual(lambda: k8s.count_running_pods(labels="application=spilo,cluster-name=acid-upgrade-test"), 2, "No 2 pods running")
|
||||
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
|
||||
|
||||
pg_patch_version = {
|
||||
"spec": {
|
||||
"postgres": {
|
||||
"version": "13"
|
||||
}
|
||||
}
|
||||
}
|
||||
k8s.api.custom_objects_api.patch_namespaced_custom_object(
|
||||
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version)
|
||||
|
||||
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
|
||||
|
||||
def check_version_13():
|
||||
p = k8s.get_patroni_state("acid-upgrade-test-0")
|
||||
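# assumes get_patroni_state returns server_version as a string (e.g. "130003"); slicing an int here would raise a TypeError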
version = p["server_version"][0:2]
|
||||
return version
|
||||
|
||||
self.eventuallyEqual(check_version_13, "13", "Version was not upgraded to 13")
|
||||
|
||||
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
|
||||
@unittest.skip("Skipping this test until fixed")
|
||||
def test_zzz_taint_based_eviction(self):
|
||||
|
|
|
|||
|
|
@ -74,6 +74,7 @@ data:
|
|||
# logical_backup_s3_secret_access_key: ""
|
||||
logical_backup_s3_sse: "AES256"
|
||||
logical_backup_schedule: "30 00 * * *"
|
||||
major_version_upgrade_mode: "manual"
|
||||
master_dns_name_format: "{cluster}.{team}.{hostedzone}"
|
||||
# master_pod_move_timeout: 20m
|
||||
# max_instances: "-1"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,21 @@
|
|||
apiVersion: "acid.zalan.do/v1"
|
||||
kind: postgresql
|
||||
metadata:
|
||||
name: acid-upgrade-test
|
||||
namespace: default
|
||||
spec:
|
||||
teamId: "acid"
|
||||
volume:
|
||||
size: 1Gi
|
||||
numberOfInstances: 2
|
||||
users:
|
||||
zalando: # database owner
|
||||
- superuser
|
||||
- createdb
|
||||
foo_user: [] # role for application foo
|
||||
databases:
|
||||
foo: zalando # dbname: owner
|
||||
preparedDatabases:
|
||||
bar: {}
|
||||
postgresql:
|
||||
version: "12"
|
||||
|
|
@ -83,15 +83,16 @@ type Cluster struct {
|
|||
deleteOptions metav1.DeleteOptions
|
||||
podEventsQueue *cache.FIFO
|
||||
|
||||
teamsAPIClient teams.Interface
|
||||
oauthTokenGetter OAuthTokenGetter
|
||||
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
|
||||
currentProcess Process
|
||||
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
|
||||
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
|
||||
ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects
|
||||
EBSVolumes map[string]volumes.VolumeProperties
|
||||
VolumeResizer volumes.VolumeResizer
|
||||
teamsAPIClient teams.Interface
|
||||
oauthTokenGetter OAuthTokenGetter
|
||||
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
|
||||
currentProcess Process
|
||||
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
|
||||
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
|
||||
ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects
|
||||
EBSVolumes map[string]volumes.VolumeProperties
|
||||
VolumeResizer volumes.VolumeResizer
|
||||
currentMajorVersion int
|
||||
}
|
||||
|
||||
type compareStatefulsetResult struct {
|
||||
|
|
@ -128,15 +129,16 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
|
|||
Secrets: make(map[types.UID]*v1.Secret),
|
||||
Services: make(map[PostgresRole]*v1.Service),
|
||||
Endpoints: make(map[PostgresRole]*v1.Endpoints)},
|
||||
userSyncStrategy: users.DefaultUserSyncStrategy{PasswordEncryption: passwordEncryption},
|
||||
deleteOptions: metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy},
|
||||
podEventsQueue: podEventsQueue,
|
||||
KubeClient: kubeClient,
|
||||
userSyncStrategy: users.DefaultUserSyncStrategy{PasswordEncryption: passwordEncryption},
|
||||
deleteOptions: metav1.DeleteOptions{PropagationPolicy: &deletePropagationPolicy},
|
||||
podEventsQueue: podEventsQueue,
|
||||
KubeClient: kubeClient,
|
||||
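// currentMajorVersion starts at 0 (unknown); it is populated later from the master's Patroni member data in majorVersionUpgrade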
currentMajorVersion: 0,
|
||||
}
|
||||
cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName())
|
||||
cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
|
||||
cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName)
|
||||
cluster.patroni = patroni.New(cluster.logger)
|
||||
cluster.patroni = patroni.New(cluster.logger, nil)
|
||||
cluster.eventRecorder = eventRecorder
|
||||
|
||||
cluster.EBSVolumes = make(map[string]volumes.VolumeProperties)
|
||||
|
|
@ -359,7 +361,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
|
|||
}
|
||||
if !reflect.DeepEqual(c.Statefulset.Annotations, statefulSet.Annotations) {
|
||||
match = false
|
||||
reasons = append(reasons, "new statefulset's annotations does not match the current one")
|
||||
reasons = append(reasons, "new statefulset's annotations do not match the current one")
|
||||
}
|
||||
|
||||
needsRollUpdate, reasons = c.compareContainers("initContainers", c.Statefulset.Spec.Template.Spec.InitContainers, statefulSet.Spec.Template.Spec.InitContainers, needsRollUpdate, reasons)
|
||||
|
|
@ -614,17 +616,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
|||
|
||||
logNiceDiff(c.logger, oldSpec, newSpec)
|
||||
|
||||
if oldSpec.Spec.PostgresqlParam.PgVersion > newSpec.Spec.PostgresqlParam.PgVersion {
|
||||
c.logger.Warningf("postgresql version change(%q -> %q) has no effect",
|
||||
oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
|
||||
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "PostgreSQL", "postgresql version change(%q -> %q) has no effect",
|
||||
oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
|
||||
// we need that hack to generate statefulset with the old version
|
||||
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
|
||||
} else if oldSpec.Spec.PostgresqlParam.PgVersion < newSpec.Spec.PostgresqlParam.PgVersion {
|
||||
c.logger.Infof("postgresql version increased (%q -> %q), major version upgrade can be done manually after StatefulSet Sync",
|
||||
if IsBiggerPostgresVersion(oldSpec.Spec.PostgresqlParam.PgVersion, c.GetDesiredMajorVersion()) {
|
||||
c.logger.Infof("postgresql version increased (%s -> %s), depending on config manual upgrade needed",
|
||||
oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
|
||||
syncStatetfulSet = true
|
||||
} else {
|
||||
c.logger.Infof("postgresql major version unchanged or smaller, no changes needed")
|
||||
// sticking with old version, this will also advance GetDesiredVersion next time.
|
||||
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
|
||||
}
|
||||
|
||||
// Service
|
||||
|
|
@ -781,6 +780,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
|||
updateFailed = true
|
||||
}
|
||||
|
||||
if !updateFailed {
|
||||
// Major version upgrade must only fire after success of earlier operations and should stay last
|
||||
if err := c.majorVersionUpgrade(); err != nil {
|
||||
c.logger.Errorf("major version upgrade failed: %v", err)
|
||||
updateFailed = true
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -1302,7 +1309,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
|
|||
err = fmt.Errorf("could not get master pod label: %v", err)
|
||||
}
|
||||
} else {
|
||||
err = fmt.Errorf("could not switch over: %v", err)
|
||||
err = fmt.Errorf("could not switch over from %q to %q: %v", curMaster.Name, candidate, err)
|
||||
}
|
||||
|
||||
// signal the role label waiting goroutine to close the shop and go home
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import (
|
|||
"fmt"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
|
@ -734,7 +733,7 @@ func (c *Cluster) generateSpiloPodEnvVars(uid types.UID, spiloConfiguration stri
|
|||
},
|
||||
}
|
||||
if c.OpConfig.EnablePgVersionEnvVar {
|
||||
envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.Spec.PgVersion})
|
||||
envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.GetDesiredMajorVersion()})
|
||||
}
|
||||
// Spilo expects cluster labels as JSON
|
||||
if clusterLabels, err := json.Marshal(labels.Set(c.OpConfig.ClusterLabels)); err != nil {
|
||||
|
|
@ -1279,7 +1278,6 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
|
|||
}
|
||||
|
||||
stsAnnotations := make(map[string]string)
|
||||
stsAnnotations[rollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(false)
|
||||
stsAnnotations = c.AnnotationsToPropagate(c.annotationsSet(nil))
|
||||
|
||||
statefulSet := &appsv1.StatefulSet{
|
||||
|
|
|
|||
|
|
@ -0,0 +1,99 @@
|
|||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/zalando/postgres-operator/pkg/spec"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// VersionMap maps PostgreSQL major version strings to comparable integers
|
||||
var VersionMap = map[string]int{
|
||||
"9.5": 90500,
|
||||
"9.6": 90600,
|
||||
"10": 100000,
|
||||
"11": 110000,
|
||||
"12": 120000,
|
||||
"13": 130000,
|
||||
}
|
||||
|
||||
// IsBiggerPostgresVersion reports whether the new Postgres major version is higher than the old one
|
||||
func IsBiggerPostgresVersion(old string, new string) bool {
|
||||
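// a version missing from VersionMap resolves to 0, so an unknown "new" version never compares as bigger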
oldN, _ := VersionMap[old]
|
||||
newN, _ := VersionMap[new]
|
||||
return newN > oldN
|
||||
}
|
||||
|
||||
// GetDesiredMajorVersionAsInt returns the desired PG major version as a comparable integer
|
||||
func (c *Cluster) GetDesiredMajorVersionAsInt() int {
|
||||
return VersionMap[c.GetDesiredMajorVersion()]
|
||||
}
|
||||
|
||||
// GetDesiredMajorVersion returns major version to use, incl. potential auto upgrade
|
||||
func (c *Cluster) GetDesiredMajorVersion() string {
|
||||
|
||||
if c.Config.OpConfig.MajorVersionUpgradeMode == "full" {
|
||||
// e.g. with a current version of 9.5 and a minimal version of 11, clusters on 11 to 13 are left alone and everything below is upgraded
|
||||
if IsBiggerPostgresVersion(c.Spec.PgVersion, c.Config.OpConfig.MinimalMajorVersion) {
|
||||
c.logger.Infof("overwriting configured major version %s to %s", c.Spec.PgVersion, c.Config.OpConfig.TargetMajorVersion)
|
||||
return c.Config.OpConfig.TargetMajorVersion
|
||||
}
|
||||
}
|
||||
|
||||
return c.Spec.PgVersion
|
||||
}
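// Illustration (assumed config values, not part of this change): with MajorVersionUpgradeMode="full",
// MinimalMajorVersion="11" and TargetMajorVersion="13", a manifest pinned to "9.6" is treated as
// desiring "13", while a manifest on "11" or "12" keeps its configured version.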
|
||||
|
||||
func (c *Cluster) majorVersionUpgrade() error {
|
||||
|
||||
if c.OpConfig.MajorVersionUpgradeMode == "off" {
|
||||
return nil
|
||||
}
|
||||
|
||||
desiredVersion := c.GetDesiredMajorVersionAsInt()
|
||||
|
||||
if c.currentMajorVersion >= desiredVersion {
|
||||
c.logger.Infof("cluster version up to date. current: %d desired: %d", c.currentMajorVersion, desiredVersion)
|
||||
return nil
|
||||
}
|
||||
|
||||
pods, err := c.listPods()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
allRunning := true
|
||||
|
||||
var masterPod *v1.Pod
|
||||
|
||||
for _, pod := range pods {
|
||||
ps, _ := c.patroni.GetMemberData(&pod)
|
||||
|
||||
if ps.State != "running" {
|
||||
allRunning = false
|
||||
c.logger.Infof("identified non running pod, potentially skipping major version upgrade")
|
||||
}
|
||||
|
||||
if ps.Role == "master" {
|
||||
masterPod = &pod
|
||||
c.currentMajorVersion = ps.ServerVersion
|
||||
}
|
||||
}
|
||||
|
||||
numberOfPods := len(pods)
|
||||
if allRunning && masterPod != nil {
|
||||
c.logger.Infof("healthy cluster ready to upgrade, current: %d desired: %d", c.currentMajorVersion, desiredVersion)
|
||||
if c.currentMajorVersion < desiredVersion {
|
||||
podName := &spec.NamespacedName{Namespace: masterPod.Namespace, Name: masterPod.Name}
|
||||
c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
|
||||
upgradeCommand := fmt.Sprintf("/usr/bin/python3 /scripts/inplace_upgrade.py %d 2>&1 | tee last_upgrade.log", numberOfPods)
|
||||
result, err := c.ExecCommand(podName, "/bin/su", "postgres", "-c", upgradeCommand)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
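// note: result[:50] panics if the command output is shorter than 50 bytes; this assumes inplace_upgrade.py always prints at least that much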
c.logger.Infof("upgrade action triggered and command completed: %s", result[:50])
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -4,14 +4,17 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
|
||||
"github.com/zalando/postgres-operator/pkg/spec"
|
||||
"github.com/zalando/postgres-operator/pkg/util"
|
||||
"github.com/zalando/postgres-operator/pkg/util/patroni"
|
||||
"github.com/zalando/postgres-operator/pkg/util/retryutil"
|
||||
)
|
||||
|
||||
|
|
@ -45,6 +48,64 @@ func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) {
|
|||
return pods.Items, nil
|
||||
}
|
||||
|
||||
// markRollingUpdateFlagForPod sets the indicator for the rolling update requirement
|
||||
// in the Pod annotation.
|
||||
func (c *Cluster) markRollingUpdateFlagForPod(pod *v1.Pod, msg string) error {
|
||||
// no need to patch pod if annotation is already there
|
||||
if c.getRollingUpdateFlagFromPod(pod) {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.logger.Debugf("mark rolling update annotation for %s: reason %s", pod.Name, msg)
|
||||
flag := make(map[string]string)
|
||||
flag[rollingUpdatePodAnnotationKey] = strconv.FormatBool(true)
|
||||
|
||||
patchData, err := metaAnnotationsPatch(flag)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not form patch for pod's rolling update flag: %v", err)
|
||||
}
|
||||
|
||||
err = retryutil.Retry(1*time.Second, 5*time.Second,
|
||||
func() (bool, error) {
|
||||
_, err2 := c.KubeClient.Pods(pod.Namespace).Patch(
|
||||
context.TODO(),
|
||||
pod.Name,
|
||||
types.MergePatchType,
|
||||
[]byte(patchData),
|
||||
metav1.PatchOptions{},
|
||||
"")
|
||||
if err2 != nil {
|
||||
return false, err2
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not patch pod rolling update flag %q: %v", patchData, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getRollingUpdateFlagFromPod returns the value of the rollingUpdate flag from the given pod
|
||||
func (c *Cluster) getRollingUpdateFlagFromPod(pod *v1.Pod) (flag bool) {
|
||||
anno := pod.GetAnnotations()
|
||||
flag = false
|
||||
|
||||
stringFlag, exists := anno[rollingUpdatePodAnnotationKey]
|
||||
if exists {
|
||||
var err error
|
||||
c.logger.Debugf("found rolling update flag on pod %q", pod.Name)
|
||||
if flag, err = strconv.ParseBool(stringFlag); err != nil {
|
||||
c.logger.Warnf("error when parsing %q annotation for the pod %q: expected boolean value, got %q\n",
|
||||
rollingUpdatePodAnnotationKey,
|
||||
types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name},
|
||||
stringFlag)
|
||||
}
|
||||
}
|
||||
|
||||
return flag
|
||||
}
|
||||
|
||||
func (c *Cluster) deletePods() error {
|
||||
c.logger.Debugln("deleting pods")
|
||||
pods, err := c.listPods()
|
||||
|
|
@ -281,7 +342,18 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
|
|||
defer c.unregisterPodSubscriber(podName)
|
||||
stopChan := make(chan struct{})
|
||||
|
||||
if err := c.KubeClient.Pods(podName.Namespace).Delete(context.TODO(), podName.Name, c.deleteOptions); err != nil {
|
||||
err := retryutil.Retry(1*time.Second, 5*time.Second,
|
||||
func() (bool, error) {
|
||||
err2 := c.KubeClient.Pods(podName.Namespace).Delete(
|
||||
context.TODO(),
|
||||
podName.Name,
|
||||
c.deleteOptions)
|
||||
if err2 != nil {
|
||||
return false, err2
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not delete pod: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -296,7 +368,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
|
|||
return pod, nil
|
||||
}
|
||||
|
||||
func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
|
||||
func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
|
||||
|
||||
/*
|
||||
Operator should not re-create pods if there is at least one replica being bootstrapped
|
||||
|
|
@ -305,21 +377,18 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
|
|||
XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
|
||||
after this check succeeds but before a pod is re-created
|
||||
*/
|
||||
|
||||
for _, pod := range pods.Items {
|
||||
for _, pod := range pods {
|
||||
c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
|
||||
}
|
||||
|
||||
for _, pod := range pods.Items {
|
||||
for _, pod := range pods {
|
||||
|
||||
var state string
|
||||
var data patroni.MemberData
|
||||
|
||||
err := retryutil.Retry(1*time.Second, 5*time.Second,
|
||||
func() (bool, error) {
|
||||
|
||||
var err error
|
||||
|
||||
state, err = c.patroni.GetPatroniMemberState(&pod)
|
||||
data, err = c.patroni.GetMemberData(&pod)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
|
@ -331,51 +400,43 @@ func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
|
|||
if err != nil {
|
||||
c.logger.Errorf("failed to get Patroni state for pod: %s", err)
|
||||
return false
|
||||
} else if state == "creating replica" {
|
||||
} else if data.State == "creating replica" {
|
||||
c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
|
||||
return false
|
||||
}
|
||||
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *Cluster) recreatePods() error {
|
||||
func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.NamespacedName) error {
|
||||
c.setProcessName("starting to recreate pods")
|
||||
ls := c.labelsSet(false)
|
||||
namespace := c.Namespace
|
||||
|
||||
listOptions := metav1.ListOptions{
|
||||
LabelSelector: ls.String(),
|
||||
}
|
||||
|
||||
pods, err := c.KubeClient.Pods(namespace).List(context.TODO(), listOptions)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get the list of pods: %v", err)
|
||||
}
|
||||
c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))
|
||||
c.logger.Infof("there are %d pods in the cluster to recreate", len(pods))
|
||||
|
||||
if !c.isSafeToRecreatePods(pods) {
|
||||
return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized")
|
||||
}
|
||||
|
||||
var (
|
||||
masterPod, newMasterPod, newPod *v1.Pod
|
||||
masterPod, newMasterPod *v1.Pod
|
||||
)
|
||||
replicas := make([]spec.NamespacedName, 0)
|
||||
for i, pod := range pods.Items {
|
||||
replicas := switchoverCandidates
|
||||
|
||||
for i, pod := range pods {
|
||||
role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
|
||||
|
||||
if role == Master {
|
||||
masterPod = &pods.Items[i]
|
||||
masterPod = &pods[i]
|
||||
continue
|
||||
}
|
||||
|
||||
podName := util.NameFromMeta(pods.Items[i].ObjectMeta)
|
||||
if newPod, err = c.recreatePod(podName); err != nil {
|
||||
podName := util.NameFromMeta(pod.ObjectMeta)
|
||||
newPod, err := c.recreatePod(podName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
|
||||
}
|
||||
if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica {
|
||||
|
||||
newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel])
|
||||
if newRole == Replica {
|
||||
replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta))
|
||||
} else if newRole == Master {
|
||||
newMasterPod = newPod
|
||||
|
|
@ -383,7 +444,9 @@ func (c *Cluster) recreatePods() error {
|
|||
}
|
||||
|
||||
if masterPod != nil {
|
||||
// failover if we have not observed a master pod when re-creating former replicas.
|
||||
// switchover if
|
||||
// 1. we have not observed a new master pod when re-creating former replicas
|
||||
// 2. we know possible switchover targets even when no replicas were recreated
|
||||
if newMasterPod == nil && len(replicas) > 0 {
|
||||
if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil {
|
||||
c.logger.Warningf("could not perform switch over: %v", err)
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
rollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update-required"
|
||||
rollingUpdatePodAnnotationKey = "zalando-postgres-operator-rolling-update-required"
|
||||
)
|
||||
|
||||
func (c *Cluster) listResources() error {
|
||||
|
|
@ -147,79 +147,6 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// setRollingUpdateFlagForStatefulSet sets the indicator or the rolling update requirement
|
||||
// in the StatefulSet annotation.
|
||||
func (c *Cluster) setRollingUpdateFlagForStatefulSet(sset *appsv1.StatefulSet, val bool, msg string) {
|
||||
anno := sset.GetAnnotations()
|
||||
if anno == nil {
|
||||
anno = make(map[string]string)
|
||||
}
|
||||
|
||||
anno[rollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val)
|
||||
sset.SetAnnotations(anno)
|
||||
c.logger.Debugf("set statefulset's rolling update annotation to %t: caller/reason %s", val, msg)
|
||||
}
|
||||
|
||||
// applyRollingUpdateFlagforStatefulSet sets the rolling update flag for the cluster's StatefulSet
|
||||
// and applies that setting to the actual running cluster.
|
||||
func (c *Cluster) applyRollingUpdateFlagforStatefulSet(val bool) error {
|
||||
c.setRollingUpdateFlagForStatefulSet(c.Statefulset, val, "applyRollingUpdateFlag")
|
||||
sset, err := c.updateStatefulSetAnnotations(c.Statefulset.GetAnnotations())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Statefulset = sset
|
||||
return nil
|
||||
}
|
||||
|
||||
// getRollingUpdateFlagFromStatefulSet returns the value of the rollingUpdate flag from the passed
|
||||
// StatefulSet, reverting to the default value in case of errors
|
||||
func (c *Cluster) getRollingUpdateFlagFromStatefulSet(sset *appsv1.StatefulSet, defaultValue bool) (flag bool) {
|
||||
anno := sset.GetAnnotations()
|
||||
flag = defaultValue
|
||||
|
||||
stringFlag, exists := anno[rollingUpdateStatefulsetAnnotationKey]
|
||||
if exists {
|
||||
var err error
|
||||
if flag, err = strconv.ParseBool(stringFlag); err != nil {
|
||||
c.logger.Warnf("error when parsing %q annotation for the statefulset %q: expected boolean value, got %q\n",
|
||||
rollingUpdateStatefulsetAnnotationKey,
|
||||
types.NamespacedName{Namespace: sset.Namespace, Name: sset.Name},
|
||||
stringFlag)
|
||||
flag = defaultValue
|
||||
}
|
||||
}
|
||||
return flag
|
||||
}
|
||||
|
||||
// mergeRollingUpdateFlagUsingCache returns the value of the rollingUpdate flag from the passed
|
||||
// statefulset, however, the value can be cleared if there is a cached flag in the cluster that
|
||||
// is set to false (the discrepancy could be a result of a failed StatefulSet update)
|
||||
func (c *Cluster) mergeRollingUpdateFlagUsingCache(runningStatefulSet *appsv1.StatefulSet) bool {
|
||||
var (
|
||||
cachedStatefulsetExists, clearRollingUpdateFromCache, podsRollingUpdateRequired bool
|
||||
)
|
||||
|
||||
if c.Statefulset != nil {
|
||||
// if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update
|
||||
// the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying
|
||||
// on the 'cached' in-memory flag.
|
||||
cachedStatefulsetExists = true
|
||||
clearRollingUpdateFromCache = !c.getRollingUpdateFlagFromStatefulSet(c.Statefulset, true)
|
||||
c.logger.Debugf("cached StatefulSet value exists, rollingUpdate flag is %t", clearRollingUpdateFromCache)
|
||||
}
|
||||
|
||||
if podsRollingUpdateRequired = c.getRollingUpdateFlagFromStatefulSet(runningStatefulSet, false); podsRollingUpdateRequired {
|
||||
if cachedStatefulsetExists && clearRollingUpdateFromCache {
|
||||
c.logger.Infof("clearing the rolling update flag based on the cached information")
|
||||
podsRollingUpdateRequired = false
|
||||
} else {
|
||||
c.logger.Infof("found a statefulset with an unfinished rolling update of the pods")
|
||||
}
|
||||
}
|
||||
return podsRollingUpdateRequired
|
||||
}
|
||||
|
||||
func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*appsv1.StatefulSet, error) {
|
||||
c.logger.Debugf("patching statefulset annotations")
|
||||
patchData, err := metaAnnotationsPatch(annotations)
|
||||
|
|
@ -237,8 +164,8 @@ func (c *Cluster) updateStatefulSetAnnotations(annotations map[string]string) (*
|
|||
return nil, fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err)
|
||||
}
|
||||
return result, nil
|
||||
|
||||
}
|
||||
|
||||
func (c *Cluster) updateStatefulSet(newStatefulSet *appsv1.StatefulSet) error {
|
||||
c.setProcessName("updating statefulset")
|
||||
if c.Statefulset == nil {
|
||||
|
|
|
|||
|
|
@ -118,6 +118,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
|
|||
return fmt.Errorf("could not sync connection pooler: %v", err)
|
||||
}
|
||||
|
||||
// Major version upgrade must only run after success of all earlier operations, must remain last item in sync
|
||||
if err := c.majorVersionUpgrade(); err != nil {
|
||||
c.logger.Errorf("major version upgrade failed: %v", err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -278,22 +283,24 @@ func (c *Cluster) mustUpdatePodsAfterLazyUpdate(desiredSset *appsv1.StatefulSet)
|
|||
}
|
||||
|
||||
func (c *Cluster) syncStatefulSet() error {
|
||||
var (
|
||||
podsRollingUpdateRequired bool
|
||||
)
|
||||
|
||||
podsToRecreate := make([]v1.Pod, 0)
|
||||
switchoverCandidates := make([]spec.NamespacedName, 0)
|
||||
|
||||
pods, err := c.listPods()
|
||||
if err != nil {
|
||||
c.logger.Infof("could not list pods of the statefulset: %v", err)
|
||||
}
|
||||
|
||||
// NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
|
||||
sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(context.TODO(), c.statefulSetName(), metav1.GetOptions{})
|
||||
if err != nil {
|
||||
if !k8sutil.ResourceNotFound(err) {
|
||||
return fmt.Errorf("could not get statefulset: %v", err)
|
||||
return fmt.Errorf("error during reading of statefulset: %v", err)
|
||||
}
|
||||
// statefulset does not exist, try to re-create it
|
||||
c.Statefulset = nil
|
||||
c.logger.Infof("could not find the cluster's statefulset")
|
||||
pods, err := c.listPods()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not list pods of the statefulset: %v", err)
|
||||
}
|
||||
c.logger.Infof("cluster's statefulset does not exist")
|
||||
|
||||
sset, err = c.createStatefulSet()
|
||||
if err != nil {
|
||||
|
|
@ -304,41 +311,63 @@ func (c *Cluster) syncStatefulSet() error {
|
|||
return fmt.Errorf("cluster is not ready: %v", err)
|
||||
}
|
||||
|
||||
podsRollingUpdateRequired = (len(pods) > 0)
|
||||
if podsRollingUpdateRequired {
|
||||
c.logger.Warningf("found pods from the previous statefulset: trigger rolling update")
|
||||
if err := c.applyRollingUpdateFlagforStatefulSet(podsRollingUpdateRequired); err != nil {
|
||||
return fmt.Errorf("could not set rolling update flag for the statefulset: %v", err)
|
||||
if len(pods) > 0 {
|
||||
for _, pod := range pods {
|
||||
if err = c.markRollingUpdateFlagForPod(&pod, "pod from previous statefulset"); err != nil {
|
||||
c.logger.Warnf("marking old pod for rolling update failed: %v", err)
|
||||
}
|
||||
podsToRecreate = append(podsToRecreate, pod)
|
||||
}
|
||||
}
|
||||
c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
|
||||
|
||||
} else {
|
||||
podsRollingUpdateRequired = c.mergeRollingUpdateFlagUsingCache(sset)
|
||||
// check if there are still pods with a rolling update flag
|
||||
for _, pod := range pods {
|
||||
if c.getRollingUpdateFlagFromPod(&pod) {
|
||||
podsToRecreate = append(podsToRecreate, pod)
|
||||
} else {
|
||||
role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
|
||||
if role == Master {
|
||||
continue
|
||||
}
|
||||
switchoverCandidates = append(switchoverCandidates, util.NameFromMeta(pod.ObjectMeta))
|
||||
}
|
||||
}
|
||||
|
||||
if len(podsToRecreate) > 0 {
|
||||
c.logger.Debugf("%d / %d pod(s) still need to be rotated", len(podsToRecreate), len(pods))
|
||||
}
|
||||
|
||||
// statefulset is already there, make sure we use its definition in order to compare with the spec.
|
||||
c.Statefulset = sset
|
||||
|
||||
desiredSS, err := c.generateStatefulSet(&c.Spec)
|
||||
desiredSts, err := c.generateStatefulSet(&c.Spec)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not generate statefulset: %v", err)
|
||||
}
|
||||
c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "from cache")
|
||||
|
||||
cmp := c.compareStatefulSetWith(desiredSS)
|
||||
cmp := c.compareStatefulSetWith(desiredSts)
|
||||
if !cmp.match {
|
||||
if cmp.rollingUpdate && !podsRollingUpdateRequired {
|
||||
podsRollingUpdateRequired = true
|
||||
c.setRollingUpdateFlagForStatefulSet(desiredSS, podsRollingUpdateRequired, "statefulset changes")
|
||||
if cmp.rollingUpdate {
|
||||
podsToRecreate = make([]v1.Pod, 0)
|
||||
switchoverCandidates = make([]spec.NamespacedName, 0)
|
||||
for _, pod := range pods {
|
||||
if err = c.markRollingUpdateFlagForPod(&pod, "pod changes"); err != nil {
|
||||
return fmt.Errorf("updating rolling update flag for pod failed: %v", err)
|
||||
}
|
||||
podsToRecreate = append(podsToRecreate, pod)
|
||||
}
|
||||
}
|
||||
|
||||
c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
|
||||
c.logStatefulSetChanges(c.Statefulset, desiredSts, false, cmp.reasons)
|
||||
|
||||
if !cmp.replace {
|
||||
if err := c.updateStatefulSet(desiredSS); err != nil {
|
||||
if err := c.updateStatefulSet(desiredSts); err != nil {
|
||||
return fmt.Errorf("could not update statefulset: %v", err)
|
||||
}
|
||||
} else {
|
||||
if err := c.replaceStatefulSet(desiredSS); err != nil {
|
||||
if err := c.replaceStatefulSet(desiredSts); err != nil {
|
||||
return fmt.Errorf("could not replace statefulset: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -346,18 +375,30 @@ func (c *Cluster) syncStatefulSet() error {
|
|||
|
||||
c.updateStatefulSetAnnotations(c.AnnotationsToPropagate(c.annotationsSet(c.Statefulset.Annotations)))
|
||||
|
||||
if !podsRollingUpdateRequired && !c.OpConfig.EnableLazySpiloUpgrade {
|
||||
// even if desired and actual statefulsets match
|
||||
if len(podsToRecreate) == 0 && !c.OpConfig.EnableLazySpiloUpgrade {
|
||||
// even if the desired and the running statefulsets match
|
||||
// there still may be not up-to-date pods on condition
|
||||
// (a) the lazy update was just disabled
|
||||
// and
|
||||
// (b) some of the pods were not restarted when the lazy update was still in place
|
||||
podsRollingUpdateRequired, err = c.mustUpdatePodsAfterLazyUpdate(desiredSS)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not list pods of the statefulset: %v", err)
|
||||
for _, pod := range pods {
|
||||
effectivePodImage := pod.Spec.Containers[0].Image
|
||||
stsImage := desiredSts.Spec.Template.Spec.Containers[0].Image
|
||||
|
||||
if stsImage != effectivePodImage {
|
||||
if err = c.markRollingUpdateFlagForPod(&pod, "pod not yet restarted due to lazy update"); err != nil {
|
||||
c.logger.Warnf("updating rolling update flag failed for pod %q: %v", pod.Name, err)
|
||||
}
|
||||
podsToRecreate = append(podsToRecreate, pod)
|
||||
} else {
|
||||
role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
|
||||
if role == Master {
|
||||
continue
|
||||
}
|
||||
switchoverCandidates = append(switchoverCandidates, util.NameFromMeta(pod.ObjectMeta))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Apply special PostgreSQL parameters that can only be set via the Patroni API.
|
||||
|
|
@ -369,17 +410,13 @@ func (c *Cluster) syncStatefulSet() error {
|
|||
|
||||
// if we get here we also need to re-create the pods (either leftovers from the old
|
||||
// statefulset or those that got their configuration from the outdated statefulset)
|
||||
if podsRollingUpdateRequired {
|
||||
if len(podsToRecreate) > 0 {
|
||||
c.logger.Debugln("performing rolling update")
|
||||
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
|
||||
if err := c.recreatePods(); err != nil {
|
||||
if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
|
||||
return fmt.Errorf("could not recreate pods: %v", err)
|
||||
}
|
||||
c.logger.Infof("pods have been recreated")
|
||||
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
|
||||
if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
|
||||
c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -471,7 +508,7 @@ func (c *Cluster) syncSecrets() error {
|
|||
for secretUsername, secretSpec := range secrets {
|
||||
if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Create(context.TODO(), secretSpec, metav1.CreateOptions{}); err == nil {
|
||||
c.Secrets[secret.UID] = secret
|
||||
c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID)
|
||||
c.logger.Debugf("created new secret %s, uid: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID)
|
||||
continue
|
||||
}
|
||||
if k8sutil.ResourceAlreadyExists(err) {
|
||||
|
|
@ -480,7 +517,7 @@ func (c *Cluster) syncSecrets() error {
|
|||
return fmt.Errorf("could not get current secret: %v", err)
|
||||
}
|
||||
if secretUsername != string(secret.Data["username"]) {
|
||||
c.logger.Errorf("secret %s does not contain the role %q", secretSpec.Name, secretUsername)
|
||||
c.logger.Errorf("secret %s does not contain the role %s", secretSpec.Name, secretUsername)
|
||||
continue
|
||||
}
|
||||
c.Secrets[secret.UID] = secret
|
||||
|
|
@ -499,7 +536,7 @@ func (c *Cluster) syncSecrets() error {
|
|||
if pwdUser.Password != string(secret.Data["password"]) &&
|
||||
pwdUser.Origin == spec.RoleOriginInfrastructure {
|
||||
|
||||
c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name)
|
||||
c.logger.Debugf("updating the secret %s from the infrastructure roles", secretSpec.Name)
|
||||
if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(context.TODO(), secretSpec, metav1.UpdateOptions{}); err != nil {
|
||||
return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err)
|
||||
}
|
||||
|
|
@ -509,7 +546,7 @@ func (c *Cluster) syncSecrets() error {
|
|||
userMap[secretUsername] = pwdUser
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err)
|
||||
return fmt.Errorf("could not create secret for user %s: %v", secretUsername, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -199,10 +199,10 @@ func (c *Controller) processEvent(event ClusterEvent) {
|
|||
if event.EventType == EventRepair {
|
||||
runRepair, lastOperationStatus := cl.NeedsRepair()
|
||||
if !runRepair {
|
||||
lg.Debugf("Observed cluster status %s, repair is not required", lastOperationStatus)
|
||||
lg.Debugf("observed cluster status %s, repair is not required", lastOperationStatus)
|
||||
return
|
||||
}
|
||||
lg.Debugf("Observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
|
||||
lg.Debugf("observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
|
||||
event.EventType = EventSync
|
||||
}
|
||||
|
||||
|
|
@ -217,7 +217,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
|
|||
}
|
||||
|
||||
if err := c.submitRBACCredentials(event); err != nil {
|
||||
c.logger.Warnf("Pods and/or Patroni may misfunction due to the lack of permissions: %v", err)
|
||||
c.logger.Warnf("pods and/or Patroni may misfunction due to the lack of permissions: %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -225,7 +225,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
|
|||
switch event.EventType {
|
||||
case EventAdd:
|
||||
if clusterFound {
|
||||
lg.Infof("Recieved add event for already existing Postgres cluster")
|
||||
lg.Infof("recieved add event for already existing Postgres cluster")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -348,11 +348,11 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
|
|||
func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {
|
||||
|
||||
deprecate := func(deprecated, replacement string) {
|
||||
c.logger.Warningf("Parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
|
||||
c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
|
||||
}
|
||||
|
||||
noeffect := func(param string, explanation string) {
|
||||
c.logger.Warningf("Parameter %q takes no effect. %s", param, explanation)
|
||||
c.logger.Warningf("parameter %q takes no effect. %s", param, explanation)
|
||||
}
|
||||
|
||||
if spec.UseLoadBalancer != nil {
|
||||
|
|
@ -368,7 +368,7 @@ func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.Postg
|
|||
|
||||
if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
|
||||
(spec.EnableReplicaLoadBalancer != nil || spec.EnableMasterLoadBalancer != nil) {
|
||||
c.logger.Warnf("Both old and new load balancer parameters are present in the manifest, ignoring old ones")
|
||||
c.logger.Warnf("both old and new load balancer parameters are present in the manifest, ignoring old ones")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -480,3 +480,45 @@ func TestInfrastructureRoleDefinitions(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
type SubConfig struct {
|
||||
teammap map[string]string
|
||||
}
|
||||
|
||||
type SuperConfig struct {
|
||||
sub SubConfig
|
||||
}
|
||||
|
||||
func TestUnderstandingMapsAndReferences(t *testing.T) {
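// maps are reference types in Go: copying SubConfig/SuperConfig copies the map reference, not the entries, so writes through either copy are visible via both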
|
||||
teams := map[string]string{"acid": "Felix"}
|
||||
|
||||
sc := SubConfig{
|
||||
teammap: teams,
|
||||
}
|
||||
|
||||
ssc := SuperConfig{
|
||||
sub: sc,
|
||||
}
|
||||
|
||||
teams["24x7"] = "alex"
|
||||
|
||||
if len(ssc.sub.teammap) != 2 {
|
||||
t.Errorf("Team Map does not contain 2 elements")
|
||||
}
|
||||
|
||||
ssc.sub.teammap["teapot"] = "Mikkel"
|
||||
|
||||
if len(teams) != 3 {
|
||||
t.Errorf("Team Map does not contain 3 elements")
|
||||
}
|
||||
|
||||
teams = make(map[string]string)
|
||||
|
||||
if len(ssc.sub.teammap) != 3 {
|
||||
t.Errorf("Team Map does not contain 0 elements")
|
||||
}
|
||||
|
||||
if &teams == &(ssc.sub.teammap) {
|
||||
t.Errorf("Identical maps")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -206,6 +206,10 @@ type Config struct {
|
|||
EnableLazySpiloUpgrade bool `name:"enable_lazy_spilo_upgrade" default:"false"`
|
||||
EnablePgVersionEnvVar bool `name:"enable_pgversion_env_var" default:"true"`
|
||||
EnableSpiloWalPathCompat bool `name:"enable_spilo_wal_path_compat" default:"false"`
|
||||
MajorVersionUpgradeMode string `name:"major_version_upgrade_mode" default:"off"` // off - no actions, manual - manifest triggers action, full - manifest and minimal version violation trigger upgrade
|
||||
MinimalMajorVersion string `name:"minimal_major_version" default:"9.5"`
|
||||
TargetMajorVersion string `name:"target_major_version" default:"13"`
|
||||
AllowedMajorUpgradeVersions []string `name:"allowed_major_upgrade_versions" default:"12,13"`
|
||||
}
|
||||
|
||||
// MustMarshal marshals the config or panics
|
||||
|
|
|
|||
|
|
@ -0,0 +1,11 @@
|
|||
package httpclient
|
||||
|
||||
//go:generate mockgen -package mocks -destination=$PWD/mocks/$GOFILE -source=$GOFILE -build_flags=-mod=vendor
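// running go generate regenerates mocks.MockHTTPClient under ./mocks, which the Patroni tests use via mocks.NewMockHTTPClient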
|
||||
|
||||
import "net/http"
|
||||
|
||||
// HTTPClient interface
|
||||
type HTTPClient interface {
|
||||
Do(req *http.Request) (*http.Response, error)
|
||||
Get(url string) (resp *http.Response, err error)
|
||||
}
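// *http.Client satisfies HTTPClient, so production code keeps the standard client while tests can inject a gomock implementation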
|
||||
|
|
@ -3,7 +3,6 @@ package patroni
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
|
|
@ -11,6 +10,8 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
httpclient "github.com/zalando/postgres-operator/pkg/util/httpclient"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
|
@ -26,24 +27,28 @@ const (
|
|||
type Interface interface {
|
||||
Switchover(master *v1.Pod, candidate string) error
|
||||
SetPostgresParameters(server *v1.Pod, options map[string]string) error
|
||||
GetPatroniMemberState(pod *v1.Pod) (string, error)
|
||||
GetMemberData(server *v1.Pod) (MemberData, error)
|
||||
}
|
||||
|
||||
// Patroni API client
|
||||
type Patroni struct {
|
||||
httpClient *http.Client
|
||||
httpClient httpclient.HTTPClient
|
||||
logger *logrus.Entry
|
||||
}
|
||||
|
||||
// New creates a Patroni API client; if client is nil, a default http.Client with a timeout is used
|
||||
func New(logger *logrus.Entry) *Patroni {
|
||||
cl := http.Client{
|
||||
Timeout: timeout,
|
||||
func New(logger *logrus.Entry, client httpclient.HTTPClient) *Patroni {
|
||||
if client == nil {
|
||||
|
||||
client = &http.Client{
|
||||
Timeout: timeout,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return &Patroni{
|
||||
logger: logger,
|
||||
httpClient: &cl,
|
||||
httpClient: client,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -68,7 +73,9 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer)
|
|||
return fmt.Errorf("could not create request: %v", err)
|
||||
}
|
||||
|
||||
p.logger.Debugf("making %s http request: %s", method, request.URL.String())
|
||||
if p.logger != nil {
|
||||
p.logger.Debugf("making %s http request: %s", method, request.URL.String())
|
||||
}
|
||||
|
||||
resp, err := p.httpClient.Do(request)
|
||||
if err != nil {
|
||||
|
|
@ -126,35 +133,45 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
|
|||
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
|
||||
}
|
||||
|
||||
//GetPatroniMemberState returns a state of member of a Patroni cluster
|
||||
func (p *Patroni) GetPatroniMemberState(server *v1.Pod) (string, error) {
|
||||
// MemberDataPatroni is the nested "patroni" element of the Patroni member data
|
||||
type MemberDataPatroni struct {
|
||||
Version string `json:"version"`
|
||||
Scope string `json:"scope"`
|
||||
}
|
||||
|
||||
// MemberData Patroni member data from Patroni API
|
||||
type MemberData struct {
|
||||
State string `json:"state"`
|
||||
Role string `json:"role"`
|
||||
ServerVersion int `json:"server_version"`
|
||||
PendingRestart bool `json:"pending_restart"`
|
||||
ClusterUnlocked bool `json:"cluster_unlocked"`
|
||||
Patroni MemberDataPatroni `json:"patroni"`
|
||||
}
|
||||
|
||||
// GetMemberData reads member data from the Patroni API
|
||||
func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) {
|
||||
|
||||
apiURLString, err := apiURL(server)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return MemberData{}, err
|
||||
}
|
||||
response, err := p.httpClient.Get(apiURLString)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not perform Get request: %v", err)
|
||||
return MemberData{}, fmt.Errorf("could not perform Get request: %v", err)
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not read response: %v", err)
|
||||
return MemberData{}, fmt.Errorf("could not read response: %v", err)
|
||||
}
|
||||
|
||||
data := make(map[string]interface{})
|
||||
data := MemberData{}
|
||||
err = json.Unmarshal(body, &data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return MemberData{}, err
|
||||
}
|
||||
|
||||
state, ok := data["state"].(string)
|
||||
if !ok {
|
||||
return "", errors.New("Patroni Get call response contains wrong type for 'state' field")
|
||||
}
|
||||
|
||||
return state, nil
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,17 @@
|
|||
package patroni
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"k8s.io/api/core/v1"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/zalando/postgres-operator/mocks"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
func newMockPod(ip string) *v1.Pod {
|
||||
|
|
@ -72,3 +79,32 @@ func TestApiURL(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPatroniAPI(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
json := `{"state": "running", "postmaster_start_time": "2021-02-19 14:31:50.053 CET", "role": "master", "server_version": 90621, "cluster_unlocked": false, "xlog": {"location": 55978296057856}, "timeline": 6, "database_system_identifier": "6462555844314089962", "pending_restart": true, "patroni": {"version": "2.0.1", "scope": "acid-rest92-standby"}}`
|
||||
r := ioutil.NopCloser(bytes.NewReader([]byte(json)))
|
||||
|
||||
response := http.Response{
|
||||
Status: "200",
|
||||
Body: r,
|
||||
}
|
||||
|
||||
mockClient := mocks.NewMockHTTPClient(ctrl)
|
||||
mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil)
|
||||
|
||||
p := New(nil, mockClient)
|
||||
|
||||
pod := v1.Pod{
|
||||
Status: v1.PodStatus{
|
||||
PodIP: "192.168.100.1",
|
||||
},
|
||||
}
|
||||
_, err := p.GetMemberData(&pod)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Could not read Patroni data: %v", err)
|
||||
}
|
||||
}
|
||||
|
|