reflect code review and additional refactoring

This commit is contained in:
Felix Kunde 2022-02-21 17:01:58 +01:00
parent 47aa7b1c64
commit 9007475ed8
4 changed files with 150 additions and 128 deletions

View File

@ -536,11 +536,18 @@ Those parameters are grouped under the `tls` top-level key.
## Change data capture streams
This sections enables change data capture (CDC) streams e.g. into Zalandos
distributed event broker [Nakadi](https://nakadi.io/). Parameters grouped
under the `streams` top-level key will be used by the operator to create
custom resources for Zalando's internal CDC operator. Each stream object can
have the following properties:
This sections enables change data capture (CDC) streams via Postgres'
[logical decoding](https://www.postgresql.org/docs/14/logicaldecoding.html)
feature and `pgoutput` plugin. While the Postgres operator takes responsibility
for providing the setup to publish change events, it relies on external tools
to consume them. At Zalando, we are using a workflow based on
[Debezium Connector](https://debezium.io/documentation/reference/stable/connectors/postgresql.html)
which can feed streams into Zalandos distributed event broker [Nakadi](https://nakadi.io/)
among others.
The Postgres Operator creates custom resources for Zalando's internal CDC
operator which will be used to set up the consumer part. Each stream object
can have the following properties:
* **applicationId**
The application name to which the database and CDC belongs to. For each

View File

@ -15,32 +15,22 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func (c *Cluster) createStreams(appId string) {
func (c *Cluster) createStreams(appId string) error {
c.setProcessName("creating streams")
var (
fes *zalandov1.FabricEventStream
err error
)
msg := "could not create event stream custom resource with applicationId %s: %v"
fes = c.generateFabricEventStream(appId)
if err != nil {
c.logger.Warningf(msg, appId, err)
}
_, err = c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{})
if err != nil {
c.logger.Warningf(msg, appId, err)
fes := c.generateFabricEventStream(appId)
if _, err := c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}); err != nil {
return err
}
return nil
}
func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) error {
c.setProcessName("updating event streams")
_, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("could not update event stream custom resource: %v", err)
if _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}); err != nil {
return err
}
return nil
@ -50,7 +40,7 @@ func (c *Cluster) deleteStreams() error {
c.setProcessName("deleting event streams")
// check if stream CRD is installed before trying a delete
_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{})
_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{})
if k8sutil.ResourceNotFound(err) {
return nil
}
@ -74,9 +64,39 @@ func gatherApplicationIds(streams []acidv1.Stream) []string {
return appIds
}
func (c *Cluster) syncPostgresConfig() error {
slots := make(map[string]map[string]string)
publications := make(map[string]map[string]acidv1.StreamTable)
func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) error {
errorMsg := "no pods found to update config"
// if streams are defined wal_level must be switched to logical
requiredPgParameters := map[string]string{"wal_level": "logical"}
// apply config changes in pods
pods, err := c.listPods()
if err != nil {
errorMsg = fmt.Sprintf("could not list pods of the statefulset: %v", err)
}
for i, pod := range pods {
podName := util.NameFromMeta(pods[i].ObjectMeta)
effectivePatroniConfig, effectivePgParameters, err := c.patroni.GetConfig(&pod)
if err != nil {
errorMsg = fmt.Sprintf("could not get Postgres config from pod %s: %v", podName, err)
continue
}
_, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters)
if err != nil {
errorMsg = fmt.Sprintf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
continue
}
// Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used
return nil
}
return fmt.Errorf(errorMsg)
}
func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error {
createPublications := make(map[string]string)
alterPublications := make(map[string]string)
@ -86,106 +106,45 @@ func (c *Cluster) syncPostgresConfig() error {
}
}()
desiredPatroniConfig := c.Spec.Patroni
if len(desiredPatroniConfig.Slots) > 0 {
slots = desiredPatroniConfig.Slots
// check for existing publications
if err := c.initDbConnWithName(dbName); err != nil {
return fmt.Errorf("could not init database connection")
}
// define extra logical slots for Patroni config
for _, stream := range c.Spec.Streams {
slot := map[string]string{
"database": stream.Database,
"plugin": constants.EventStreamSourcePluginType,
"type": "logical",
}
slotName := getSlotName(stream.Database, stream.ApplicationId)
if _, exists := slots[slotName]; !exists {
slots[slotName] = slot
publications[slotName] = stream.Tables
} else {
streamTables := publications[slotName]
for tableName, table := range stream.Tables {
if _, exists := streamTables[tableName]; !exists {
streamTables[tableName] = table
}
}
publications[slotName] = streamTables
}
currentPublications, err := c.getPublications()
if err != nil {
return fmt.Errorf("could not get current publications: %v", err)
}
if len(slots) > 0 {
desiredPatroniConfig.Slots = slots
} else {
tableNames := make([]string, len(tables))
i := 0
for t := range tables {
tableName, schemaName := getTableSchema(t)
tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
i++
}
sort.Strings(tableNames)
tableList := strings.Join(tableNames, ", ")
currentTables, exists := currentPublications[publication]
if !exists {
createPublications[publication] = tableList
} else if currentTables != tableList {
alterPublications[publication] = tableList
}
if len(createPublications)+len(alterPublications) == 0 {
return nil
}
// if streams are defined wal_level must be switched to logical
desiredPgParameters := map[string]string{"wal_level": "logical"}
// apply config changes in pods
pods, err := c.listPods()
if err != nil || len(pods) == 0 {
c.logger.Warningf("could not list pods of the statefulset: %v", err)
}
for i, pod := range pods {
podName := util.NameFromMeta(pods[i].ObjectMeta)
effectivePatroniConfig, effectivePgParameters, err := c.patroni.GetConfig(&pod)
if err != nil {
c.logger.Warningf("could not get Postgres config from pod %s: %v", podName, err)
continue
}
_, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, desiredPatroniConfig, effectivePgParameters, desiredPgParameters)
if err != nil {
c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", podName, err)
continue
for publicationName, tables := range createPublications {
if err = c.executeCreatePublication(publicationName, tables); err != nil {
return fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
}
}
// next, create publications to each created slot
c.logger.Debug("syncing database publications")
for publication, tables := range publications {
// but first check for existing publications
dbName := slots[publication]["database"]
if err := c.initDbConnWithName(dbName); err != nil {
return fmt.Errorf("could not init database connection")
}
currentPublications, err := c.getPublications()
if err != nil {
return fmt.Errorf("could not get current publications: %v", err)
}
tableNames := make([]string, len(tables))
i := 0
for t := range tables {
tableName, schemaName := getTableSchema(t)
tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
i++
}
sort.Strings(tableNames)
tableList := strings.Join(tableNames, ", ")
currentTables, exists := currentPublications[publication]
if !exists {
createPublications[publication] = tableList
} else if currentTables != tableList {
alterPublications[publication] = tableList
}
if len(createPublications)+len(alterPublications) == 0 {
return nil
}
for publicationName, tables := range createPublications {
if err = c.executeCreatePublication(publicationName, tables); err != nil {
c.logger.Warningf("%v", err)
}
}
for publicationName, tables := range alterPublications {
if err = c.executeAlterPublication(publicationName, tables); err != nil {
c.logger.Warningf("%v", err)
}
for publicationName, tables := range alterPublications {
if err = c.executeAlterPublication(publicationName, tables); err != nil {
return fmt.Errorf("update of publication %q failed: %v", publicationName, err)
}
}
@ -213,8 +172,8 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
return &zalandov1.FabricEventStream{
TypeMeta: metav1.TypeMeta{
Kind: constants.EventStreamSourceCRDKind,
APIVersion: "zalando.org/v1",
APIVersion: constants.EventStreamCRDApiVersion,
Kind: constants.EventStreamCRDKind,
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", c.Name, appId),
@ -300,15 +259,65 @@ func (c *Cluster) syncStreams() error {
c.setProcessName("syncing streams")
_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{})
_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{})
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("event stream CRD not installed, skipping")
return nil
}
err = c.syncPostgresConfig()
slots := make(map[string]map[string]string)
publications := make(map[string]map[string]acidv1.StreamTable)
requiredPatroniConfig := c.Spec.Patroni
if len(requiredPatroniConfig.Slots) > 0 {
slots = requiredPatroniConfig.Slots
}
// gather list of required slots and publications
for _, stream := range c.Spec.Streams {
slot := map[string]string{
"database": stream.Database,
"plugin": constants.EventStreamSourcePluginType,
"type": "logical",
}
slotName := getSlotName(stream.Database, stream.ApplicationId)
if _, exists := slots[slotName]; !exists {
slots[slotName] = slot
publications[slotName] = stream.Tables
} else {
streamTables := publications[slotName]
for tableName, table := range stream.Tables {
if _, exists := streamTables[tableName]; !exists {
streamTables[tableName] = table
}
}
publications[slotName] = streamTables
}
}
// no slots = no streams defined
if len(slots) > 0 {
requiredPatroniConfig.Slots = slots
} else {
return nil
}
// add extra logical slots to Patroni config
c.logger.Debug("syncing Postgres config for logical decoding")
err = c.syncPostgresConfig(requiredPatroniConfig)
if err != nil {
return fmt.Errorf("could not update Postgres config for event streaming: %v", err)
return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err)
}
// next, create publications to each created slot
c.logger.Debug("syncing database publications")
for publication, tables := range publications {
// but first check for existing publications
dbName := slots[publication]["database"]
err = c.syncPublication(publication, dbName, tables)
if err != nil {
c.logger.Warningf("could not sync publication %q in database %d: %v", publication, dbName, err)
}
}
err = c.createOrUpdateStreams()
@ -331,7 +340,11 @@ func (c *Cluster) createOrUpdateStreams() error {
}
c.logger.Infof("event streams do not exist, create it")
c.createStreams(appId)
err = c.createStreams(appId)
if err != nil {
return fmt.Errorf("failed creating event stream %s: %v", fesName, err)
}
c.logger.Infof("event stream %q has been successfully created", fesName)
} else {
desiredStreams := c.generateFabricEventStream(appId)
if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) {
@ -341,6 +354,7 @@ func (c *Cluster) createOrUpdateStreams() error {
if err != nil {
return fmt.Errorf("failed updating event stream %s: %v", fesName, err)
}
c.logger.Infof("event stream %q has been successfully updated", fesName)
}
}
}

View File

@ -82,8 +82,8 @@ var (
fes = &v1.FabricEventStream{
TypeMeta: metav1.TypeMeta{
Kind: "FabricEventStream",
APIVersion: "zalando.org/v1",
APIVersion: constants.EventStreamCRDApiVersion,
Kind: constants.EventStreamCRDKind,
},
ObjectMeta: metav1.ObjectMeta{
Name: fesName,

View File

@ -2,8 +2,9 @@ package constants
// PostgreSQL specific constants
const (
EventStreamSourceCRDKind = "FabricEventStream"
EventStreamSourceCRDName = "fabriceventstreams.zalando.org"
EventStreamCRDApiVersion = "zalando.org/v1"
EventStreamCRDKind = "FabricEventStream"
EventStreamCRDName = "fabriceventstreams.zalando.org"
EventStreamSourcePGType = "PostgresLogicalReplication"
EventStreamSourceSlotPrefix = "fes"
EventStreamSourcePluginType = "pgoutput"