remove fields from FES api and fix update

This commit is contained in:
Felix Kunde 2021-09-21 12:31:05 +02:00
parent e4c7e3648f
commit a17d63088b
4 changed files with 34 additions and 52 deletions

View File

@ -46,8 +46,6 @@ type EventStreamFlow struct {
MetadataColumn string `json:"metadataColumn,omitempty"` MetadataColumn string `json:"metadataColumn,omitempty"`
DataColumn string `json:"dataColumn,omitempty"` DataColumn string `json:"dataColumn,omitempty"`
PayloadColumn string `json:"payloadColumn,omitempty"` PayloadColumn string `json:"payloadColumn,omitempty"`
CallHomeIdColumn string `json:"callHomeIdColumn,omitempty"`
CallHomeUrl string `json:"callHomeUrl,omitempty"`
} }
// EventStreamSink defines the target of the event stream // EventStreamSink defines the target of the event stream
@ -55,8 +53,6 @@ type EventStreamSink struct {
Type string `json:"type"` Type string `json:"type"`
EventType string `json:"eventType,omitempty"` EventType string `json:"eventType,omitempty"`
MaxBatchSize uint32 `json:"maxBatchSize,omitempty"` MaxBatchSize uint32 `json:"maxBatchSize,omitempty"`
QueueName string `json:"queueName,omitempty"`
QueueUrl string `json:"queueUrl,omitempty"`
} }
// EventStreamSource defines the source of the event stream and connection for FES operator // EventStreamSource defines the source of the event stream and connection for FES operator

View File

@ -1060,7 +1060,7 @@ func (c *Cluster) initSystemUsers() {
// replication users for event streams are another exception // replication users for event streams are another exception
// the operator will create one replication user for all streams // the operator will create one replication user for all streams
if len(c.Spec.Streams) > 0 { if len(c.Spec.Streams) > 0 {
username := constants.EventStreamSourceSlotPrefix + "user" username := constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix
streamUser := spec.PgUser{ streamUser := spec.PgUser{
Origin: spec.RoleConnectionPooler, Origin: spec.RoleConnectionPooler,
Name: username, Name: username,

View File

@ -21,7 +21,7 @@ func (c *Cluster) createStreams() error {
c.setProcessName("creating streams") c.setProcessName("creating streams")
fes := c.generateFabricEventStream() fes := c.generateFabricEventStream()
_, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}) _, err := c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{})
if err != nil { if err != nil {
return fmt.Errorf("could not create event stream custom resource: %v", err) return fmt.Errorf("could not create event stream custom resource: %v", err)
} }
@ -32,7 +32,7 @@ func (c *Cluster) createStreams() error {
func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStream) error { func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStream) error {
c.setProcessName("updating event streams") c.setProcessName("updating event streams")
_, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}) _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{})
if err != nil { if err != nil {
return fmt.Errorf("could not update event stream custom resource: %v", err) return fmt.Errorf("could not update event stream custom resource: %v", err)
} }
@ -43,7 +43,7 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStre
func (c *Cluster) deleteStreams() error { func (c *Cluster) deleteStreams() error {
c.setProcessName("updating event streams") c.setProcessName("updating event streams")
err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{}) err := c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{})
if err != nil { if err != nil {
return fmt.Errorf("could not delete event stream custom resource: %v", err) return fmt.Errorf("could not delete event stream custom resource: %v", err)
} }
@ -68,7 +68,7 @@ func (c *Cluster) syncPostgresConfig() error {
"plugin": "wal2json", "plugin": "wal2json",
"type": "logical", "type": "logical",
} }
slots[constants.EventStreamSourceSlotPrefix+stream.Database] = slot slots[constants.EventStreamSourceSlotPrefix+"_"+stream.Database] = slot
} }
} }
@ -121,6 +121,10 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream
} }
return &zalandov1alpha1.FabricEventStream{ return &zalandov1alpha1.FabricEventStream{
TypeMeta: metav1.TypeMeta{
Kind: "FabricEventStream",
APIVersion: "zalando.org/v1alphav1",
},
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: c.Name, Name: c.Name,
Namespace: c.Namespace, Namespace: c.Namespace,
@ -135,8 +139,6 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream
func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType string) zalandov1alpha1.EventStreamSource { func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType string) zalandov1alpha1.EventStreamSource {
_, schema := getTableSchema(table) _, schema := getTableSchema(table)
switch stream.StreamType {
case "nakadi":
streamFilter := stream.Filter[table] streamFilter := stream.Filter[table]
return zalandov1alpha1.EventStreamSource{ return zalandov1alpha1.EventStreamSource{
Type: constants.EventStreamSourcePGType, Type: constants.EventStreamSourcePGType,
@ -145,16 +147,6 @@ func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType st
Filter: streamFilter, Filter: streamFilter,
Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix),
} }
case "default":
return zalandov1alpha1.EventStreamSource{
Type: constants.EventStreamSourcePGType,
Schema: schema,
EventStreamTable: getSourceTable(table),
Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix),
}
}
return zalandov1alpha1.EventStreamSource{}
} }
func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow { func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow {
@ -167,7 +159,7 @@ func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow {
MetadataColumn: constants.EventStreamFlowMetadataColumn, MetadataColumn: constants.EventStreamFlowMetadataColumn,
DataColumn: constants.EventStreamFlowDataColumn, DataColumn: constants.EventStreamFlowDataColumn,
} }
case "default": case "wal":
return zalandov1alpha1.EventStreamFlow{ return zalandov1alpha1.EventStreamFlow{
Type: constants.EventStreamFlowPgGenericType, Type: constants.EventStreamFlowPgGenericType,
PayloadColumn: constants.EventStreamFlowPayloadColumn, PayloadColumn: constants.EventStreamFlowPayloadColumn,
@ -203,19 +195,13 @@ func getOutboxTable(tableName, eventType string) zalandov1alpha1.EventStreamTabl
} }
} }
func getSourceTable(tableName string) zalandov1alpha1.EventStreamTable {
return zalandov1alpha1.EventStreamTable{
Name: outboxTableNameTemplate.Format("table", tableName),
}
}
func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Connection { func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Connection {
return zalandov1alpha1.Connection{ return zalandov1alpha1.Connection{
Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user), Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user),
SlotName: c.getLogicalReplicationSlot(database), SlotName: c.getLogicalReplicationSlot(database),
DBAuth: zalandov1alpha1.DBAuth{ DBAuth: zalandov1alpha1.DBAuth{
Type: constants.EventStreamSourceAuthType, Type: constants.EventStreamSourceAuthType,
Name: c.credentialSecretNameForCluster(user, c.ClusterName), Name: c.credentialSecretNameForCluster(user, c.Name),
UserKey: "username", UserKey: "username",
PasswordKey: "password", PasswordKey: "password",
}, },
@ -224,12 +210,12 @@ func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Con
func (c *Cluster) getLogicalReplicationSlot(database string) string { func (c *Cluster) getLogicalReplicationSlot(database string) string {
for slotName, slot := range c.Spec.Patroni.Slots { for slotName, slot := range c.Spec.Patroni.Slots {
if strings.HasPrefix(slotName, constants.EventStreamSourceSlotPrefix) && slot["type"] == "logical" && slot["database"] == database { if slot["type"] == "logical" && slot["database"] == database {
return slotName return slotName
} }
} }
return "" return constants.EventStreamSourceSlotPrefix + "_" + database
} }
func (c *Cluster) syncStreams() error { func (c *Cluster) syncStreams() error {
@ -247,18 +233,19 @@ func (c *Cluster) syncStreams() error {
return fmt.Errorf("error during reading of event streams: %v", err) return fmt.Errorf("error during reading of event streams: %v", err)
} }
c.logger.Infof("event streams do not exist") c.logger.Infof("event streams do not exist, create it")
err := c.createStreams() err := c.createStreams()
if err != nil { if err != nil {
return fmt.Errorf("could not create missing streams: %v", err) return fmt.Errorf("event stream creation failed: %v", err)
} }
} else { } else {
if err != nil {
c.logger.Warnf("database setup might be incomplete : %v", err)
}
desiredStreams := c.generateFabricEventStream() desiredStreams := c.generateFabricEventStream()
if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) {
c.updateStreams(desiredStreams) desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion
err = c.updateStreams(desiredStreams)
if err != nil {
return fmt.Errorf("event stream update failed: %v", err)
}
} }
} }

View File

@ -3,11 +3,10 @@ package constants
// PostgreSQL specific constants // PostgreSQL specific constants
const ( const (
EventStreamSourcePGType = "PostgresLogicalReplication" EventStreamSourcePGType = "PostgresLogicalReplication"
EventStreamSourceSlotPrefix = "fes_" EventStreamSourceSlotPrefix = "fes"
EventStreamSourceAuthType = "DatabaseAuthenticationSecret" EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent" EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent"
EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent" EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
EventStreamFlowPgApiType = "PostgresWalToApiCallHomeEvent"
EventStreamFlowDataTypeColumn = "data_type" EventStreamFlowDataTypeColumn = "data_type"
EventStreamFlowDataOpColumn = "data_op" EventStreamFlowDataOpColumn = "data_op"
EventStreamFlowMetadataColumn = "metadata" EventStreamFlowMetadataColumn = "metadata"