diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 301db97e4..e3374f6e1 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -132,7 +132,7 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream return &zalandov1alpha1.FabricEventStream{ TypeMeta: metav1.TypeMeta{ Kind: constants.EventStreamSourceCRDKind, - APIVersion: "zalando.org/v1alphav1", + APIVersion: "zalando.org/v1alpha1", }, ObjectMeta: metav1.ObjectMeta{ Name: c.Name, @@ -149,12 +149,12 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream } func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn string) zalandov1alpha1.EventStreamSource { - _, schema := getTableSchema(tableName) + table, schema := getTableSchema(tableName) streamFilter := stream.Filter[tableName] return zalandov1alpha1.EventStreamSource{ Type: constants.EventStreamSourcePGType, Schema: schema, - EventStreamTable: getOutboxTable(tableName, idColumn), + EventStreamTable: getOutboxTable(table, idColumn), Filter: streamFilter, Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), } diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 8a339b11a..a18a00d67 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -2,6 +2,7 @@ package cluster import ( "encoding/json" + "fmt" "reflect" "context" @@ -9,8 +10,11 @@ import ( "github.com/stretchr/testify/assert" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" fakezalandov1alpha1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" + "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -26,13 +30,16 @@ func newFakeK8sStreamClient() (k8sutil.KubernetesClient, *fake.Clientset) { FabricEventStreamsGetter: zalandoClientSet.ZalandoV1alpha1(), PostgresqlsGetter: zalandoClientSet.AcidV1(), PodsGetter: clientSet.CoreV1(), + StatefulSetsGetter: clientSet.AppsV1(), }, clientSet } var ( clusterName string = "acid-test-cluster" namespace string = "default" - pg = acidv1.Postgresql{ + fesUser string = constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix + + pg = acidv1.Postgresql{ TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", APIVersion: "acid.zalan.do/v1", @@ -40,6 +47,7 @@ var ( ObjectMeta: metav1.ObjectMeta{ Name: clusterName, Namespace: namespace, + Labels: map[string]string{"application": "test"}, }, Spec: acidv1.PostgresSpec{ Databases: map[string]string{ @@ -49,12 +57,15 @@ var ( { Database: "foo", Tables: map[string]acidv1.StreamTable{ - "bar": acidv1.StreamTable{ + "data.bar": acidv1.StreamTable{ EventType: "stream_type_a", IdColumn: "b_id", PayloadColumn: "b_payload", }, }, + Filter: map[string]string{ + "data.bar": "[?(@.source.txId > 500 && @.source.lsn > 123456)]", + }, BatchSize: uint32(100), }, }, @@ -66,6 +77,60 @@ var ( }, }, } + + fes = &v1alpha1.FabricEventStream{ + TypeMeta: metav1.TypeMeta{ + Kind: "FabricEventStream", + APIVersion: "zalando.org/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ + metav1.OwnerReference{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: "acid-test-cluster", + Controller: util.True(), + }, + }, + }, + Spec: v1alpha1.FabricEventStreamSpec{ + ApplicationId: "test", + EventStreams: []v1alpha1.EventStream{ + { + EventStreamFlow: v1alpha1.EventStreamFlow{ + PayloadColumn: "b_payload", + Type: constants.EventStreamFlowPgGenericType, + }, + EventStreamSink: v1alpha1.EventStreamSink{ + EventType: "stream_type_a", + MaxBatchSize: uint32(100), + Type: constants.EventStreamSinkNakadiType, + }, + EventStreamSource: v1alpha1.EventStreamSource{ + Filter: "[?(@.source.txId > 500 && @.source.lsn > 123456)]", + Connection: v1alpha1.Connection{ + DBAuth: v1alpha1.DBAuth{ + Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName), + PasswordKey: "password", + Type: constants.EventStreamSourceAuthType, + UserKey: "username", + }, + Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser), + SlotName: "fes_foo", + }, + Schema: "data", + EventStreamTable: v1alpha1.EventStreamTable{ + IDColumn: "b_id", + Name: "bar", + }, + Type: constants.EventStreamSourcePGType, + }, + }, + }, + }, + } ) func TestGenerateFabricEventStream(t *testing.T) { @@ -74,6 +139,9 @@ func TestGenerateFabricEventStream(t *testing.T) { var cluster = New( Config{ OpConfig: config.Config{ + Auth: config.Auth{ + SecretNameTemplate: "{username}.{cluster}.credentials.{tprkind}.{tprgroup}", + }, PodManagementPolicy: "ordered_ready", Resources: config.Resources{ ClusterLabels: map[string]string{"application": "spilo"}, @@ -87,15 +155,26 @@ func TestGenerateFabricEventStream(t *testing.T) { }, }, client, pg, logger, eventRecorder) - err := cluster.createOrUpdateStreams() + cluster.Name = clusterName + cluster.Namespace = namespace + + _, err := cluster.createStatefulSet() assert.NoError(t, err) + err = cluster.createOrUpdateStreams() + assert.NoError(t, err) + + result := cluster.generateFabricEventStream() + + if !reflect.DeepEqual(result, fes) { + t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, result) + } + streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), cluster.Name, metav1.GetOptions{}) assert.NoError(t, err) - result := cluster.generateFabricEventStream() - if !reflect.DeepEqual(result, streamCRD) { - t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streamCRD, result) + if !reflect.DeepEqual(streamCRD, fes) { + t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, streamCRD) } } @@ -129,7 +208,7 @@ func TestUpdateFabricEventStream(t *testing.T) { { Database: "foo", Tables: map[string]acidv1.StreamTable{ - "bar": acidv1.StreamTable{ + "data.bar": acidv1.StreamTable{ EventType: "stream_type_b", IdColumn: "b_id", PayloadColumn: "b_payload",