From 2726492fd30eb11b4cfb961442b52f75f4a18947 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 23 Aug 2021 15:14:11 +0200 Subject: [PATCH] add delete case and fix updating streams + update unit test --- pkg/cluster/cluster.go | 4 ++ pkg/cluster/database.go | 38 ------------------ pkg/cluster/streams.go | 37 ++++++----------- pkg/cluster/streams_test.go | 79 ++++++++++++++++++++++++++++++++++--- 4 files changed, 89 insertions(+), 69 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 977c3c4e5..609066767 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -899,6 +899,10 @@ func (c *Cluster) Delete() { defer c.mu.Unlock() c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of new cluster resources") + if err := c.deleteStreams(); err != nil { + c.logger.Warningf("could not delete event streams: %v", err) + } + // delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods // deleting the cron job also removes pods and batch jobs it created if err := c.deleteLogicalBackupJob(); err != nil { diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 6e0a8ae30..ba4cf223a 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -33,8 +33,6 @@ const ( getExtensionsSQL = `SELECT e.extname, n.nspname FROM pg_catalog.pg_extension e LEFT JOIN pg_catalog.pg_namespace n ON n.oid = e.extnamespace ORDER BY 1;` - tableExistsSQL = `SELECT TRUE FROM pg_tables WHERE tablename = $1 AND schemaname = $2;` - createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` createDatabaseSchemaSQL = `SET ROLE TO "%s"; CREATE SCHEMA IF NOT EXISTS "%s" AUTHORIZATION "%s"` alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` @@ -508,42 +506,6 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi return nil } -// getExtension returns the list of current database extensions -// The caller is responsible for opening and closing the database connection -func (c *Cluster) tableExists(tableName, schemaName string) (bool, error) { - var ( - rows *sql.Rows - exists bool - err error - ) - - if rows, err = c.pgDb.Query(tableExistsSQL, tableName, schemaName); err != nil { - return false, fmt.Errorf("could not check table for existence: %v", err) - } - - defer func() { - if err2 := rows.Close(); err2 != nil { - if err != nil { - err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err) - } else { - err = fmt.Errorf("error when closing query cursor: %v", err2) - } - } - }() - - for rows.Next() { - if err = rows.Scan(&exists); err != nil { - return false, fmt.Errorf("error when processing row: %v", err) - } - } - - if exists { - return true, nil - } else { - return false, fmt.Errorf("table %s not found", schemaName+"."+tableName) - } -} - // Creates a connection pool credentials lookup function in every database to // perform remote authentication. func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role PostgresRole) error { diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index d9cc19e1f..99a4513d6 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -40,6 +40,17 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStre return nil } +func (c *Cluster) deleteStreams() error { + c.setProcessName("updating event streams") + + err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("could not delete event stream custom resource: %v", err) + } + + return nil +} + func (c *Cluster) syncPostgresConfig() error { desiredPostgresConfig := make(map[string]interface{}) @@ -90,29 +101,6 @@ func (c *Cluster) syncPostgresConfig() error { return nil } -func (c *Cluster) syncStreamDbResources() error { - - for _, stream := range c.Spec.Streams { - if err := c.initDbConnWithName(stream.Database); err != nil { - return fmt.Errorf("could not init connection to database %q specified for event stream: %v", stream.Database, err) - } - - for table, eventType := range stream.Tables { - tableName, schemaName := getTableSchema(table) - if exists, err := c.tableExists(tableName, schemaName); !exists { - return fmt.Errorf("could not find table %q specified for event stream: %v", table, err) - } - // check if outbox table exists and if not, create it - outboxTable := outboxTableNameTemplate.Format("table", tableName, "eventtype", eventType) - if exists, err := c.tableExists(outboxTable, schemaName); !exists { - return fmt.Errorf("could not find outbox table %q specified for event stream: %v", outboxTable, err) - } - } - } - - return nil -} - func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream { eventStreams := make([]zalandov1alpha1.EventStream, 0) @@ -251,12 +239,11 @@ func (c *Cluster) syncStreams() error { return fmt.Errorf("could not create missing streams: %v", err) } } else { - err := c.syncStreamDbResources() if err != nil { c.logger.Warnf("database setup might be incomplete : %v", err) } desiredStreams := c.generateFabricEventStream() - if reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { + if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { c.updateStreams(desiredStreams) } } diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 5ba6a5147..2db9c149b 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -1,6 +1,7 @@ package cluster import ( + "encoding/json" "reflect" "context" @@ -13,6 +14,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util/k8sutil" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" ) @@ -22,16 +24,19 @@ func newFakeK8sStreamClient() (k8sutil.KubernetesClient, *fake.Clientset) { return k8sutil.KubernetesClient{ FabricEventStreamsGetter: zalandoClientSet.ZalandoV1alpha1(), + PostgresqlsGetter: zalandoClientSet.AcidV1(), PodsGetter: clientSet.CoreV1(), }, clientSet } -func TestGenerateFabricEventStream(t *testing.T) { - client, _ := newFakeK8sStreamClient() - clusterName := "acid-test-cluster" - namespace := "default" - - pg := acidv1.Postgresql{ +var ( + clusterName string = "acid-test-cluster" + namespace string = "default" + pg = acidv1.Postgresql{ + TypeMeta: metav1.TypeMeta{ + Kind: "Postgresql", + APIVersion: "acid.zalan.do/v1", + }, ObjectMeta: metav1.ObjectMeta{ Name: clusterName, Namespace: namespace, @@ -72,6 +77,10 @@ func TestGenerateFabricEventStream(t *testing.T) { }, }, } +) + +func TestGenerateFabricEventStream(t *testing.T) { + client, _ := newFakeK8sStreamClient() var cluster = New( Config{ @@ -100,3 +109,61 @@ func TestGenerateFabricEventStream(t *testing.T) { t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streamCRD, result) } } + +func TestUpdateFabricEventStream(t *testing.T) { + client, _ := newFakeK8sStreamClient() + + var cluster = New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + PodRoleLabel: "spilo-role", + }, + }, + }, client, pg, logger, eventRecorder) + + _, err := cluster.KubeClient.Postgresqls(namespace).Create( + context.TODO(), &pg, metav1.CreateOptions{}) + assert.NoError(t, err) + err = cluster.syncStreams() + assert.NoError(t, err) + + var pgSpec acidv1.PostgresSpec + pgSpec.Streams = []acidv1.Stream{ + { + StreamType: "nakadi", + Database: "foo", + Tables: map[string]string{ + "bar": "stream_type_b", + }, + BatchSize: uint32(250), + }, + } + patch, err := json.Marshal(struct { + PostgresSpec interface{} `json:"spec"` + }{&pgSpec}) + assert.NoError(t, err) + + pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch( + context.TODO(), cluster.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "spec") + assert.NoError(t, err) + + cluster.Postgresql.Spec = pgPatched.Spec + err = cluster.syncStreams() + assert.NoError(t, err) + + streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), cluster.Name, metav1.GetOptions{}) + assert.NoError(t, err) + + result := cluster.generateFabricEventStream() + if !reflect.DeepEqual(result, streamCRD) { + t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streamCRD, result) + } +}