check if fes CRD exists before syncing
This commit is contained in:
parent
223ffa75ef
commit
c5ea2989ca
|
|
@ -41,9 +41,14 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStre
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) deleteStreams() error {
|
func (c *Cluster) deleteStreams() error {
|
||||||
c.setProcessName("updating event streams")
|
c.setProcessName("deleting event streams")
|
||||||
|
|
||||||
err := c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{})
|
_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{})
|
||||||
|
if k8sutil.ResourceNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not delete event stream custom resource: %v", err)
|
return fmt.Errorf("could not delete event stream custom resource: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -122,7 +127,7 @@ func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream
|
||||||
|
|
||||||
return &zalandov1alpha1.FabricEventStream{
|
return &zalandov1alpha1.FabricEventStream{
|
||||||
TypeMeta: metav1.TypeMeta{
|
TypeMeta: metav1.TypeMeta{
|
||||||
Kind: "FabricEventStream",
|
Kind: constants.EventStreamSourceCRDKind,
|
||||||
APIVersion: "zalando.org/v1alphav1",
|
APIVersion: "zalando.org/v1alphav1",
|
||||||
},
|
},
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
|
@ -205,6 +210,22 @@ func (c *Cluster) getLogicalReplicationSlot(database string) string {
|
||||||
|
|
||||||
func (c *Cluster) syncStreams() error {
|
func (c *Cluster) syncStreams() error {
|
||||||
|
|
||||||
|
_, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamSourceCRDName, metav1.GetOptions{})
|
||||||
|
if k8sutil.ResourceNotFound(err) {
|
||||||
|
c.logger.Debugf("event stream CRD not installed, skipping")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.createOrUpdateStreams()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) createOrUpdateStreams() error {
|
||||||
|
|
||||||
c.setProcessName("syncing streams")
|
c.setProcessName("syncing streams")
|
||||||
|
|
||||||
err := c.syncPostgresConfig()
|
err := c.syncPostgresConfig()
|
||||||
|
|
@ -212,7 +233,7 @@ func (c *Cluster) syncStreams() error {
|
||||||
return fmt.Errorf("could not update Postgres config for event streaming: %v", err)
|
return fmt.Errorf("could not update Postgres config for event streaming: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
effectiveStreams, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Get(context.TODO(), c.Name, metav1.GetOptions{})
|
effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), c.Name, metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !k8sutil.ResourceNotFound(err) {
|
if !k8sutil.ResourceNotFound(err) {
|
||||||
return fmt.Errorf("error during reading of event streams: %v", err)
|
return fmt.Errorf("error during reading of event streams: %v", err)
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,8 @@ package constants
|
||||||
|
|
||||||
// PostgreSQL specific constants
|
// PostgreSQL specific constants
|
||||||
const (
|
const (
|
||||||
|
EventStreamSourceCRDKind = "FabricEventStream"
|
||||||
|
EventStreamSourceCRDName = "fabriceventstreams.zalando.org"
|
||||||
EventStreamSourcePGType = "PostgresLogicalReplication"
|
EventStreamSourcePGType = "PostgresLogicalReplication"
|
||||||
EventStreamSourceSlotPrefix = "fes"
|
EventStreamSourceSlotPrefix = "fes"
|
||||||
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
|
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue