create cluster field to store stream application ids

This commit is contained in:
Felix Kunde 2022-03-30 16:44:20 +02:00
parent 26c882f0fa
commit 8fa4b3e848
2 changed files with 8 additions and 5 deletions

View File

@ -91,6 +91,7 @@ type Cluster struct {
currentProcess Process
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
streamApplications []string
ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects
EBSVolumes map[string]volumes.VolumeProperties
VolumeResizer volumes.VolumeResizer

View File

@ -46,8 +46,7 @@ func (c *Cluster) deleteStreams() error {
}
errors := make([]string, 0)
appIds := gatherApplicationIds(c.Spec.Streams)
for _, appId := range appIds {
for _, appId := range c.streamApplications {
fesName := fmt.Sprintf("%s-%s", c.Name, appId)
err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), fesName, metav1.DeleteOptions{})
if err != nil {
@ -274,6 +273,11 @@ func (c *Cluster) syncStreams() error {
return nil
}
// fetch different application IDs from streams section
// there will be a separate event stream resource for each ID
appIds := gatherApplicationIds(c.Spec.Streams)
c.streamApplications = appIds
slots := make(map[string]map[string]string)
publications := make(map[string]map[string]acidv1.StreamTable)
@ -338,9 +342,7 @@ func (c *Cluster) syncStreams() error {
}
func (c *Cluster) createOrUpdateStreams() error {
appIds := gatherApplicationIds(c.Spec.Streams)
for _, appId := range appIds {
for _, appId := range c.streamApplications {
fesName := fmt.Sprintf("%s-%s", c.Name, appId)
effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
if err != nil {