From a17d63088b7d293fdb062d8eae8f744e72a72000 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 21 Sep 2021 12:31:05 +0200 Subject: [PATCH] remove fields from FES api and fix update --- .../zalando.org/v1alpha1/fabriceventstream.go | 16 ++--- pkg/cluster/cluster.go | 2 +- pkg/cluster/streams.go | 65 ++++++++----------- pkg/util/constants/streams.go | 3 +- 4 files changed, 34 insertions(+), 52 deletions(-) diff --git a/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go b/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go index 36f68ca2f..111179d07 100644 --- a/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go +++ b/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go @@ -40,14 +40,12 @@ type EventStream struct { // EventStreamFlow defines the flow characteristics of the event stream type EventStreamFlow struct { - Type string `json:"type"` - DataTypeColumn string `json:"dataTypeColumn,omitempty"` - DataOpColumn string `json:"dataOpColumn,omitempty"` - MetadataColumn string `json:"metadataColumn,omitempty"` - DataColumn string `json:"dataColumn,omitempty"` - PayloadColumn string `json:"payloadColumn,omitempty"` - CallHomeIdColumn string `json:"callHomeIdColumn,omitempty"` - CallHomeUrl string `json:"callHomeUrl,omitempty"` + Type string `json:"type"` + DataTypeColumn string `json:"dataTypeColumn,omitempty"` + DataOpColumn string `json:"dataOpColumn,omitempty"` + MetadataColumn string `json:"metadataColumn,omitempty"` + DataColumn string `json:"dataColumn,omitempty"` + PayloadColumn string `json:"payloadColumn,omitempty"` } // EventStreamSink defines the target of the event stream @@ -55,8 +53,6 @@ type EventStreamSink struct { Type string `json:"type"` EventType string `json:"eventType,omitempty"` MaxBatchSize uint32 `json:"maxBatchSize,omitempty"` - QueueName string `json:"queueName,omitempty"` - QueueUrl string `json:"queueUrl,omitempty"` } // EventStreamSource defines the source of the event stream and connection for FES operator diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 569ba463c..195bfba27 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1060,7 +1060,7 @@ func (c *Cluster) initSystemUsers() { // replication users for event streams are another exception // the operator will create one replication user for all streams if len(c.Spec.Streams) > 0 { - username := constants.EventStreamSourceSlotPrefix + "user" + username := constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix streamUser := spec.PgUser{ Origin: spec.RoleConnectionPooler, Name: username, diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 5d33fa1c3..e9e3ce66c 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -21,7 +21,7 @@ func (c *Cluster) createStreams() error { c.setProcessName("creating streams") fes := c.generateFabricEventStream() - _, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}) + _, err := c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("could not create event stream custom resource: %v", err) } @@ -32,7 +32,7 @@ func (c *Cluster) createStreams() error { func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStream) error { c.setProcessName("updating event streams") - _, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}) + _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("could not update event stream custom resource: %v", err) } @@ -43,7 +43,7 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStre func (c *Cluster) deleteStreams() error { c.setProcessName("updating event streams") - err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{}) + err := c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{}) if err != nil { return fmt.Errorf("could not delete event stream custom resource: %v", err) } @@ -68,7 +68,7 @@ func (c *Cluster) syncPostgresConfig() error { "plugin": "wal2json", "type": "logical", } - slots[constants.EventStreamSourceSlotPrefix+stream.Database] = slot + slots[constants.EventStreamSourceSlotPrefix+"_"+stream.Database] = slot } } @@ -121,6 +121,10 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream } return &zalandov1alpha1.FabricEventStream{ + TypeMeta: metav1.TypeMeta{ + Kind: "FabricEventStream", + APIVersion: "zalando.org/v1alphav1", + }, ObjectMeta: metav1.ObjectMeta{ Name: c.Name, Namespace: c.Namespace, @@ -135,26 +139,14 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType string) zalandov1alpha1.EventStreamSource { _, schema := getTableSchema(table) - switch stream.StreamType { - case "nakadi": - streamFilter := stream.Filter[table] - return zalandov1alpha1.EventStreamSource{ - Type: constants.EventStreamSourcePGType, - Schema: schema, - EventStreamTable: getOutboxTable(table, eventType), - Filter: streamFilter, - Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), - } - case "default": - return zalandov1alpha1.EventStreamSource{ - Type: constants.EventStreamSourcePGType, - Schema: schema, - EventStreamTable: getSourceTable(table), - Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), - } + streamFilter := stream.Filter[table] + return zalandov1alpha1.EventStreamSource{ + Type: constants.EventStreamSourcePGType, + Schema: schema, + EventStreamTable: getOutboxTable(table, eventType), + Filter: streamFilter, + Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), } - - return zalandov1alpha1.EventStreamSource{} } func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow { @@ -167,7 +159,7 @@ func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow { MetadataColumn: constants.EventStreamFlowMetadataColumn, DataColumn: constants.EventStreamFlowDataColumn, } - case "default": + case "wal": return zalandov1alpha1.EventStreamFlow{ Type: constants.EventStreamFlowPgGenericType, PayloadColumn: constants.EventStreamFlowPayloadColumn, @@ -203,19 +195,13 @@ func getOutboxTable(tableName, eventType string) zalandov1alpha1.EventStreamTabl } } -func getSourceTable(tableName string) zalandov1alpha1.EventStreamTable { - return zalandov1alpha1.EventStreamTable{ - Name: outboxTableNameTemplate.Format("table", tableName), - } -} - func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Connection { return zalandov1alpha1.Connection{ Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user), SlotName: c.getLogicalReplicationSlot(database), DBAuth: zalandov1alpha1.DBAuth{ Type: constants.EventStreamSourceAuthType, - Name: c.credentialSecretNameForCluster(user, c.ClusterName), + Name: c.credentialSecretNameForCluster(user, c.Name), UserKey: "username", PasswordKey: "password", }, @@ -224,12 +210,12 @@ func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Con func (c *Cluster) getLogicalReplicationSlot(database string) string { for slotName, slot := range c.Spec.Patroni.Slots { - if strings.HasPrefix(slotName, constants.EventStreamSourceSlotPrefix) && slot["type"] == "logical" && slot["database"] == database { + if slot["type"] == "logical" && slot["database"] == database { return slotName } } - return "" + return constants.EventStreamSourceSlotPrefix + "_" + database } func (c *Cluster) syncStreams() error { @@ -247,18 +233,19 @@ func (c *Cluster) syncStreams() error { return fmt.Errorf("error during reading of event streams: %v", err) } - c.logger.Infof("event streams do not exist") + c.logger.Infof("event streams do not exist, create it") err := c.createStreams() if err != nil { - return fmt.Errorf("could not create missing streams: %v", err) + return fmt.Errorf("event stream creation failed: %v", err) } } else { - if err != nil { - c.logger.Warnf("database setup might be incomplete : %v", err) - } desiredStreams := c.generateFabricEventStream() if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { - c.updateStreams(desiredStreams) + desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion + err = c.updateStreams(desiredStreams) + if err != nil { + return fmt.Errorf("event stream update failed: %v", err) + } } } diff --git a/pkg/util/constants/streams.go b/pkg/util/constants/streams.go index 1a8028747..fb721d309 100644 --- a/pkg/util/constants/streams.go +++ b/pkg/util/constants/streams.go @@ -3,11 +3,10 @@ package constants // PostgreSQL specific constants const ( EventStreamSourcePGType = "PostgresLogicalReplication" - EventStreamSourceSlotPrefix = "fes_" + EventStreamSourceSlotPrefix = "fes" EventStreamSourceAuthType = "DatabaseAuthenticationSecret" EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent" EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent" - EventStreamFlowPgApiType = "PostgresWalToApiCallHomeEvent" EventStreamFlowDataTypeColumn = "data_type" EventStreamFlowDataOpColumn = "data_op" EventStreamFlowMetadataColumn = "metadata"