From 8fa4b3e848d00c4449d424c9a8284b36f5e35558 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Wed, 30 Mar 2022 16:44:20 +0200 Subject: [PATCH] create cluster field to store stream application ids --- pkg/cluster/cluster.go | 1 + pkg/cluster/streams.go | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index dcef602b9..dcd5b97dd 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -91,6 +91,7 @@ type Cluster struct { currentProcess Process processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex + streamApplications []string ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects EBSVolumes map[string]volumes.VolumeProperties VolumeResizer volumes.VolumeResizer diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 8b62c86b5..0236925ca 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -46,8 +46,7 @@ func (c *Cluster) deleteStreams() error { } errors := make([]string, 0) - appIds := gatherApplicationIds(c.Spec.Streams) - for _, appId := range appIds { + for _, appId := range c.streamApplications { fesName := fmt.Sprintf("%s-%s", c.Name, appId) err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), fesName, metav1.DeleteOptions{}) if err != nil { @@ -274,6 +273,11 @@ func (c *Cluster) syncStreams() error { return nil } + // fetch different application IDs from streams section + // there will be a separate event stream resource for each ID + appIds := gatherApplicationIds(c.Spec.Streams) + c.streamApplications = appIds + slots := make(map[string]map[string]string) publications := make(map[string]map[string]acidv1.StreamTable) @@ -338,9 +342,7 @@ func (c *Cluster) syncStreams() error { } func (c *Cluster) createOrUpdateStreams() error { - - appIds := gatherApplicationIds(c.Spec.Streams) - for _, appId := range appIds { + for _, appId := range c.streamApplications { fesName := fmt.Sprintf("%s-%s", c.Name, appId) effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) if err != nil {