check manifest settings for logical decoding before creating streams

This commit is contained in:
Felix Kunde 2021-07-29 14:29:42 +02:00
parent 1bd12961fd
commit 11f3715ed1
3 changed files with 57 additions and 4 deletions

View File

@ -19,8 +19,13 @@ var outboxTableNameTemplate config.StringTemplate = "{table}_{eventtype}_outbox"
func (c *Cluster) createStreams() error {
c.setProcessName("creating streams")
logicalDecodingEnabled, err := c.logicalDecodingEnabled()
if !logicalDecodingEnabled || err != nil {
return fmt.Errorf("logical decoding setup incomplete: %v", err)
}
fes := c.generateFabricEventStream()
_, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{})
_, err = c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("could not create event stream custom resource: %v", err)
}
@ -28,6 +33,29 @@ func (c *Cluster) createStreams() error {
return nil
}
func (c *Cluster) logicalDecodingEnabled() (bool, error) {
var errors []string
walLevel := c.Spec.PostgresqlParam.Parameters["wal_level"]
if walLevel == "" || walLevel != "logical" {
errors = append(errors, "setting 'wal_level: logical' missing under spec.postgresql.parameters")
}
for _, stream := range c.Spec.Streams {
slotName := c.getLogicalReplicationSlot(stream.Database)
if slotName == "" {
errors = append(errors, fmt.Sprintf("no logical replication slot defined under spec.patroni.slots for database %q", stream.Database))
}
}
if len(errors) > 0 {
return false, fmt.Errorf("logical decoding setup incomplete: %v", errors)
}
return true, nil
}
func (c *Cluster) syncStreamDbResources() error {
for _, stream := range c.Spec.Streams {
@ -148,7 +176,7 @@ func getOutboxTable(tableName, eventType string) zalandov1alpha1.EventStreamTabl
func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Connection {
return zalandov1alpha1.Connection{
Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user),
SlotName: constants.EventStreamSourceSlotName,
SlotName: c.getLogicalReplicationSlot(database),
DBAuth: zalandov1alpha1.DBAuth{
Type: constants.EventStreamSourceAuthType,
Name: c.credentialSecretNameForCluster(user, c.ClusterName),
@ -158,6 +186,16 @@ func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Con
}
}
func (c *Cluster) getLogicalReplicationSlot(database string) string {
for slotName, slot := range c.Spec.Patroni.Slots {
if strings.HasPrefix(slotName, constants.EventStreamSourceSlotPrefix) && slot["type"] == "logical" && slot["database"] == database {
return slotName
}
}
return ""
}
func (c *Cluster) syncStreams() error {
c.setProcessName("syncing streams")

View File

@ -40,6 +40,20 @@ func TestGenerateFabricEventStream(t *testing.T) {
Databases: map[string]string{
"foo": "foo_user",
},
Patroni: acidv1.Patroni{
Slots: map[string]map[string]string{
"fes": {
"type": "logical",
"database": "foo",
"plugin": "wal2json",
},
},
},
PostgresqlParam: acidv1.PostgresqlParam{
Parameters: map[string]string{
"wal_level": "logical",
},
},
Streams: []acidv1.Stream{
{
Type: "nakadi",
@ -93,7 +107,8 @@ func TestGenerateFabricEventStream(t *testing.T) {
},
}, client, pg, logger, eventRecorder)
cluster.syncStreams()
err := cluster.syncStreams()
assert.NoError(t, err)
streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), cluster.Name+constants.FESsuffix, metav1.GetOptions{})
assert.NoError(t, err)

View File

@ -4,7 +4,7 @@ package constants
const (
FESsuffix = "-event-streams"
EventStreamSourcePGType = "PostgresLogicalReplication"
EventStreamSourceSlotName = "fes"
EventStreamSourceSlotPrefix = "fes"
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent"
EventStreamFlowPgApiType = "PostgresWalToApiCallHomeEvent"