operator updates Postgres config and creates replication user

Felix Kunde 2021-08-10 11:26:23 +02:00
parent 2fadb740fa
commit eb7050571d
9 changed files with 192 additions and 137 deletions

View File

@ -470,6 +470,36 @@ spec:
properties:
s3_wal_path:
type: string
streams:
type: array
nullable: true
items:
type: object
required:
- streamType
properties:
batchSize:
type: integer
database:
type: string
filter:
type: object
additionalProperties:
type: string
queueName:
type: string
sqsArn:
type: string
tables:
type: object
additionalProperties:
type: string
streamType:
type: string
enum:
- "nakadi"
- "sqs"
- "wal"
teamId:
type: string
tls:

View File

@ -466,6 +466,36 @@ spec:
properties:
s3_wal_path:
type: string
streams:
type: array
nullable: true
items:
type: object
required:
- streamType
properties:
batchSize:
type: integer
database:
type: string
filter:
type: object
additionalProperties:
type: string
queueName:
type: string
sqsArn:
type: string
tables:
type: object
additionalProperties:
type: string
streamType:
type: string
enum:
- "nakadi"
- "sqs"
- "wal"
teamId:
type: string
tls:

View File

@ -662,8 +662,11 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
Items: &apiextv1.JSONSchemaPropsOrArray{
Schema: &apiextv1.JSONSchemaProps{
Type: "object",
Required: []string{"type"},
Required: []string{"streamType"},
Properties: map[string]apiextv1.JSONSchemaProps{
"batchSize": {
Type: "integer",
},
"database": {
Type: "string",
},
@ -671,10 +674,13 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
Type: "object",
AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{
Schema: &apiextv1.JSONSchemaProps{
Type: "string",
Type: "string",
},
},
},
"queueName": {
Type: "string",
},
"sqsArn": {
Type: "string",
},
@ -682,11 +688,11 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
Type: "object",
AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{
Schema: &apiextv1.JSONSchemaProps{
Type: "string",
Type: "string",
},
},
},
"type": {
"streamType": {
Type: "string",
Enum: []apiextv1.JSON{
{

View File

@ -227,12 +227,11 @@ type ConnectionPooler struct {
}
type Stream struct {
Type string `json:"type"`
Database string `json:"database,omitempty"`
Tables map[string]string `json:"tables,omitempty"`
Filter map[string]string `json:"filter,omitempty"`
BatchSize uint32 `json:"batchSize,omitempty"`
SqsArn string `json:"sqsArn,omitempty"`
QueueName string `json:"queueName,omitempty"`
User string `json:"user,omitempty"`
StreamType string `json:"streamType"`
Database string `json:"database,omitempty"`
Tables map[string]string `json:"tables,omitempty"`
Filter map[string]string `json:"filter,omitempty"`
BatchSize uint32 `json:"batchSize,omitempty"`
SqsArn string `json:"sqsArn,omitempty"`
QueueName string `json:"queueName,omitempty"`
}
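For orientation, here is a minimal sketch of a streams entry as it would be decoded with the renamed field. The values mirror the test fixture further down and are otherwise made up; note that the per-stream User field is gone, since the operator now provisions one shared replication user for all streams.

package main

import (
	"fmt"

	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
)

func main() {
	// Hypothetical manifest entry after this change: `type` becomes `streamType`
	// and the per-stream `user` field is dropped.
	stream := acidv1.Stream{
		StreamType: "nakadi",
		Database:   "foo",
		Tables:     map[string]string{"bar": "stream_type_a"},
		BatchSize:  uint32(100),
	}
	fmt.Printf("%+v\n", stream)
}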

View File

@ -362,7 +362,7 @@ func (c *Cluster) Create() error {
c.createConnectionPooler(c.installLookupFunction)
if len(c.Spec.Streams) > 0 {
c.createStreams()
c.syncStreams()
}
return nil
@ -1052,6 +1052,23 @@ func (c *Cluster) initSystemUsers() {
c.systemUsers[constants.ConnectionPoolerUserKeyName] = connectionPoolerUser
}
}
// replication users for event streams are another exception
// the operator will create one replication user for all streams
if len(c.Spec.Streams) > 0 {
username := constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix
streamUser := spec.PgUser{
Origin: spec.RoleConnectionPooler,
Name: username,
Namespace: c.Namespace,
Flags: []string{constants.RoleFlagLogin, constants.RoleFlagReplication},
Password: util.RandomPassword(constants.PasswordLength),
}
if _, exists := c.pgUsers[username]; !exists {
c.pgUsers[username] = streamUser
}
}
}
func (c *Cluster) initPreparedDatabaseRoles() error {
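As a rough sketch of what the new initSystemUsers branch amounts to on the database side: the shared stream user is created with the LOGIN and REPLICATION flags. The concrete role name and the credentials-secret convention mentioned below are assumptions for illustration, not something this diff pins down.

package main

import "fmt"

func main() {
	// Assumed expansion of EventStreamSourceSlotPrefix + UserRoleNameSuffix;
	// check util/constants for the actual values.
	username := "fes_user"
	// The operator's user sync would end up issuing roughly this statement,
	// with a random password of constants.PasswordLength characters that also
	// lands in the operator's usual {username}.{cluster}.credentials... secret.
	fmt.Printf("CREATE ROLE %s LOGIN REPLICATION PASSWORD '<random>';\n", username)
}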

View File

@ -8,6 +8,7 @@ import (
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
zalandov1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
@ -19,13 +20,8 @@ var outboxTableNameTemplate config.StringTemplate = "{table}_{eventtype}_outbox"
func (c *Cluster) createStreams() error {
c.setProcessName("creating streams")
err := c.syncLogicalDecoding()
if err != nil {
return fmt.Errorf("logical decoding setup incomplete: %v", err)
}
fes := c.generateFabricEventStream()
_, err = c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{})
_, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("could not create event stream custom resource: %v", err)
}
@ -33,27 +29,61 @@ func (c *Cluster) createStreams() error {
return nil
}
func (c *Cluster) syncLogicalDecoding() error {
func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStream) error {
c.setProcessName("updating event streams")
walLevel := c.Spec.PostgresqlParam.Parameters["wal_level"]
if walLevel == "" || walLevel != "logical" {
c.logger.Debugf("setting wal level to 'logical' in postgres configuration")
pods, err := c.listPods()
if err != nil || len(pods) == 0 {
return err
}
for _, pod := range pods {
if err := c.patroni.SetPostgresParameters(&pod, map[string]string{"wal_level": "logical"}); err == nil {
return fmt.Errorf("could not set wal_level to 'logical' calling Patroni REST API: %v", err)
}
}
_, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("could not update event stream custom resource: %v", err)
}
return nil
}
func (c *Cluster) syncPostgresConfig() error {
desiredPostgresConfig := make(map[string]interface{})
slots := make(map[string]map[string]string)
c.logger.Debugf("setting wal level to 'logical' in postgres configuration")
desiredPostgresConfig["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: map[string]string{"wal_level": "logical"}}
for _, stream := range c.Spec.Streams {
slotName := c.getLogicalReplicationSlot(stream.Database)
if slotName == "" {
c.logger.Debugf("creating logical replication slot %d in database %d", constants.EventStreamSourceSlotPrefix+stream.Database, stream.Database)
c.logger.Debugf("creating logical replication slot %q in database %q", constants.EventStreamSourceSlotPrefix+stream.Database, stream.Database)
slot := map[string]string{
"database": stream.Database,
"plugin": "wal2json",
"type": "logical",
}
slots[constants.EventStreamSourceSlotPrefix+stream.Database] = slot
}
}
if len(slots) > 0 {
desiredPostgresConfig["slots"] = slots
} else {
return nil
}
pods, err := c.listPods()
if err != nil || len(pods) == 0 {
return err
}
for i, pod := range pods {
podName := util.NameFromMeta(pods[i].ObjectMeta)
effectivePostgresConfig, err := c.patroni.GetConfig(&pod)
if err != nil {
c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
continue
}
_, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePostgresConfig, desiredPostgresConfig)
if err != nil {
c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
continue
}
}
@ -64,18 +94,18 @@ func (c *Cluster) syncStreamDbResources() error {
for _, stream := range c.Spec.Streams {
if err := c.initDbConnWithName(stream.Database); err != nil {
return fmt.Errorf("could not init connection to database %s specified for event stream: %v", stream.Database, err)
return fmt.Errorf("could not init connection to database %q specified for event stream: %v", stream.Database, err)
}
for table, eventType := range stream.Tables {
tableName, schemaName := getTableSchema(table)
if exists, err := c.tableExists(tableName, schemaName); !exists {
return fmt.Errorf("could not find table %s specified for event stream: %v", table, err)
return fmt.Errorf("could not find table %q specified for event stream: %v", table, err)
}
// check if outbox table exists and if not, create it
outboxTable := outboxTableNameTemplate.Format("table", tableName, "eventtype", eventType)
if exists, err := c.tableExists(outboxTable, schemaName); !exists {
return fmt.Errorf("could not find outbox table %s specified for event stream: %v", outboxTable, err)
return fmt.Errorf("could not find outbox table %q specified for event stream: %v", outboxTable, err)
}
}
}
@ -120,12 +150,12 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType st
Schema: schema,
EventStreamTable: getOutboxTable(table, eventType),
Filter: streamFilter,
Connection: c.getStreamConnection(stream.Database, stream.User),
Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix),
}
}
func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow {
switch stream.Type {
switch stream.StreamType {
case "nakadi":
return zalandov1alpha1.EventStreamFlow{
Type: constants.EventStreamFlowPgNakadiType,
@ -144,7 +174,7 @@ func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow {
}
func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1alpha1.EventStreamSink {
switch stream.Type {
switch stream.StreamType {
case "nakadi":
return zalandov1alpha1.EventStreamSink{
Type: constants.EventStreamSinkNakadiType,
@ -204,6 +234,11 @@ func (c *Cluster) syncStreams() error {
c.setProcessName("syncing streams")
err := c.syncPostgresConfig()
if err != nil {
return fmt.Errorf("logical decoding setup incomplete: %v", err)
}
effectiveStreams, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Get(context.TODO(), c.Name+constants.FESsuffix, metav1.GetOptions{})
if err != nil {
if !k8sutil.ResourceNotFound(err) {
@ -216,7 +251,10 @@ func (c *Cluster) syncStreams() error {
return fmt.Errorf("could not create missing streams: %v", err)
}
} else {
c.syncStreamDbResources()
err := c.syncStreamDbResources()
if err != nil {
return fmt.Errorf("database setup incomplete: %v", err)
}
desiredStreams := c.generateFabricEventStream()
if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) {
c.updateStreams(desiredStreams)
@ -225,14 +263,3 @@ func (c *Cluster) syncStreams() error {
return nil
}
func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStream) error {
c.setProcessName("updating event streams")
_, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("could not update event stream custom resource: %v", err)
}
return nil
}
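To make the new syncPostgresConfig path concrete, here is a sketch of the structure it assembles and hands to checkAndSetGlobalPostgreSQLConfiguration for a manifest with one stream on database "foo". It assumes patroniPGParametersParameterName resolves to "parameters"; the database name is invented.

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Desired Patroni config for one stream on database "foo":
	// wal_level is forced to logical and a wal2json slot is requested.
	desired := map[string]interface{}{
		"postgresql": map[string]interface{}{
			"parameters": map[string]string{"wal_level": "logical"},
		},
		"slots": map[string]map[string]string{
			"fes_foo": {
				"database": "foo",
				"plugin":   "wal2json",
				"type":     "logical",
			},
		},
	}
	// checkAndSetGlobalPostgreSQLConfiguration would marshal and PATCH this
	// against Patroni's config endpoint on one of the pods.
	patch, _ := json.Marshal(desired)
	fmt.Println(string(patch))
}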

View File

@ -23,6 +23,7 @@ func newFakeK8sStreamClient() (k8sutil.KubernetesClient, *fake.Clientset) {
return k8sutil.KubernetesClient{
FabricEventStreamsGetter: zalandoClientSet.ZalandoV1alpha1(),
PodsGetter: clientSet.CoreV1(),
}, clientSet
}
@ -40,50 +41,32 @@ func TestGenerateFabricEventStream(t *testing.T) {
Databases: map[string]string{
"foo": "foo_user",
},
Patroni: acidv1.Patroni{
Slots: map[string]map[string]string{
"fes": {
"type": "logical",
"database": "foo",
"plugin": "wal2json",
},
},
},
PostgresqlParam: acidv1.PostgresqlParam{
Parameters: map[string]string{
"wal_level": "logical",
},
},
Streams: []acidv1.Stream{
{
Type: "nakadi",
Database: "foo",
StreamType: "nakadi",
Database: "foo",
Tables: map[string]string{
"bar": "stream_type_a",
},
BatchSize: uint32(100),
User: "foo_user",
},
{
Type: "wal",
Database: "foo",
StreamType: "wal",
Database: "foo",
Tables: map[string]string{
"bar": "stream_type_a",
},
BatchSize: uint32(100),
User: "zalando",
},
{
Type: "sqs",
Database: "foo",
SqsArn: "arn:aws:sqs:eu-central-1:111122223333",
QueueName: "foo-queue",
User: "foo_user",
StreamType: "sqs",
Database: "foo",
SqsArn: "arn:aws:sqs:eu-central-1:111122223333",
QueueName: "foo-queue",
},
},
Users: map[string]acidv1.UserFlags{
"foo_user": {},
"zalando": {},
"foo_user": []string{"replication"},
},
Volume: acidv1.Volume{
Size: "1Gi",

View File

@ -265,10 +265,11 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
func (c *Cluster) syncStatefulSet() error {
var (
masterPod *v1.Pod
postgresConfig map[string]interface{}
effectivePostgresConfig map[string]interface{}
instanceRestartRequired bool
)
desiredPostgresConfig := make(map[string]interface{})
podsToRecreate := make([]v1.Pod, 0)
switchoverCandidates := make([]spec.NamespacedName, 0)
@ -394,14 +395,25 @@ func (c *Cluster) syncStatefulSet() error {
// get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs
// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used.
desiredPostgresConfig["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: c.Spec.Parameters}
desiredPostgresConfig["loop_wait"] = c.Spec.Patroni.LoopWait
desiredPostgresConfig["maximum_lag_on_failover"] = c.Spec.Patroni.MaximumLagOnFailover
desiredPostgresConfig["pg_hba"] = c.Spec.Patroni.PgHba
desiredPostgresConfig["retry_timeout"] = c.Spec.Patroni.RetryTimeout
desiredPostgresConfig["slots"] = c.Spec.Patroni.Slots
desiredPostgresConfig["synchronous_mode"] = c.Spec.Patroni.SynchronousMode
desiredPostgresConfig["synchronous_mode_strict"] = c.Spec.Patroni.SynchronousModeStrict
desiredPostgresConfig["ttl"] = c.Spec.Patroni.TTL
for i, pod := range pods {
podName := util.NameFromMeta(pods[i].ObjectMeta)
config, err := c.patroni.GetConfig(&pod)
effectivePostgresConfig, err := c.patroni.GetConfig(&pod)
if err != nil {
c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
continue
}
instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, config)
instanceRestartRequired, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePostgresConfig, desiredPostgresConfig)
if err != nil {
c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
continue
@ -412,7 +424,7 @@ func (c *Cluster) syncStatefulSet() error {
// if the config update requires a restart, call Patroni restart for replicas first, then master
if instanceRestartRequired {
c.logger.Debug("restarting Postgres server within pods")
ttl, ok := postgresConfig["ttl"].(int32)
ttl, ok := effectivePostgresConfig["ttl"].(int32)
if !ok {
ttl = 30
}
@ -493,62 +505,13 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri
// checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
// (like max_connections) have changed and, if necessary, sets them via the Patroni API
func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig map[string]interface{}) (bool, error) {
configToSet := make(map[string]interface{})
parametersToSet := make(map[string]string)
effectivePgParameters := make(map[string]interface{})
func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectivePatroniConfig, desiredPatroniConfig map[string]interface{}) (bool, error) {
// read effective Patroni config if set
if patroniConfig != nil {
effectivePostgresql := patroniConfig["postgresql"].(map[string]interface{})
effectivePgParameters = effectivePostgresql[patroniPGParametersParameterName].(map[string]interface{})
}
// compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest
desiredPgParameters := c.Spec.Parameters
for desiredOption, desiredValue := range desiredPgParameters {
effectiveValue := effectivePgParameters[desiredOption]
if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) {
parametersToSet[desiredOption] = desiredValue
}
}
if len(parametersToSet) > 0 {
configToSet["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: parametersToSet}
}
// compare other options from config with c.Spec.Patroni from manifest
desiredPatroniConfig := c.Spec.Patroni
if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != uint32(patroniConfig["loop_wait"].(float64)) {
configToSet["loop_wait"] = desiredPatroniConfig.LoopWait
}
if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != float32(patroniConfig["maximum_lag_on_failover"].(float64)) {
configToSet["maximum_lag_on_failover"] = desiredPatroniConfig.MaximumLagOnFailover
}
if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, (patroniConfig["pg_hba"])) {
configToSet["pg_hba"] = desiredPatroniConfig.PgHba
}
if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != uint32(patroniConfig["retry_timeout"].(float64)) {
configToSet["retry_timeout"] = desiredPatroniConfig.RetryTimeout
}
if desiredPatroniConfig.Slots != nil && !reflect.DeepEqual(desiredPatroniConfig.Slots, patroniConfig["slots"]) {
configToSet["slots"] = desiredPatroniConfig.Slots
}
if desiredPatroniConfig.SynchronousMode != patroniConfig["synchronous_mode"] {
configToSet["synchronous_mode"] = desiredPatroniConfig.SynchronousMode
}
if desiredPatroniConfig.SynchronousModeStrict != patroniConfig["synchronous_mode_strict"] {
configToSet["synchronous_mode_strict"] = desiredPatroniConfig.SynchronousModeStrict
}
if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != uint32(patroniConfig["ttl"].(float64)) {
configToSet["ttl"] = desiredPatroniConfig.TTL
}
if len(configToSet) == 0 {
if reflect.DeepEqual(effectivePatroniConfig, desiredPatroniConfig) {
return false, nil
}
configToSetJson, err := json.Marshal(configToSet)
configToSetJson, err := json.Marshal(desiredPatroniConfig)
if err != nil {
c.logger.Debugf("could not convert config patch to JSON: %v", err)
}
@ -558,8 +521,8 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC
podName := util.NameFromMeta(pod.ObjectMeta)
c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s",
podName, configToSetJson)
if err = c.patroni.SetConfig(pod, configToSet); err != nil {
return true, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err)
if err = c.patroni.SetConfig(pod, desiredPatroniConfig); err != nil {
return true, fmt.Errorf("could not patch postgres parameters within pod %s: %v", podName, err)
}
return true, nil
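A compact sketch of the refactored flow in syncStatefulSet: the desired Patroni config is now built up front from the manifest and compared as a whole against what GetConfig returned from a pod. Only the comparison and the marshalling are shown here, with invented values.

package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

func main() {
	// Desired config assembled from the manifest (values invented).
	desired := map[string]interface{}{
		"postgresql":    map[string]interface{}{"parameters": map[string]string{"max_connections": "100"}},
		"loop_wait":     uint32(10),
		"retry_timeout": uint32(10),
		"ttl":           uint32(30),
	}
	// Effective config as returned by patroni.GetConfig (pretend it is empty).
	effective := map[string]interface{}{}
	if reflect.DeepEqual(effective, desired) {
		fmt.Println("config in sync, nothing to patch")
		return
	}
	// Otherwise the whole desired map is sent to Patroni via SetConfig.
	patch, _ := json.Marshal(desired)
	fmt.Printf("PATCH /config with: %s\n", patch)
}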

View File

@ -4,7 +4,7 @@ package constants
const (
FESsuffix = "-event-streams"
EventStreamSourcePGType = "PostgresLogicalReplication"
EventStreamSourceSlotPrefix = "fes"
EventStreamSourceSlotPrefix = "fes_"
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent"
EventStreamFlowPgApiType = "PostgresWalToApiCallHomeEvent"
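For reference, the names these constants produce elsewhere in the diff, sketched for a hypothetical cluster acid-minimal-cluster streaming from database foo.

package main

import "fmt"

func main() {
	const eventStreamSourceSlotPrefix = "fes_" // as changed in this commit
	const fesSuffix = "-event-streams"
	fmt.Println(eventStreamSourceSlotPrefix + "foo") // replication slot requested by syncPostgresConfig
	fmt.Println("acid-minimal-cluster" + fesSuffix)  // name of the FabricEventStream resource
}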