From e80cccb93b30c27d0bfb3eade0874208be2fa70f Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 27 Dec 2022 16:52:01 +0100 Subject: [PATCH] use random short name for stream CRDs (#2137) * use random short name for stream CRDs --- e2e/Makefile | 2 +- pkg/cluster/cluster.go | 1 - pkg/cluster/streams.go | 87 ++++++++++++++++++++++--------------- pkg/cluster/streams_test.go | 39 +++++++++-------- 4 files changed, 74 insertions(+), 55 deletions(-) diff --git a/e2e/Makefile b/e2e/Makefile index a32017192..9b1b5ea11 100644 --- a/e2e/Makefile +++ b/e2e/Makefile @@ -32,7 +32,7 @@ clean: copy: clean mkdir manifests - cp ../manifests -r . + cp -r ../manifests . docker: scm-source.json docker build -t "$(IMAGE):$(TAG)" . diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 812965854..f18cf8053 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -91,7 +91,6 @@ type Cluster struct { currentProcess Process processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex - streamApplications []string ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects EBSVolumes map[string]volumes.VolumeProperties VolumeResizer volumes.VolumeResizer diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index d911e6a83..2f403ab23 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -15,15 +15,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func (c *Cluster) createStreams(appId string) error { +func (c *Cluster) createStreams(appId string) (*zalandov1.FabricEventStream, error) { c.setProcessName("creating streams") fes := c.generateFabricEventStream(appId) - if _, err := c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}); err != nil { - return err + streamCRD, err := c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}) + if err != nil { + return nil, err } - return nil + return streamCRD, nil } func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) error { @@ -46,11 +47,17 @@ func (c *Cluster) deleteStreams() error { } errors := make([]string, 0) - for _, appId := range c.streamApplications { - fesName := fmt.Sprintf("%s-%s", c.Name, appId) - err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), fesName, metav1.DeleteOptions{}) + listOptions := metav1.ListOptions{ + LabelSelector: c.labelsSet(true).String(), + } + streams, err := c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions) + if err != nil { + return fmt.Errorf("could not list of FabricEventStreams: %v", err) + } + for _, stream := range streams.Items { + err = c.KubeClient.FabricEventStreams(stream.Namespace).Delete(context.TODO(), stream.Name, metav1.DeleteOptions{}) if err != nil { - errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", fesName, err)) + errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", stream.Name, err)) } } @@ -184,8 +191,10 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent Kind: constants.EventStreamCRDKind, }, ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s", c.Name, appId), + // max length for cluster name is 58 so we can only add 5 more characters / numbers + Name: fmt.Sprintf("%s-%s", c.Name, util.RandomPassword(5)), Namespace: c.Namespace, + Labels: c.labelsSet(true), Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)), // make cluster StatefulSet the owner (like with connection pooler objects) OwnerReferences: c.ownerReferences(), @@ -284,11 +293,6 @@ func (c *Cluster) syncStreams() error { return nil } - // fetch different application IDs from streams section - // there will be a separate event stream resource for each ID - appIds := gatherApplicationIds(c.Spec.Streams) - c.streamApplications = appIds - slots := make(map[string]map[string]string) slotsToSync := make(map[string]map[string]string) publications := make(map[string]map[string]acidv1.StreamTable) @@ -355,32 +359,43 @@ func (c *Cluster) syncStreams() error { } func (c *Cluster) createOrUpdateStreams() error { - for _, appId := range c.streamApplications { - fesName := fmt.Sprintf("%s-%s", c.Name, appId) - effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) - if err != nil { - if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("failed reading event stream %s: %v", fesName, err) - } - c.logger.Infof("event streams do not exist, create it") - err = c.createStreams(appId) - if err != nil { - return fmt.Errorf("failed creating event stream %s: %v", fesName, err) - } - c.logger.Infof("event stream %q has been successfully created", fesName) - } else { - desiredStreams := c.generateFabricEventStream(appId) - if match, reason := sameStreams(effectiveStreams.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match { - c.logger.Debugf("updating event streams: %s", reason) - desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion - err = c.updateStreams(desiredStreams) - if err != nil { - return fmt.Errorf("failed updating event stream %s: %v", fesName, err) + // fetch different application IDs from streams section + // there will be a separate event stream resource for each ID + appIds := gatherApplicationIds(c.Spec.Streams) + + // list all existing stream CRDs + listOptions := metav1.ListOptions{ + LabelSelector: c.labelsSet(true).String(), + } + streams, err := c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions) + if err != nil { + return fmt.Errorf("could not list of FabricEventStreams: %v", err) + } + + for _, appId := range appIds { + // update stream when it exists and EventStreams array differs + for _, stream := range streams.Items { + if appId == stream.Spec.ApplicationId { + desiredStreams := c.generateFabricEventStream(appId) + if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match { + c.logger.Debugf("updating event streams: %s", reason) + desiredStreams.ObjectMeta.ResourceVersion = stream.ObjectMeta.ResourceVersion + err = c.updateStreams(desiredStreams) + if err != nil { + return fmt.Errorf("failed updating event stream %s: %v", stream.Name, err) + } + c.logger.Infof("event stream %q has been successfully updated", stream.Name) } - c.logger.Infof("event stream %q has been successfully updated", fesName) + continue } } + c.logger.Infof("event streams with applicationId %s do not exist, create it", appId) + streamCRD, err := c.createStreams(appId) + if err != nil { + return fmt.Errorf("failed creating event streams with applicationId %s: %v", appId, err) + } + c.logger.Infof("event streams %q have been successfully created", streamCRD.Name) } return nil diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 674d2738d..271942ac9 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -41,7 +41,6 @@ var ( appId string = "test-app" dbName string = "foo" fesUser string = fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix) - fesName string = fmt.Sprintf("%s-%s", clusterName, appId) slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)) pg = acidv1.Postgresql{ @@ -77,6 +76,7 @@ var ( BatchSize: k8sutil.UInt32ToPointer(uint32(100)), }, }, + TeamID: "acid", Volume: acidv1.Volume{ Size: "1Gi", }, @@ -89,7 +89,7 @@ var ( Kind: constants.EventStreamCRDKind, }, ObjectMeta: metav1.ObjectMeta{ - Name: fesName, + Name: fmt.Sprintf("%s-12345", clusterName), Namespace: namespace, OwnerReferences: []metav1.OwnerReference{ metav1.OwnerReference{ @@ -196,9 +196,6 @@ func TestGenerateFabricEventStream(t *testing.T) { _, err := cluster.createStatefulSet() assert.NoError(t, err) - // createOrUpdateStreams will loop over existing apps - cluster.streamApplications = []string{appId} - // create the streams err = cluster.createOrUpdateStreams() assert.NoError(t, err) @@ -209,11 +206,14 @@ func TestGenerateFabricEventStream(t *testing.T) { t.Errorf("malformed FabricEventStream, expected %#v, got %#v", fes, result) } - // compare stream resturned from API with expected stream - streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) + // compare stream returned from API with expected stream + listOptions := metav1.ListOptions{ + LabelSelector: cluster.labelsSet(true).String(), + } + streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) - if match, _ := sameStreams(streamCRD.Spec.EventStreams, fes.Spec.EventStreams); !match { - t.Errorf("malformed FabricEventStream returned from API, expected %#v, got %#v", fes, streamCRD) + if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, fes.Spec.EventStreams); !match { + t.Errorf("malformed FabricEventStream returned from API, expected %#v, got %#v", fes, streams.Items[0]) } // sync streams once again @@ -221,10 +221,10 @@ func TestGenerateFabricEventStream(t *testing.T) { assert.NoError(t, err) // compare stream resturned from API with generated stream - streamCRD, err = cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) + streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) - if match, _ := sameStreams(streamCRD.Spec.EventStreams, result.Spec.EventStreams); !match { - t.Errorf("returned FabricEventStream differs from generated one, expected %#v, got %#v", result, streamCRD) + if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { + t.Errorf("returned FabricEventStream differs from generated one, expected %#v, got %#v", result, streams.Items[0]) } } @@ -331,8 +331,9 @@ func TestUpdateFabricEventStream(t *testing.T) { context.TODO(), &pg, metav1.CreateOptions{}) assert.NoError(t, err) - // createOrUpdateStreams will loop over existing apps - cluster.streamApplications = []string{appId} + // create statefulset to have ownerReference for streams + _, err = cluster.createStatefulSet() + assert.NoError(t, err) err = cluster.createOrUpdateStreams() assert.NoError(t, err) @@ -365,11 +366,15 @@ func TestUpdateFabricEventStream(t *testing.T) { err = cluster.createOrUpdateStreams() assert.NoError(t, err) - streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) + // compare stream returned from API with expected stream + listOptions := metav1.ListOptions{ + LabelSelector: cluster.labelsSet(true).String(), + } + streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) result := cluster.generateFabricEventStream(appId) - if !reflect.DeepEqual(result, streamCRD) { - t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streamCRD, result) + if !reflect.DeepEqual(result.Spec.EventStreams, streams.Items[0].Spec.EventStreams) { + t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streams.Items[0], result) } }