remove sqs and a few bugs

This commit is contained in:
Felix Kunde 2021-09-16 15:58:11 +02:00
parent 79bd69d3ab
commit e4c7e3648f
12 changed files with 47 additions and 112 deletions

View File

@ -477,8 +477,6 @@ spec:
nullable: true
items:
type: object
required:
- streamType
properties:
batchSize:
type: integer
@ -488,12 +486,6 @@ spec:
type: object
additionalProperties:
type: string
queueName:
type: string
sqsArn:
type: string
sqsFifo:
type: boolean
tables:
type: object
additionalProperties:
@ -502,7 +494,6 @@ spec:
type: string
enum:
- "nakadi"
- "sqs"
- "wal"
teamId:
type: string

View File

@ -523,8 +523,9 @@ CRD for Zalando's internal CDC operator named like the Postgres cluster.
Each stream object can have the following properties:
* **streamType**
Defines the sink. Either `nakadi`, `sqs` or `wal` (which is just plain wal
files). Required.
Defines the stream flow. Choose `nakadi` when you want to specify certain
nakadi event types of or `wal` if changes should be mapped to a generic
event type. Default is `wal`.
* **database**
Name of the database from where events will be published via Postgres'
@ -545,9 +546,3 @@ Each stream object can have the following properties:
* **batchSize**
Defines the size of batches in which events are consumed.
* **sqsArn**
ARN to the SQS service used as event sink when streamType is `sqs`.
* **queueName**
Name of the queue to be used in SQS service when streamType is `sqs`.

View File

@ -198,23 +198,15 @@ spec:
# Enables change data capture streams for defined database tables
# streams:
# - type: nakadi
# - streamType: nakadi
# batchSize: 100
# database: foo
# tables:
# ta: event_type_a
# tb: event_type_b
# - type: wal
# ta: event_type_a
# tb: event_type_b
# - streamType: wal
# batchSize: 100
# database: foo
# tables:
# public.tx: event_type_a
# public.ty: event_type_b
# - type: sqs
# database: foo
# tables:
# ta: ""
# tb: ""
# sqsArn: arn:aws:sqs:eu-central-1:111122223333
# sqsFifo: true
# queueName: foo-queue
# public.tx: event_type_a
# public.ty: event_type_b

View File

@ -39,7 +39,7 @@ rules:
- apiGroups:
- zalando.org
resources:
- fabriceventstream
- fabriceventstreams
verbs:
- create
- delete

View File

@ -473,8 +473,6 @@ spec:
nullable: true
items:
type: object
required:
- streamType
properties:
batchSize:
type: integer
@ -484,12 +482,6 @@ spec:
type: object
additionalProperties:
type: string
queueName:
type: string
sqsArn:
type: string
sqsFifo:
type: boolean
tables:
type: object
additionalProperties:
@ -498,7 +490,6 @@ spec:
type: string
enum:
- "nakadi"
- "sqs"
- "wal"
teamId:
type: string

View File

@ -664,8 +664,7 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
Type: "array",
Items: &apiextv1.JSONSchemaPropsOrArray{
Schema: &apiextv1.JSONSchemaProps{
Type: "object",
Required: []string{"streamType"},
Type: "object",
Properties: map[string]apiextv1.JSONSchemaProps{
"batchSize": {
Type: "integer",
@ -681,15 +680,6 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
},
},
},
"queueName": {
Type: "string",
},
"sqsArn": {
Type: "string",
},
"sqsFifo": {
Type: "boolean",
},
"tables": {
Type: "object",
AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{
@ -704,9 +694,6 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
{
Raw: []byte(`"nakadi"`),
},
{
Raw: []byte(`"sqs"`),
},
{
Raw: []byte(`"wal"`),
},

View File

@ -74,7 +74,7 @@ type PostgresSpec struct {
ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"`
TLS *TLSDescription `json:"tls,omitempty"`
AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"`
Streams []Stream `json:"stream,omitempty"`
Streams []Stream `json:"streams,omitempty"`
// deprecated json tags
InitContainersOld []v1.Container `json:"init_containers,omitempty"`
@ -229,12 +229,9 @@ type ConnectionPooler struct {
}
type Stream struct {
StreamType string `json:"streamType"`
StreamType string `json:"streamType,omitempty"`
Database string `json:"database,omitempty"`
Tables map[string]string `json:"tables,omitempty"`
Filter map[string]string `json:"filter,omitempty"`
BatchSize uint32 `json:"batchSize,omitempty"`
SqsArn string `json:"sqsArn,omitempty"`
SqsFifo bool `json:"sqsFifo,omitempty"`
QueueName string `json:"queueName,omitempty"`
}

View File

@ -361,8 +361,8 @@ func (c *Cluster) Create() error {
// something fails, report warning
c.createConnectionPooler(c.installLookupFunction)
if len(c.Spec.Streams) > 0 {
c.syncStreams()
if err = c.syncStreams(); err != nil {
return fmt.Errorf("could not create streams: %v", err)
}
return nil
@ -1060,7 +1060,7 @@ func (c *Cluster) initSystemUsers() {
// replication users for event streams are another exception
// the operator will create one replication user for all streams
if len(c.Spec.Streams) > 0 {
username := constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix
username := constants.EventStreamSourceSlotPrefix + "user"
streamUser := spec.PgUser{
Origin: spec.RoleConnectionPooler,
Name: username,

View File

@ -56,14 +56,13 @@ func (c *Cluster) syncPostgresConfig() error {
desiredPostgresConfig := make(map[string]interface{})
slots := make(map[string]map[string]string)
c.logger.Debugf("setting wal level to 'logical' in postgres configuration")
// if streams are defined wal_level must be switched to logical and slots have to be defined
desiredPostgresConfig["postgresql"] = map[string]interface{}{patroniPGParametersParameterName: map[string]string{"wal_level": "logical"}}
for _, stream := range c.Spec.Streams {
slotName := c.getLogicalReplicationSlot(stream.Database)
if slotName == "" {
c.logger.Debugf("creating logical replication slot %q in database %q", constants.EventStreamSourceSlotPrefix+stream.Database, stream.Database)
slot := map[string]string{
"database": stream.Database,
"plugin": "wal2json",
@ -74,6 +73,10 @@ func (c *Cluster) syncPostgresConfig() error {
}
if len(slots) > 0 {
c.logger.Debugf("setting wal level to 'logical' in Postgres configuration")
for slotName, slot := range slots {
c.logger.Debugf("creating logical replication slot %q in database %q", slotName, slot["database"])
}
desiredPostgresConfig["slots"] = slots
} else {
return nil
@ -175,27 +178,11 @@ func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow {
}
func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1alpha1.EventStreamSink {
switch stream.StreamType {
case "sqs":
sqsSinkType := constants.EventStreamSinkSqsStandardType
if stream.SqsFifo {
sqsSinkType = constants.EventStreamSinkSqsFifoType
}
return zalandov1alpha1.EventStreamSink{
Type: sqsSinkType,
QueueName: stream.QueueName,
QueueUrl: stream.SqsArn,
MaxBatchSize: stream.BatchSize,
}
case "default":
return zalandov1alpha1.EventStreamSink{
Type: constants.EventStreamSinkNakadiType,
EventType: eventType,
MaxBatchSize: stream.BatchSize,
}
return zalandov1alpha1.EventStreamSink{
Type: constants.EventStreamSinkNakadiType,
EventType: eventType,
MaxBatchSize: stream.BatchSize,
}
return zalandov1alpha1.EventStreamSink{}
}
func getTableSchema(fullTableName string) (tableName, schemaName string) {

View File

@ -62,15 +62,6 @@ var (
},
BatchSize: uint32(100),
},
{
StreamType: "sqs",
Database: "foo",
Tables: map[string]string{
"bar": "stream_type_a",
},
SqsArn: "arn:aws:sqs:eu-central-1:111122223333",
QueueName: "foo-queue",
},
},
Users: map[string]acidv1.UserFlags{
"foo_user": []string{"replication"},

View File

@ -72,7 +72,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
return err
}
c.logger.Debugf("syncing statefulsets")
c.logger.Debug("syncing statefulsets")
if err = c.syncStatefulSet(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
err = fmt.Errorf("could not sync statefulsets: %v", err)
@ -98,17 +98,17 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
// create database objects unless we are running without pods or disabled that feature explicitly
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
c.logger.Debugf("syncing roles")
c.logger.Debug("syncing roles")
if err = c.syncRoles(); err != nil {
err = fmt.Errorf("could not sync roles: %v", err)
return err
}
c.logger.Debugf("syncing databases")
c.logger.Debug("syncing databases")
if err = c.syncDatabases(); err != nil {
err = fmt.Errorf("could not sync databases: %v", err)
return err
}
c.logger.Debugf("syncing prepared databases with schemas")
c.logger.Debug("syncing prepared databases with schemas")
if err = c.syncPreparedDatabases(); err != nil {
err = fmt.Errorf("could not sync prepared database: %v", err)
return err
@ -120,6 +120,12 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
return fmt.Errorf("could not sync connection pooler: %v", err)
}
c.logger.Debug("syncing streams")
if err = c.syncStreams(); err != nil {
err = fmt.Errorf("could not sync streams: %v", err)
return err
}
// Major version upgrade must only run after success of all earlier operations, must remain last item in sync
if err := c.majorVersionUpgrade(); err != nil {
c.logger.Errorf("major version upgrade failed: %v", err)

View File

@ -2,18 +2,16 @@ package constants
// PostgreSQL specific constants
const (
EventStreamSourcePGType = "PostgresLogicalReplication"
EventStreamSourceSlotPrefix = "fes_"
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent"
EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
EventStreamFlowPgApiType = "PostgresWalToApiCallHomeEvent"
EventStreamFlowDataTypeColumn = "data_type"
EventStreamFlowDataOpColumn = "data_op"
EventStreamFlowMetadataColumn = "metadata"
EventStreamFlowDataColumn = "data"
EventStreamFlowPayloadColumn = "payload"
EventStreamSinkNakadiType = "Nakadi"
EventStreamSinkSqsStandardType = "SqsStandard"
EventStreamSinkSqsFifoType = "SqsFifo"
EventStreamSourcePGType = "PostgresLogicalReplication"
EventStreamSourceSlotPrefix = "fes_"
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent"
EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
EventStreamFlowPgApiType = "PostgresWalToApiCallHomeEvent"
EventStreamFlowDataTypeColumn = "data_type"
EventStreamFlowDataOpColumn = "data_op"
EventStreamFlowMetadataColumn = "metadata"
EventStreamFlowDataColumn = "data"
EventStreamFlowPayloadColumn = "payload"
EventStreamSinkNakadiType = "Nakadi"
)