use defined fes CRD in unit test

This commit is contained in:
Felix Kunde 2021-12-07 22:32:35 +01:00
parent 90d6016dc8
commit 96a2da1fca
2 changed files with 89 additions and 10 deletions

View File

@ -132,7 +132,7 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream
return &zalandov1alpha1.FabricEventStream{ return &zalandov1alpha1.FabricEventStream{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{
Kind: constants.EventStreamSourceCRDKind, Kind: constants.EventStreamSourceCRDKind,
APIVersion: "zalando.org/v1alphav1", APIVersion: "zalando.org/v1alpha1",
}, },
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: c.Name, Name: c.Name,
@ -149,12 +149,12 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream
} }
func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn string) zalandov1alpha1.EventStreamSource { func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn string) zalandov1alpha1.EventStreamSource {
_, schema := getTableSchema(tableName) table, schema := getTableSchema(tableName)
streamFilter := stream.Filter[tableName] streamFilter := stream.Filter[tableName]
return zalandov1alpha1.EventStreamSource{ return zalandov1alpha1.EventStreamSource{
Type: constants.EventStreamSourcePGType, Type: constants.EventStreamSourcePGType,
Schema: schema, Schema: schema,
EventStreamTable: getOutboxTable(tableName, idColumn), EventStreamTable: getOutboxTable(table, idColumn),
Filter: streamFilter, Filter: streamFilter,
Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix), Connection: c.getStreamConnection(stream.Database, constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix),
} }

View File

@ -2,6 +2,7 @@ package cluster
import ( import (
"encoding/json" "encoding/json"
"fmt"
"reflect" "reflect"
"context" "context"
@ -9,8 +10,11 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1"
fakezalandov1alpha1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" fakezalandov1alpha1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/k8sutil" "github.com/zalando/postgres-operator/pkg/util/k8sutil"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -26,12 +30,15 @@ func newFakeK8sStreamClient() (k8sutil.KubernetesClient, *fake.Clientset) {
FabricEventStreamsGetter: zalandoClientSet.ZalandoV1alpha1(), FabricEventStreamsGetter: zalandoClientSet.ZalandoV1alpha1(),
PostgresqlsGetter: zalandoClientSet.AcidV1(), PostgresqlsGetter: zalandoClientSet.AcidV1(),
PodsGetter: clientSet.CoreV1(), PodsGetter: clientSet.CoreV1(),
StatefulSetsGetter: clientSet.AppsV1(),
}, clientSet }, clientSet
} }
var ( var (
clusterName string = "acid-test-cluster" clusterName string = "acid-test-cluster"
namespace string = "default" namespace string = "default"
fesUser string = constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix
pg = acidv1.Postgresql{ pg = acidv1.Postgresql{
TypeMeta: metav1.TypeMeta{ TypeMeta: metav1.TypeMeta{
Kind: "Postgresql", Kind: "Postgresql",
@ -40,6 +47,7 @@ var (
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: clusterName, Name: clusterName,
Namespace: namespace, Namespace: namespace,
Labels: map[string]string{"application": "test"},
}, },
Spec: acidv1.PostgresSpec{ Spec: acidv1.PostgresSpec{
Databases: map[string]string{ Databases: map[string]string{
@ -49,12 +57,15 @@ var (
{ {
Database: "foo", Database: "foo",
Tables: map[string]acidv1.StreamTable{ Tables: map[string]acidv1.StreamTable{
"bar": acidv1.StreamTable{ "data.bar": acidv1.StreamTable{
EventType: "stream_type_a", EventType: "stream_type_a",
IdColumn: "b_id", IdColumn: "b_id",
PayloadColumn: "b_payload", PayloadColumn: "b_payload",
}, },
}, },
Filter: map[string]string{
"data.bar": "[?(@.source.txId > 500 && @.source.lsn > 123456)]",
},
BatchSize: uint32(100), BatchSize: uint32(100),
}, },
}, },
@ -66,6 +77,60 @@ var (
}, },
}, },
} }
fes = &v1alpha1.FabricEventStream{
TypeMeta: metav1.TypeMeta{
Kind: "FabricEventStream",
APIVersion: "zalando.org/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{
metav1.OwnerReference{
APIVersion: "apps/v1",
Kind: "StatefulSet",
Name: "acid-test-cluster",
Controller: util.True(),
},
},
},
Spec: v1alpha1.FabricEventStreamSpec{
ApplicationId: "test",
EventStreams: []v1alpha1.EventStream{
{
EventStreamFlow: v1alpha1.EventStreamFlow{
PayloadColumn: "b_payload",
Type: constants.EventStreamFlowPgGenericType,
},
EventStreamSink: v1alpha1.EventStreamSink{
EventType: "stream_type_a",
MaxBatchSize: uint32(100),
Type: constants.EventStreamSinkNakadiType,
},
EventStreamSource: v1alpha1.EventStreamSource{
Filter: "[?(@.source.txId > 500 && @.source.lsn > 123456)]",
Connection: v1alpha1.Connection{
DBAuth: v1alpha1.DBAuth{
Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
PasswordKey: "password",
Type: constants.EventStreamSourceAuthType,
UserKey: "username",
},
Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser),
SlotName: "fes_foo",
},
Schema: "data",
EventStreamTable: v1alpha1.EventStreamTable{
IDColumn: "b_id",
Name: "bar",
},
Type: constants.EventStreamSourcePGType,
},
},
},
},
}
) )
func TestGenerateFabricEventStream(t *testing.T) { func TestGenerateFabricEventStream(t *testing.T) {
@ -74,6 +139,9 @@ func TestGenerateFabricEventStream(t *testing.T) {
var cluster = New( var cluster = New(
Config{ Config{
OpConfig: config.Config{ OpConfig: config.Config{
Auth: config.Auth{
SecretNameTemplate: "{username}.{cluster}.credentials.{tprkind}.{tprgroup}",
},
PodManagementPolicy: "ordered_ready", PodManagementPolicy: "ordered_ready",
Resources: config.Resources{ Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"}, ClusterLabels: map[string]string{"application": "spilo"},
@ -87,15 +155,26 @@ func TestGenerateFabricEventStream(t *testing.T) {
}, },
}, client, pg, logger, eventRecorder) }, client, pg, logger, eventRecorder)
err := cluster.createOrUpdateStreams() cluster.Name = clusterName
cluster.Namespace = namespace
_, err := cluster.createStatefulSet()
assert.NoError(t, err) assert.NoError(t, err)
err = cluster.createOrUpdateStreams()
assert.NoError(t, err)
result := cluster.generateFabricEventStream()
if !reflect.DeepEqual(result, fes) {
t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, result)
}
streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), cluster.Name, metav1.GetOptions{}) streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), cluster.Name, metav1.GetOptions{})
assert.NoError(t, err) assert.NoError(t, err)
result := cluster.generateFabricEventStream() if !reflect.DeepEqual(streamCRD, fes) {
if !reflect.DeepEqual(result, streamCRD) { t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, streamCRD)
t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streamCRD, result)
} }
} }
@ -129,7 +208,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
{ {
Database: "foo", Database: "foo",
Tables: map[string]acidv1.StreamTable{ Tables: map[string]acidv1.StreamTable{
"bar": acidv1.StreamTable{ "data.bar": acidv1.StreamTable{
EventType: "stream_type_b", EventType: "stream_type_b",
IdColumn: "b_id", IdColumn: "b_id",
PayloadColumn: "b_payload", PayloadColumn: "b_payload",