From 4534a4cd9e4701286dde554aa914770e7355d745 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 30 Dec 2022 13:09:15 +0100 Subject: [PATCH] fix syncing of stream CRDs (#2152) * fix syncing of stream CRDs and improve corresponding unit tests --- pkg/cluster/streams.go | 18 ++++++++---- pkg/cluster/streams_test.go | 57 +++++++++++++++++++++++-------------- 2 files changed, 48 insertions(+), 27 deletions(-) diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index bf1c2b742..bb03c2e36 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -374,13 +374,16 @@ func (c *Cluster) createOrUpdateStreams() error { } for _, appId := range appIds { + streamExists := false + // update stream when it exists and EventStreams array differs for _, stream := range streams.Items { if appId == stream.Spec.ApplicationId { + streamExists = true desiredStreams := c.generateFabricEventStream(appId) if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match { c.logger.Debugf("updating event streams: %s", reason) - desiredStreams.ObjectMeta.ResourceVersion = stream.ObjectMeta.ResourceVersion + desiredStreams.ObjectMeta = stream.ObjectMeta err = c.updateStreams(desiredStreams) if err != nil { return fmt.Errorf("failed updating event stream %s: %v", stream.Name, err) @@ -390,12 +393,15 @@ func (c *Cluster) createOrUpdateStreams() error { continue } } - c.logger.Infof("event streams with applicationId %s do not exist, create it", appId) - streamCRD, err := c.createStreams(appId) - if err != nil { - return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err) + + if !streamExists { + c.logger.Infof("event streams with applicationId %s do not exist, create it", appId) + streamCRD, err := c.createStreams(appId) + if err != nil { + return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err) + } + c.logger.Infof("event streams %q have been successfully created", streamCRD.Name) } - c.logger.Infof("event streams %q have been successfully created", streamCRD.Name) } return nil diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 271942ac9..1e59e0f86 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -1,7 +1,6 @@ package cluster import ( - "encoding/json" "fmt" "reflect" "strings" @@ -167,6 +166,15 @@ var ( } ) +func TestGatherApplicationIds(t *testing.T) { + testAppIds := []string{appId} + appIds := gatherApplicationIds(pg.Spec.Streams) + + if !util.IsEqualIgnoreOrder(testAppIds, appIds) { + t.Errorf("gathered applicationIds do not match, expected %#v, got %#v", testAppIds, appIds) + } +} + func TestGenerateFabricEventStream(t *testing.T) { client, _ := newFakeK8sStreamClient() @@ -206,12 +214,18 @@ func TestGenerateFabricEventStream(t *testing.T) { t.Errorf("malformed FabricEventStream, expected %#v, got %#v", fes, result) } - // compare stream returned from API with expected stream listOptions := metav1.ListOptions{ LabelSelector: cluster.labelsSet(true).String(), } streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) + + // check if there is only one stream + if len(streams.Items) > 1 { + t.Errorf("too many stream CRDs found: got %d, but expected only one", len(streams.Items)) + } + + // compare stream returned from API with expected stream if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, fes.Spec.EventStreams); !match { t.Errorf("malformed FabricEventStream returned from API, expected %#v, got %#v", fes, streams.Items[0]) } @@ -220,9 +234,15 @@ func TestGenerateFabricEventStream(t *testing.T) { err = cluster.createOrUpdateStreams() assert.NoError(t, err) - // compare stream resturned from API with generated stream streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) + + // check if there is still only one stream + if len(streams.Items) > 1 { + t.Errorf("too many stream CRDs found after sync: got %d, but expected only one", len(streams.Items)) + } + + // compare stream resturned from API with generated stream if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { t.Errorf("returned FabricEventStream differs from generated one, expected %#v, got %#v", result, streams.Items[0]) } @@ -335,31 +355,26 @@ func TestUpdateFabricEventStream(t *testing.T) { _, err = cluster.createStatefulSet() assert.NoError(t, err) + // now create the stream err = cluster.createOrUpdateStreams() assert.NoError(t, err) - var pgSpec acidv1.PostgresSpec - pgSpec.Streams = []acidv1.Stream{ - { - ApplicationId: appId, - Database: dbName, - Tables: map[string]acidv1.StreamTable{ - "data.bar": acidv1.StreamTable{ - EventType: "stream-type-c", - IdColumn: k8sutil.StringToPointer("b_id"), - PayloadColumn: k8sutil.StringToPointer("b_payload"), - }, - }, - BatchSize: k8sutil.UInt32ToPointer(uint32(250)), - }, + // change specs of streams and patch CRD + for i, stream := range pg.Spec.Streams { + if stream.ApplicationId == appId { + streamTable := stream.Tables["data.bar"] + streamTable.EventType = "stream-type-c" + stream.Tables["data.bar"] = streamTable + stream.BatchSize = k8sutil.UInt32ToPointer(uint32(250)) + pg.Spec.Streams[i] = stream + } } - patch, err := json.Marshal(struct { - PostgresSpec interface{} `json:"spec"` - }{&pgSpec}) + + patchData, err := specPatch(pg.Spec) assert.NoError(t, err) pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch( - context.TODO(), cluster.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "spec") + context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec") assert.NoError(t, err) cluster.Postgresql.Spec = pgPatched.Spec