From c5ea2989ca4c99831c41c07aafe4c4a9f66fb287 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Wed, 22 Sep 2021 16:45:06 +0200 Subject: [PATCH] check if fes CRD exists before syncing --- pkg/cluster/streams.go | 29 +++++++++++++++++++++++++---- pkg/util/constants/streams.go | 2 ++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index b6e1ef29a..173305033 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -41,9 +41,14 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStre } func (c *Cluster) deleteStreams() error { - c.setProcessName("updating event streams") + c.setProcessName("deleting event streams") - err := c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{}) + _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{}) + if k8sutil.ResourceNotFound(err) { + return nil + } + + err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{}) if err != nil { return fmt.Errorf("could not delete event stream custom resource: %v", err) } @@ -122,7 +127,7 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream return &zalandov1alpha1.FabricEventStream{ TypeMeta: metav1.TypeMeta{ - Kind: "FabricEventStream", + Kind: constants.EventStreamSourceCRDKind, APIVersion: "zalando.org/v1alphav1", }, ObjectMeta: metav1.ObjectMeta{ @@ -205,6 +210,22 @@ func (c *Cluster) getLogicalReplicationSlot(database string) string { func (c *Cluster) syncStreams() error { + _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{}) + if k8sutil.ResourceNotFound(err) { + c.logger.Debugf("event stream CRD not installed, skipping") + return nil + } + + err = c.createOrUpdateStreams() + if err != nil { + return err + } + + return nil +} + +func (c *Cluster) createOrUpdateStreams() error { + c.setProcessName("syncing streams") err := c.syncPostgresConfig() @@ -212,7 +233,7 @@ func (c *Cluster) syncStreams() error { return fmt.Errorf("could not update Postgres config for event streaming: %v", err) } - effectiveStreams, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Get(context.TODO(), c.Name, metav1.GetOptions{}) + effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), c.Name, metav1.GetOptions{}) if err != nil { if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("error during reading of event streams: %v", err) diff --git a/pkg/util/constants/streams.go b/pkg/util/constants/streams.go index 25f72d85a..82d61c5c2 100644 --- a/pkg/util/constants/streams.go +++ b/pkg/util/constants/streams.go @@ -2,6 +2,8 @@ package constants // PostgreSQL specific constants const ( + EventStreamSourceCRDKind = "FabricEventStream" + EventStreamSourceCRDName = "fabriceventstreams.zalando.org" EventStreamSourcePGType = "PostgresLogicalReplication" EventStreamSourceSlotPrefix = "fes" EventStreamSourceAuthType = "DatabaseAuthenticationSecret"