resolve conflicts

Felix Kunde 2022-02-25 17:50:51 +01:00
commit 22d3c72eb3
14 changed files with 315 additions and 248 deletions

View File

@@ -343,6 +343,8 @@ spec:
                 type: boolean
               synchronous_mode_strict:
                 type: boolean
+              synchronous_node_count:
+                type: integer
               ttl:
                 type: integer
           podAnnotations:

View File

@@ -306,10 +306,10 @@ The interval of days can be set with `password_rotation_interval` (default
 are replaced in the K8s secret. They belong to a newly created user named after
 the original role plus rotation date in YYMMDD format. All privileges are
 inherited meaning that migration scripts should still grant and revoke rights
-against the original role. The timestamp of the next rotation is written to the
-secret as well. Note, if the rotation interval is decreased it is reflected in
-the secrets only if the next rotation date is more days away than the new
-length of the interval.
+against the original role. The timestamp of the next rotation (in RFC 3339
+format, UTC timezone) is written to the secret as well. Note, if the rotation
+interval is decreased it is reflected in the secrets only if the next rotation
+date is more days away than the new length of the interval.
 
 Pods still using the previous secret values which they keep in memory continue
 to connect to the database since the password of the corresponding user is not
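For reference, the new timestamp format can be verified by decoding the `nextRotation` key of a rotated secret and parsing it with Go's built-in RFC 3339 layout. A minimal sketch (the sample value is made up for illustration):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // value as the operator now writes it into the K8s secret (RFC 3339, UTC)
        raw := "2022-05-26T17:50:51Z"

        // time.RFC3339 is the layout matching the format described above
        next, err := time.Parse(time.RFC3339, raw)
        if err != nil {
            panic(err)
        }
        fmt.Println("next rotation due:", next.UTC())
    }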

View File

@@ -303,6 +303,9 @@ explanation of `ttl` and `loop_wait` parameters.
 * **synchronous_mode_strict**
   Patroni `synchronous_mode_strict` parameter value. Can be used in addition to `synchronous_mode`. The default is set to `false`. Optional.
 
+* **synchronous_node_count**
+  Patroni `synchronous_node_count` parameter value. Note, this option is only available for Spilo images with Patroni 2.0+. The default is set to `1`. Optional.
+
 ## Postgres container resources
 
 Those parameters define [CPU and memory requests and limits](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/)

View File

@@ -205,7 +205,7 @@ class EndToEndTestCase(unittest.TestCase):
                 "enable_team_member_deprecation": "true",
                 "role_deletion_suffix": "_delete_me",
                 "resync_period": "15s",
-                "repair_period": "10s",
+                "repair_period": "15s",
             },
         }
         k8s.update_config(enable_postgres_team_crd)
@@ -296,6 +296,133 @@ class EndToEndTestCase(unittest.TestCase):
         self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
                              "Operator does not get in sync")
 
+    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
+    def test_config_update(self):
+        '''
+            Change Postgres config under Spec.Postgresql.Parameters and Spec.Patroni
+            and query Patroni config endpoint to check if manifest changes got applied
+            via restarting cluster through Patroni's rest api
+        '''
+        k8s = self.k8s
+        leader = k8s.get_cluster_leader_pod()
+        replica = k8s.get_cluster_replica_pod()
+        masterCreationTimestamp = leader.metadata.creation_timestamp
+        replicaCreationTimestamp = replica.metadata.creation_timestamp
+        new_max_connections_value = "50"
+
+        # adjust Postgres config
+        pg_patch_config = {
+            "spec": {
+                "postgresql": {
+                    "parameters": {
+                        "max_connections": new_max_connections_value
+                    }
+                },
+                "patroni": {
+                    "slots": {
+                        "test_slot": {
+                            "type": "physical"
+                        }
+                    },
+                    "ttl": 29,
+                    "loop_wait": 9,
+                    "retry_timeout": 9,
+                    "synchronous_mode": True
+                }
+            }
+        }
+
+        try:
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_config)
+
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            def compare_config():
+                effective_config = k8s.patroni_rest(leader.metadata.name, "config")
+                desired_config = pg_patch_config["spec"]["patroni"]
+                desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
+                effective_parameters = effective_config["postgresql"]["parameters"]
+                self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"],
+                                 "max_connections not updated")
+                self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added")
+                self.assertEqual(desired_config["ttl"], effective_config["ttl"],
+                                 "ttl not updated")
+                self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"],
+                                 "loop_wait not updated")
+                self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"],
+                                 "retry_timeout not updated")
+                self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"],
+                                 "synchronous_mode not updated")
+                return True
+
+            # check if Patroni config has been updated
+            self.eventuallyTrue(compare_config, "Postgres config not applied")
+
+            # make sure that pods were not recreated
+            leader = k8s.get_cluster_leader_pod()
+            replica = k8s.get_cluster_replica_pod()
+            self.assertEqual(masterCreationTimestamp, leader.metadata.creation_timestamp,
+                             "Master pod creation timestamp is updated")
+            self.assertEqual(replicaCreationTimestamp, replica.metadata.creation_timestamp,
+                             "Replica pod creation timestamp is updated")
+
+            # query max_connections setting
+            setting_query = """
+               SELECT setting
+                 FROM pg_settings
+                WHERE name = 'max_connections';
+            """
+            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                                 "New max_connections setting not applied on master", 10, 5)
+            self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                                    "Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5)
+
+            # the next sync should restart the replica because it has pending_restart flag set
+            # force next sync by deleting the operator pod
+            k8s.delete_operator_pod()
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
+                                 "New max_connections setting not applied on replica", 10, 5)
+
+            # decrease max_connections again
+            # this time restart will be correct and new value should appear on both instances
+            lower_max_connections_value = "30"
+            pg_patch_max_connections = {
+                "spec": {
+                    "postgresql": {
+                        "parameters": {
+                            "max_connections": lower_max_connections_value
+                        }
+                    }
+                }
+            }
+
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections)
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+
+            # check Patroni config again
+            pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value
+            self.eventuallyTrue(compare_config, "Postgres config not applied")
+
+            # and query max_connections setting again
+            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
+                                 "Previous max_connections setting not applied on master", 10, 5)
+            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
+                                 "Previous max_connections setting not applied on replica", 10, 5)
+
+        except timeout_decorator.TimeoutError:
+            print('Operator log: {}'.format(k8s.get_operator_log()))
+            raise
+
+        # make sure cluster is in a good state for further tests
+        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
+                             "No 2 pods running")
+
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_cross_namespace_secrets(self):
         '''
@@ -794,7 +921,11 @@ class EndToEndTestCase(unittest.TestCase):
         Lower resource limits below configured minimum and let operator fix it
         '''
         k8s = self.k8s
-        # self.eventuallyEqual(lambda: k8s.pg_get_status(), "Running", "Cluster not healthy at start")
+        cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
+
+        # get nodes of master and replica(s) (expected target of new master)
+        _, replica_nodes = k8s.get_pg_nodes(cluster_label)
+        self.assertNotEqual(replica_nodes, [])
 
         # configure minimum boundaries for CPU and memory limits
         minCPULimit = '503m'
@@ -827,7 +958,9 @@ class EndToEndTestCase(unittest.TestCase):
                 "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_resources)
             self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
 
-            self.eventuallyEqual(lambda: k8s.count_running_pods(), 2, "No two pods running after lazy rolling upgrade")
+            # wait for switched over
+            k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=master,' + cluster_label)
+            k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
             self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members()), 2, "Postgres status did not enter running")
 
         def verify_pod_limits():
@@ -968,15 +1101,15 @@ class EndToEndTestCase(unittest.TestCase):
             self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
 
             # node affinity change should cause another rolling update and relocation of replica
-            k8s.wait_for_pod_failover(replica_nodes, 'spilo-role=replica,' + cluster_label)
             k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
+            k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
 
         except timeout_decorator.TimeoutError:
             print('Operator log: {}'.format(k8s.get_operator_log()))
             raise
 
         # toggle pod anti affinity to make sure replica and master run on separate nodes
-        self.assert_distributed_pods(replica_nodes)
+        self.assert_distributed_pods(master_nodes)
 
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_node_readiness_label(self):
@@ -1099,7 +1232,7 @@ class EndToEndTestCase(unittest.TestCase):
 
         # check if next rotation date was set in secret
         secret_data = k8s.get_secret_data("zalando")
-        next_rotation_timestamp = datetime.fromisoformat(str(base64.b64decode(secret_data["nextRotation"]), 'utf-8'))
+        next_rotation_timestamp = datetime.strptime(str(base64.b64decode(secret_data["nextRotation"]), 'utf-8'), "%Y-%m-%dT%H:%M:%SZ")
         today90days = today+timedelta(days=90)
         self.assertEqual(today90days, next_rotation_timestamp.date(),
                         "Unexpected rotation date in secret of zalando user: expected {}, got {}".format(today90days, next_rotation_timestamp.date()))
@@ -1114,7 +1247,7 @@ class EndToEndTestCase(unittest.TestCase):
         self.query_database(leader.metadata.name, "postgres", create_fake_rotation_user)
 
         # patch foo_user secret with outdated rotation date
-        fake_rotation_date = today.isoformat() + ' 00:00:00'
+        fake_rotation_date = today.isoformat() + 'T00:00:00Z'
         fake_rotation_date_encoded = base64.b64encode(fake_rotation_date.encode('utf-8'))
         secret_fake_rotation = {
             "data": {
@@ -1142,7 +1275,7 @@ class EndToEndTestCase(unittest.TestCase):
         # check if next rotation date and username have been replaced
         secret_data = k8s.get_secret_data("foo_user")
         secret_username = str(base64.b64decode(secret_data["username"]), 'utf-8')
-        next_rotation_timestamp = datetime.fromisoformat(str(base64.b64decode(secret_data["nextRotation"]), 'utf-8'))
+        next_rotation_timestamp = datetime.strptime(str(base64.b64decode(secret_data["nextRotation"]), 'utf-8'), "%Y-%m-%dT%H:%M:%SZ")
         rotation_user = "foo_user"+today.strftime("%y%m%d")
         today30days = today+timedelta(days=30)
@@ -1192,133 +1325,6 @@ class EndToEndTestCase(unittest.TestCase):
         self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", user_query)), 2,
             "Found incorrect number of rotation users", 10, 5)
 
-    @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
-    def test_patroni_config_update(self):
-        '''
-            Change Postgres config under Spec.Postgresql.Parameters and Spec.Patroni
-            and query Patroni config endpoint to check if manifest changes got applied
-            via restarting cluster through Patroni's rest api
-        '''
-        k8s = self.k8s
-        leader = k8s.get_cluster_leader_pod()
-        replica = k8s.get_cluster_replica_pod()
-        masterCreationTimestamp = leader.metadata.creation_timestamp
-        replicaCreationTimestamp = replica.metadata.creation_timestamp
-        new_max_connections_value = "50"
-
-        # adjust Postgres config
-        pg_patch_config = {
-            "spec": {
-                "postgresql": {
-                    "parameters": {
-                        "max_connections": new_max_connections_value
-                    }
-                },
-                "patroni": {
-                    "slots": {
-                        "test_slot": {
-                            "type": "physical"
-                        }
-                    },
-                    "ttl": 29,
-                    "loop_wait": 9,
-                    "retry_timeout": 9,
-                    "synchronous_mode": True
-                }
-            }
-        }
-
-        try:
-            k8s.api.custom_objects_api.patch_namespaced_custom_object(
-                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_config)
-
-            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
-            def compare_config():
-                effective_config = k8s.patroni_rest(leader.metadata.name, "config")
-                desired_config = pg_patch_config["spec"]["patroni"]
-                desired_parameters = pg_patch_config["spec"]["postgresql"]["parameters"]
-                effective_parameters = effective_config["postgresql"]["parameters"]
-                self.assertEqual(desired_parameters["max_connections"], effective_parameters["max_connections"],
-                                 "max_connections not updated")
-                self.assertTrue(effective_config["slots"] is not None, "physical replication slot not added")
-                self.assertEqual(desired_config["ttl"], effective_config["ttl"],
-                                 "ttl not updated")
-                self.assertEqual(desired_config["loop_wait"], effective_config["loop_wait"],
-                                 "loop_wait not updated")
-                self.assertEqual(desired_config["retry_timeout"], effective_config["retry_timeout"],
-                                 "retry_timeout not updated")
-                self.assertEqual(desired_config["synchronous_mode"], effective_config["synchronous_mode"],
-                                 "synchronous_mode not updated")
-                return True
-
-            # check if Patroni config has been updated
-            self.eventuallyTrue(compare_config, "Postgres config not applied")
-
-            # make sure that pods were not recreated
-            leader = k8s.get_cluster_leader_pod()
-            replica = k8s.get_cluster_replica_pod()
-            self.assertEqual(masterCreationTimestamp, leader.metadata.creation_timestamp,
-                             "Master pod creation timestamp is updated")
-            self.assertEqual(replicaCreationTimestamp, replica.metadata.creation_timestamp,
-                             "Master pod creation timestamp is updated")
-
-            # query max_connections setting
-            setting_query = """
-               SELECT setting
-                 FROM pg_settings
-                WHERE name = 'max_connections';
-            """
-            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
-                                 "New max_connections setting not applied on master", 10, 5)
-            self.eventuallyNotEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
-                                    "Expected max_connections not to be updated on replica since Postgres was restarted there first", 10, 5)
-
-            # the next sync should restart the replica because it has pending_restart flag set
-            # force next sync by deleting the operator pod
-            k8s.delete_operator_pod()
-            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
-            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], new_max_connections_value,
-                                 "New max_connections setting not applied on replica", 10, 5)
-
-            # decrease max_connections again
-            # this time restart will be correct and new value should appear on both instances
-            lower_max_connections_value = "30"
-            pg_patch_max_connections = {
-                "spec": {
-                    "postgresql": {
-                        "parameters": {
-                            "max_connections": lower_max_connections_value
-                        }
-                    }
-                }
-            }
-
-            k8s.api.custom_objects_api.patch_namespaced_custom_object(
-                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_patch_max_connections)
-            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-
-            # check Patroni config again
-            pg_patch_config["spec"]["postgresql"]["parameters"]["max_connections"] = lower_max_connections_value
-            self.eventuallyTrue(compare_config, "Postgres config not applied")
-
-            # and query max_connections setting again
-            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
-                                 "Previous max_connections setting not applied on master", 10, 5)
-            self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
-                                 "Previous max_connections setting not applied on replica", 10, 5)
-
-        except timeout_decorator.TimeoutError:
-            print('Operator log: {}'.format(k8s.get_operator_log()))
-            raise
-
-        # make sure cluster is in a good state for further tests
-        self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
-        self.eventuallyEqual(lambda: k8s.count_running_pods(), 2,
-                             "No 2 pods running")
-
     @timeout_decorator.timeout(TEST_TIMEOUT_SEC)
     def test_rolling_update_flag(self):
         '''
@@ -1405,7 +1411,7 @@ class EndToEndTestCase(unittest.TestCase):
             "data": {
                 "pod_label_wait_timeout": "2s",
                 "resync_period": "30s",
-                "repair_period": "10s",
+                "repair_period": "30s",
             }
         }
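The replica-first restart behavior exercised in test_config_update relies on Patroni reporting a per-member pending_restart flag after a config patch. A hedged sketch of checking that flag against a port-forwarded Patroni REST endpoint (URL and field names follow Patroni's documented member status output; error handling trimmed):

    package main

    import (
        "encoding/json"
        "fmt"
        "net/http"
    )

    // subset of Patroni's member status relevant here
    type memberStatus struct {
        State          string `json:"state"`
        PendingRestart bool   `json:"pending_restart"`
    }

    func main() {
        // assumes `kubectl port-forward <pod> 8008` to the Spilo container
        resp, err := http.Get("http://localhost:8008/patroni")
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()

        var status memberStatus
        if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
            panic(err)
        }
        fmt.Printf("state=%s pending_restart=%v\n", status.State, status.PendingRestart)
    }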

View File

@@ -124,6 +124,7 @@ spec:
     retry_timeout: 10
     synchronous_mode: false
     synchronous_mode_strict: false
+    synchronous_node_count: 1
     maximum_lag_on_failover: 33554432
 
 # restore a Postgres DB with point-in-time-recovery

View File

@@ -341,6 +341,8 @@ spec:
                 type: boolean
               synchronous_mode_strict:
                 type: boolean
+              synchronous_node_count:
+                type: integer
               ttl:
                 type: integer
           podAnnotations:

View File

@@ -534,6 +534,9 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
                     "synchronous_mode_strict": {
                         Type: "boolean",
                     },
+                    "synchronous_node_count": {
+                        Type: "integer",
+                    },
                     "ttl": {
                         Type: "integer",
                     },

View File

@@ -165,6 +165,7 @@ type Patroni struct {
     Slots                 map[string]map[string]string `json:"slots,omitempty"`
     SynchronousMode       bool                         `json:"synchronous_mode,omitempty"`
     SynchronousModeStrict bool                         `json:"synchronous_mode_strict,omitempty"`
+    SynchronousNodeCount  uint32                       `json:"synchronous_node_count,omitempty" defaults:"1"`
 }
 
 // StandbyDescription contains s3 wal path

View File

@@ -50,6 +50,7 @@ type patroniDCS struct {
     MaximumLagOnFailover     float32                      `json:"maximum_lag_on_failover,omitempty"`
     SynchronousMode          bool                         `json:"synchronous_mode,omitempty"`
     SynchronousModeStrict    bool                         `json:"synchronous_mode_strict,omitempty"`
+    SynchronousNodeCount     uint32                       `json:"synchronous_node_count,omitempty"`
     PGBootstrapConfiguration map[string]interface{}       `json:"postgresql,omitempty"`
     Slots                    map[string]map[string]string `json:"slots,omitempty"`
 }
@@ -262,6 +263,9 @@ PatroniInitDBParams:
     if patroni.SynchronousModeStrict {
         config.Bootstrap.DCS.SynchronousModeStrict = patroni.SynchronousModeStrict
     }
+    if patroni.SynchronousNodeCount >= 1 {
+        config.Bootstrap.DCS.SynchronousNodeCount = patroni.SynchronousNodeCount
+    }
 
     config.PgLocalConfiguration = make(map[string]interface{})
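The `omitempty` tag together with the `>= 1` guard means an unset (zero) count never reaches the DCS section of the generated Spilo configuration, leaving the default to Patroni itself. A small self-contained sketch of that marshaling behavior (the struct is a stand-in for the operator's internal patroniDCS type):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    type dcs struct {
        SynchronousMode      bool   `json:"synchronous_mode,omitempty"`
        SynchronousNodeCount uint32 `json:"synchronous_node_count,omitempty"`
    }

    func main() {
        unset, _ := json.Marshal(dcs{SynchronousMode: true})
        set, _ := json.Marshal(dcs{SynchronousMode: true, SynchronousNodeCount: 2})

        // the zero value is dropped entirely, a non-zero value is serialized
        fmt.Println(string(unset)) // {"synchronous_mode":true}
        fmt.Println(string(set))   // {"synchronous_mode":true,"synchronous_node_count":2}
    }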

View File

@@ -91,11 +91,12 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) {
                 MaximumLagOnFailover:  33554432,
                 SynchronousMode:       true,
                 SynchronousModeStrict: true,
+                SynchronousNodeCount:  1,
                 Slots:                 map[string]map[string]string{"permanent_logical_1": {"type": "logical", "database": "foo", "plugin": "pgoutput"}},
             },
             role:     "zalandos",
             opConfig: config.Config{},
-            result: `{"postgresql":{"bin_dir":"/usr/lib/postgresql/11/bin","pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"]},"bootstrap":{"initdb":[{"auth-host":"md5"},{"auth-local":"trust"},"data-checksums",{"encoding":"UTF8"},{"locale":"en_US.UTF-8"}],"users":{"zalandos":{"password":"","options":["CREATEDB","NOLOGIN"]}},"dcs":{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"synchronous_mode":true,"synchronous_mode_strict":true,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}}}}`,
+            result: `{"postgresql":{"bin_dir":"/usr/lib/postgresql/11/bin","pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"]},"bootstrap":{"initdb":[{"auth-host":"md5"},{"auth-local":"trust"},"data-checksums",{"encoding":"UTF8"},{"locale":"en_US.UTF-8"}],"users":{"zalandos":{"password":"","options":["CREATEDB","NOLOGIN"]}},"dcs":{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"synchronous_mode":true,"synchronous_mode_strict":true,"synchronous_node_count":1,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}}}}`,
         },
     }
     for _, tt := range tests {

View File

@@ -13,6 +13,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
 
+    acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
     "github.com/zalando/postgres-operator/pkg/spec"
     "github.com/zalando/postgres-operator/pkg/util"
     "github.com/zalando/postgres-operator/pkg/util/patroni"
@@ -349,6 +350,54 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st
     return nil
 }
 
+func (c *Cluster) getPatroniConfig(pod *v1.Pod) (acidv1.Patroni, map[string]string, error) {
+    var (
+        patroniConfig acidv1.Patroni
+        pgParameters  map[string]string
+    )
+    podName := util.NameFromMeta(pod.ObjectMeta)
+    err := retryutil.Retry(1*time.Second, 5*time.Second,
+        func() (bool, error) {
+            var err error
+            patroniConfig, pgParameters, err = c.patroni.GetConfig(pod)
+            if err != nil {
+                return false, err
+            }
+            return true, nil
+        },
+    )
+    if err != nil {
+        return acidv1.Patroni{}, nil, fmt.Errorf("could not get Postgres config from pod %s: %v", podName, err)
+    }
+
+    return patroniConfig, pgParameters, nil
+}
+
+func (c *Cluster) getPatroniMemberData(pod *v1.Pod) (patroni.MemberData, error) {
+    var memberData patroni.MemberData
+    err := retryutil.Retry(1*time.Second, 5*time.Second,
+        func() (bool, error) {
+            var err error
+            memberData, err = c.patroni.GetMemberData(pod)
+            if err != nil {
+                return false, err
+            }
+            return true, nil
+        },
+    )
+    if err != nil {
+        return patroni.MemberData{}, fmt.Errorf("could not get member data: %v", err)
+    }
+    if memberData.State == "creating replica" {
+        return patroni.MemberData{}, fmt.Errorf("replica currently being initialized")
+    }
+
+    return memberData, nil
+}
+
 func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
     ch := c.registerPodSubscriber(podName)
     defer c.unregisterPodSubscriber(podName)
@@ -380,54 +429,10 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
     return pod, nil
 }
 
-func (c *Cluster) isSafeToRecreatePods(pods []v1.Pod) bool {
-    /*
-     Operator should not re-create pods if there is at least one replica being bootstrapped
-     because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag).
-
-     XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
-     after this check succeeds but before a pod is re-created
-    */
-    for _, pod := range pods {
-        c.logger.Debugf("name=%s phase=%s ip=%s", pod.Name, pod.Status.Phase, pod.Status.PodIP)
-    }
-
-    for _, pod := range pods {
-        var data patroni.MemberData
-
-        err := retryutil.Retry(1*time.Second, 5*time.Second,
-            func() (bool, error) {
-                var err error
-                data, err = c.patroni.GetMemberData(&pod)
-
-                if err != nil {
-                    return false, err
-                }
-                return true, nil
-            },
-        )
-
-        if err != nil {
-            c.logger.Errorf("failed to get Patroni state for pod: %s", err)
-            return false
-        } else if data.State == "creating replica" {
-            c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
-            return false
-        }
-    }
-    return true
-}
-
 func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.NamespacedName) error {
     c.setProcessName("starting to recreate pods")
     c.logger.Infof("there are %d pods in the cluster to recreate", len(pods))
 
-    if !c.isSafeToRecreatePods(pods) {
-        return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized")
-    }
-
     var (
         masterPod, newMasterPod *v1.Pod
     )
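Both new helpers wrap a single Patroni call in the operator's retryutil backoff so that one slow or flaky REST response does not immediately mark a pod as unsafe. Roughly, the pattern they factor out looks like the standalone sketch below (a local stub replaces the real retryutil and Patroni client):

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // retry calls fn every interval until it succeeds or timeout elapses,
    // mirroring the retryutil.Retry(1*time.Second, 5*time.Second, ...) usage above
    func retry(interval, timeout time.Duration, fn func() (bool, error)) error {
        deadline := time.Now().Add(timeout)
        for {
            ok, err := fn()
            if ok {
                return nil
            }
            if time.Now().After(deadline) {
                return fmt.Errorf("still failing after %v: %v", timeout, err)
            }
            time.Sleep(interval)
        }
    }

    func main() {
        attempts := 0
        err := retry(1*time.Second, 5*time.Second, func() (bool, error) {
            attempts++
            if attempts < 3 {
                return false, errors.New("patroni not ready") // transient failure
            }
            return true, nil
        })
        fmt.Println("attempts:", attempts, "err:", err)
    }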

View File

@@ -285,6 +285,7 @@ func (c *Cluster) syncStatefulSet() error {
         restartMasterFirst bool
     )
     podsToRecreate := make([]v1.Pod, 0)
+    isSafeToRecreatePods := true
    switchoverCandidates := make([]spec.NamespacedName, 0)
 
     pods, err := c.listPods()
@@ -410,23 +411,21 @@ func (c *Cluster) syncStatefulSet() error {
     // get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
     // Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used
     for i, pod := range pods {
-        emptyPatroniConfig := acidv1.Patroni{}
-        podName := util.NameFromMeta(pods[i].ObjectMeta)
-        patroniConfig, pgParameters, err := c.patroni.GetConfig(&pod)
+        patroniConfig, pgParameters, err := c.getPatroniConfig(&pod)
         if err != nil {
-            c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
+            c.logger.Warningf("%v", err)
+            isSafeToRecreatePods = false
             continue
         }
         restartWait = patroniConfig.LoopWait
 
         // empty config probably means cluster is not fully initialized yet, e.g. restoring from backup
         // do not attempt a restart
-        if !reflect.DeepEqual(patroniConfig, emptyPatroniConfig) || len(pgParameters) > 0 {
+        if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 {
             // compare config returned from Patroni with what is specified in the manifest
             restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, c.Spec.Patroni, pgParameters, c.Spec.Parameters)
             if err != nil {
-                c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
+                c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", pods[i].Name, err)
                 continue
             }
@@ -448,50 +447,59 @@ func (c *Cluster) syncStatefulSet() error {
             remainingPods = append(remainingPods, &pods[i])
             continue
         }
-        c.restartInstance(&pod, restartWait)
+        if err = c.restartInstance(&pod, restartWait); err != nil {
+            c.logger.Errorf("%v", err)
+            isSafeToRecreatePods = false
+        }
     }
 
     // in most cases only the master should be left to restart
     if len(remainingPods) > 0 {
         for _, remainingPod := range remainingPods {
-            c.restartInstance(remainingPod, restartWait)
+            if err = c.restartInstance(remainingPod, restartWait); err != nil {
+                c.logger.Errorf("%v", err)
+                isSafeToRecreatePods = false
+            }
         }
     }
 
     // if we get here we also need to re-create the pods (either leftovers from the old
     // statefulset or those that got their configuration from the outdated statefulset)
     if len(podsToRecreate) > 0 {
-        c.logger.Debugln("performing rolling update")
-        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
-        if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
-            return fmt.Errorf("could not recreate pods: %v", err)
+        if isSafeToRecreatePods {
+            c.logger.Debugln("performing rolling update")
+            c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
+            if err := c.recreatePods(podsToRecreate, switchoverCandidates); err != nil {
+                return fmt.Errorf("could not recreate pods: %v", err)
+            }
+            c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
+        } else {
+            c.logger.Warningf("postpone pod recreation until next sync")
         }
-        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
     }
 
     return nil
 }
 
-func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) {
+func (c *Cluster) restartInstance(pod *v1.Pod, restartWait uint32) error {
+    // if the config update requires a restart, call Patroni restart
     podName := util.NameFromMeta(pod.ObjectMeta)
     role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
+    memberData, err := c.getPatroniMemberData(pod)
 
-    // if the config update requires a restart, call Patroni restart
-    memberData, err := c.patroni.GetMemberData(pod)
     if err != nil {
-        c.logger.Debugf("could not get member data of %s pod %s - skipping possible restart attempt: %v", role, podName, err)
-        return
+        return fmt.Errorf("could not restart Postgres in %s pod %s: %v", role, podName, err)
     }
 
     // do restart only when it is pending
     if memberData.PendingRestart {
-        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, pod.Name))
+        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("restarting Postgres server within %s pod %s", role, podName))
         if err := c.patroni.Restart(pod); err != nil {
-            c.logger.Warningf("could not restart Postgres server within %s pod %s: %v", role, podName, err)
-            return
+            return err
         }
         time.Sleep(time.Duration(restartWait) * time.Second)
-        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, pod.Name))
+        c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", fmt.Sprintf("Postgres server restart done for %s pod %s", role, podName))
     }
+
+    return nil
 }
 
 // AnnotationsToPropagate get the annotations to update if required
@@ -620,11 +628,6 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv
     return requiresMasterRestart, nil
 }
 
-func (c *Cluster) getNextRotationDate(currentDate time.Time) (time.Time, string) {
-    nextRotationDate := currentDate.AddDate(0, 0, int(c.OpConfig.PasswordRotationInterval))
-    return nextRotationDate, nextRotationDate.Format("2006-01-02 15:04:05")
-}
-
 func (c *Cluster) syncSecrets() error {
 
     c.logger.Info("syncing secrets")
@@ -682,6 +685,11 @@ func (c *Cluster) syncSecrets() error {
     return nil
 }
 
+func (c *Cluster) getNextRotationDate(currentDate time.Time) (time.Time, string) {
+    nextRotationDate := currentDate.AddDate(0, 0, int(c.OpConfig.PasswordRotationInterval))
+    return nextRotationDate, nextRotationDate.Format(time.RFC3339)
+}
+
 func (c *Cluster) updateSecret(
     secretUsername string,
     generatedSecret *v1.Secret,
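With the relocated helper, the next rotation date is simply the reference time plus the configured interval in days, rendered in RFC 3339. A worked example with an assumed password_rotation_interval of 90 days:

    package main

    import (
        "fmt"
        "time"
    )

    func nextRotationDate(current time.Time, intervalDays int) (time.Time, string) {
        // same arithmetic as the operator helper: add interval days, format as RFC 3339
        next := current.AddDate(0, 0, intervalDays)
        return next, next.Format(time.RFC3339)
    }

    func main() {
        now := time.Date(2022, 2, 25, 17, 50, 51, 0, time.UTC)
        _, str := nextRotationDate(now, 90) // assumed 90-day rotation interval
        fmt.Println(str)                    // 2022-05-26T17:50:51Z
    }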
@@ -727,7 +735,7 @@ func (c *Cluster) updateSecret(
     // initialize password rotation setting first rotation date
     nextRotationDateStr = string(secret.Data["nextRotation"])
-    if nextRotationDate, err = time.ParseInLocation("2006-01-02 15:04:05", nextRotationDateStr, time.Local); err != nil {
+    if nextRotationDate, err = time.ParseInLocation(time.RFC3339, nextRotationDateStr, currentTime.UTC().Location()); err != nil {
         nextRotationDate, nextRotationDateStr = c.getNextRotationDate(currentTime)
         secret.Data["nextRotation"] = []byte(nextRotationDateStr)
         updateSecret = true
@@ -736,7 +744,7 @@ func (c *Cluster) updateSecret(
 
     // check if next rotation can happen sooner
     // if rotation interval has been decreased
-    currentRotationDate, _ := c.getNextRotationDate(currentTime)
+    currentRotationDate, nextRotationDateStr := c.getNextRotationDate(currentTime)
     if nextRotationDate.After(currentRotationDate) {
         nextRotationDate = currentRotationDate
     }
@@ -756,8 +764,6 @@ func (c *Cluster) updateSecret(
             *retentionUsers = append(*retentionUsers, secretUsername)
         }
         secret.Data["password"] = []byte(util.RandomPassword(constants.PasswordLength))
-
-        _, nextRotationDateStr = c.getNextRotationDate(nextRotationDate)
         secret.Data["nextRotation"] = []byte(nextRotationDateStr)
         updateSecret = true

View File

@@ -270,13 +270,29 @@ func TestUpdateSecret(t *testing.T) {
 
     clusterName := "acid-test-cluster"
     namespace := "default"
-    username := "foo"
+    dbname := "app"
+    dbowner := "appowner"
     secretTemplate := config.StringTemplate("{username}.{cluster}.credentials")
     rotationUsers := make(spec.PgUserMap)
     retentionUsers := make([]string, 0)
-    yesterday := time.Now().AddDate(0, 0, -1)
 
-    // new cluster with pvc storage resize mode and configured labels
+    // define manifest users and enable rotation for dbowner
+    pg := acidv1.Postgresql{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      clusterName,
+            Namespace: namespace,
+        },
+        Spec: acidv1.PostgresSpec{
+            Databases:                      map[string]string{dbname: dbowner},
+            Users:                          map[string]acidv1.UserFlags{"foo": {}, dbowner: {}},
+            UsersWithInPlaceSecretRotation: []string{dbowner},
+            Volume: acidv1.Volume{
+                Size: "1Gi",
+            },
+        },
+    }
+
+    // new cluster with enabled password rotation
     var cluster = New(
         Config{
             OpConfig: config.Config{
@@ -291,44 +307,61 @@ func TestUpdateSecret(t *testing.T) {
                 ClusterNameLabel: "cluster-name",
             },
         },
-    }, client, acidv1.Postgresql{}, logger, eventRecorder)
+    }, client, pg, logger, eventRecorder)
 
     cluster.Name = clusterName
     cluster.Namespace = namespace
     cluster.pgUsers = map[string]spec.PgUser{}
-    cluster.Spec.Users = map[string]acidv1.UserFlags{username: {}}
     cluster.initRobotUsers()
 
-    // create a secret for user foo
+    // create secrets
+    cluster.syncSecrets()
+    // initialize rotation with current time
     cluster.syncSecrets()
-    secret, err := cluster.KubeClient.Secrets(namespace).Get(context.TODO(), secretTemplate.Format("username", username, "cluster", clusterName), metav1.GetOptions{})
-    assert.NoError(t, err)
-    generatedSecret := cluster.Secrets[secret.UID]
 
-    // now update the secret setting next rotation date (yesterday + interval)
-    cluster.updateSecret(username, generatedSecret, &rotationUsers, &retentionUsers, yesterday)
-    updatedSecret, err := cluster.KubeClient.Secrets(namespace).Get(context.TODO(), secretTemplate.Format("username", username, "cluster", clusterName), metav1.GetOptions{})
-    assert.NoError(t, err)
+    dayAfterTomorrow := time.Now().AddDate(0, 0, 2)
 
-    nextRotation := string(updatedSecret.Data["nextRotation"])
-    _, nextRotationDate := cluster.getNextRotationDate(yesterday)
-    if nextRotation != nextRotationDate {
-        t.Errorf("%s: updated secret does not contain correct rotation date: expected %s, got %s", testName, nextRotationDate, nextRotation)
-    }
+    for username := range cluster.Spec.Users {
+        pgUser := cluster.pgUsers[username]
 
-    // update secret again but use current time to trigger rotation
-    cluster.updateSecret(username, generatedSecret, &rotationUsers, &retentionUsers, time.Now())
-    updatedSecret, err = cluster.KubeClient.Secrets(namespace).Get(context.TODO(), secretTemplate.Format("username", username, "cluster", clusterName), metav1.GetOptions{})
-    assert.NoError(t, err)
+        // first, get the secret
+        secret, err := cluster.KubeClient.Secrets(namespace).Get(context.TODO(), secretTemplate.Format("username", username, "cluster", clusterName), metav1.GetOptions{})
+        assert.NoError(t, err)
+        secretPassword := string(secret.Data["password"])
 
-    if len(rotationUsers) != 1 && len(retentionUsers) != 1 {
-        t.Errorf("%s: unexpected number of users to rotate - expected only foo, found %d", testName, len(rotationUsers))
-    }
+        // now update the secret setting a next rotation date (tomorrow + interval)
+        cluster.updateSecret(username, secret, &rotationUsers, &retentionUsers, dayAfterTomorrow)
+        updatedSecret, err := cluster.KubeClient.Secrets(namespace).Get(context.TODO(), secretTemplate.Format("username", username, "cluster", clusterName), metav1.GetOptions{})
+        assert.NoError(t, err)
 
-    secretUsername := string(updatedSecret.Data["username"])
-    rotatedUsername := username + time.Now().Format("060102")
-    if secretUsername != rotatedUsername {
-        t.Errorf("%s: updated secret does not contain correct username: expected %s, got %s", testName, rotatedUsername, secretUsername)
+        // check that passwords are different
+        rotatedPassword := string(updatedSecret.Data["password"])
+        if secretPassword == rotatedPassword {
+            t.Errorf("%s: password unchanged in updated secret for %s", testName, username)
+        }
+
+        // check that next rotation date is tomorrow + interval, not date in secret + interval
+        nextRotation := string(updatedSecret.Data["nextRotation"])
+        _, nextRotationDate := cluster.getNextRotationDate(dayAfterTomorrow)
+        if nextRotation != nextRotationDate {
+            t.Errorf("%s: updated secret of %s does not contain correct rotation date: expected %s, got %s", testName, username, nextRotationDate, nextRotation)
+        }
+
+        // compare username, when it's dbowner they should be equal because of UsersWithInPlaceSecretRotation
+        secretUsername := string(updatedSecret.Data["username"])
+        if pgUser.IsDbOwner {
+            if secretUsername != username {
+                t.Errorf("%s: username differs in updated secret: expected %s, got %s", testName, username, secretUsername)
+            }
+        } else {
+            rotatedUsername := username + dayAfterTomorrow.Format("060102")
+            if secretUsername != rotatedUsername {
+                t.Errorf("%s: updated secret does not contain correct username: expected %s, got %s", testName, rotatedUsername, secretUsername)
+            }
+            if len(rotationUsers) != 1 || len(retentionUsers) != 1 {
+                t.Errorf("%s: unexpected number of users to rotate - expected only %s, found %d", testName, username, len(rotationUsers))
+            }
+        }
     }
 }

View File

@@ -98,7 +98,7 @@ COST_MEMORY = 30.5 * 24 * float(getenv('COST_MEMORY', 0.014375))  # Memory GB m5
 
 WALE_S3_ENDPOINT = getenv(
     'WALE_S3_ENDPOINT',
-    'https+path://s3-eu-central-1.amazonaws.com:443',
+    'https+path://s3.eu-central-1.amazonaws.com:443',
 )
 
 USE_AWS_INSTANCE_PROFILE = (