Add stream deletion on cluster delete, fix stream-update comparison, and update unit tests

This commit is contained in:
Felix Kunde 2021-08-23 15:14:11 +02:00
parent 6120ef7da4
commit 2726492fd3
4 changed files with 89 additions and 69 deletions

View File

@ -899,6 +899,10 @@ func (c *Cluster) Delete() {
defer c.mu.Unlock()
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of new cluster resources")
if err := c.deleteStreams(); err != nil {
c.logger.Warningf("could not delete event streams: %v", err)
}
// delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods
// deleting the cron job also removes pods and batch jobs it created
if err := c.deleteLogicalBackupJob(); err != nil {

View File

@ -33,8 +33,6 @@ const (
getExtensionsSQL = `SELECT e.extname, n.nspname FROM pg_catalog.pg_extension e
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = e.extnamespace ORDER BY 1;`
tableExistsSQL = `SELECT TRUE FROM pg_tables WHERE tablename = $1 AND schemaname = $2;`
createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";`
createDatabaseSchemaSQL = `SET ROLE TO "%s"; CREATE SCHEMA IF NOT EXISTS "%s" AUTHORIZATION "%s"`
alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";`
@ -508,42 +506,6 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi
return nil
}
// tableExists checks whether the table tableName exists in schema schemaName
// of the current database connection.
// The caller is responsible for opening and closing the database connection.
// It returns (true, nil) when the table is found and (false, err) otherwise;
// a missing table is reported as an error so callers get a descriptive message.
func (c *Cluster) tableExists(tableName, schemaName string) (exists bool, err error) {
	var rows *sql.Rows

	if rows, err = c.pgDb.Query(tableExistsSQL, tableName, schemaName); err != nil {
		return false, fmt.Errorf("could not check table for existence: %v", err)
	}
	defer func() {
		// err is a named result so that a failure to close the cursor is
		// propagated to the caller instead of being silently dropped.
		if err2 := rows.Close(); err2 != nil {
			if err != nil {
				err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err)
			} else {
				err = fmt.Errorf("error when closing query cursor: %v", err2)
			}
		}
	}()

	for rows.Next() {
		if err = rows.Scan(&exists); err != nil {
			return false, fmt.Errorf("error when processing row: %v", err)
		}
	}

	// no row (or a FALSE value) means the table was not found
	if !exists {
		return false, fmt.Errorf("table %s not found", schemaName+"."+tableName)
	}
	return true, nil
}
// Creates a connection pool credentials lookup function in every database to
// perform remote authentication.
func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role PostgresRole) error {

View File

@ -40,6 +40,17 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStre
return nil
}
// deleteStreams removes the FabricEventStream custom resource that belongs to
// this cluster (it shares the cluster's name and namespace).
// Returns an error when the delete call against the Kubernetes API fails.
func (c *Cluster) deleteStreams() error {
	// was "updating event streams" — copy-paste from updateStreams; this
	// routine deletes, so report the correct process name
	c.setProcessName("deleting event streams")

	err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{})
	if err != nil {
		return fmt.Errorf("could not delete event stream custom resource: %v", err)
	}
	return nil
}
func (c *Cluster) syncPostgresConfig() error {
desiredPostgresConfig := make(map[string]interface{})
@ -90,29 +101,6 @@ func (c *Cluster) syncPostgresConfig() error {
return nil
}
// syncStreamDbResources verifies the database-side prerequisites of every
// configured event stream: for each stream it opens a connection to the
// stream's database and checks that both the source table and the derived
// outbox table are present. The first missing table aborts the sync with an
// error.
func (c *Cluster) syncStreamDbResources() error {
	for _, stream := range c.Spec.Streams {
		if err := c.initDbConnWithName(stream.Database); err != nil {
			return fmt.Errorf("could not init connection to database %q specified for event stream: %v", stream.Database, err)
		}

		for table, eventType := range stream.Tables {
			tableName, schemaName := getTableSchema(table)

			// the source table has to exist before streaming can work
			if exists, err := c.tableExists(tableName, schemaName); !exists {
				return fmt.Errorf("could not find table %q specified for event stream: %v", table, err)
			}

			// the outbox table name is derived from the source table and the
			// event type; it must exist, too — NOTE(review): despite the old
			// comment, this code only checks for the table, it never creates it
			outboxTable := outboxTableNameTemplate.Format("table", tableName, "eventtype", eventType)
			if exists, err := c.tableExists(outboxTable, schemaName); !exists {
				return fmt.Errorf("could not find outbox table %q specified for event stream: %v", outboxTable, err)
			}
		}
	}
	return nil
}
func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream {
eventStreams := make([]zalandov1alpha1.EventStream, 0)
@ -251,12 +239,11 @@ func (c *Cluster) syncStreams() error {
return fmt.Errorf("could not create missing streams: %v", err)
}
} else {
err := c.syncStreamDbResources()
if err != nil {
c.logger.Warnf("database setup might be incomplete : %v", err)
}
desiredStreams := c.generateFabricEventStream()
if reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) {
if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) {
c.updateStreams(desiredStreams)
}
}

View File

@ -1,6 +1,7 @@
package cluster
import (
"encoding/json"
"reflect"
"context"
@ -13,6 +14,7 @@ import (
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
)
@ -22,16 +24,19 @@ func newFakeK8sStreamClient() (k8sutil.KubernetesClient, *fake.Clientset) {
return k8sutil.KubernetesClient{
FabricEventStreamsGetter: zalandoClientSet.ZalandoV1alpha1(),
PostgresqlsGetter: zalandoClientSet.AcidV1(),
PodsGetter: clientSet.CoreV1(),
}, clientSet
}
func TestGenerateFabricEventStream(t *testing.T) {
client, _ := newFakeK8sStreamClient()
clusterName := "acid-test-cluster"
namespace := "default"
pg := acidv1.Postgresql{
var (
clusterName string = "acid-test-cluster"
namespace string = "default"
pg = acidv1.Postgresql{
TypeMeta: metav1.TypeMeta{
Kind: "Postgresql",
APIVersion: "acid.zalan.do/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: namespace,
@ -72,6 +77,10 @@ func TestGenerateFabricEventStream(t *testing.T) {
},
},
}
)
func TestGenerateFabricEventStream(t *testing.T) {
client, _ := newFakeK8sStreamClient()
var cluster = New(
Config{
@ -100,3 +109,61 @@ func TestGenerateFabricEventStream(t *testing.T) {
t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streamCRD, result)
}
}
// TestUpdateFabricEventStream checks that patching the stream section of a
// Postgresql manifest and re-running syncStreams updates the FabricEventStream
// custom resource to match the newly generated desired spec.
func TestUpdateFabricEventStream(t *testing.T) {
	client, _ := newFakeK8sStreamClient()

	var cluster = New(
		Config{
			OpConfig: config.Config{
				PodManagementPolicy: "ordered_ready",
				Resources: config.Resources{
					ClusterLabels:        map[string]string{"application": "spilo"},
					ClusterNameLabel:     "cluster-name",
					DefaultCPURequest:    "300m",
					DefaultCPULimit:      "300m",
					DefaultMemoryRequest: "300Mi",
					DefaultMemoryLimit:   "300Mi",
					PodRoleLabel:         "spilo-role",
				},
			},
		}, client, pg, logger, eventRecorder)

	// create the Postgresql resource and run an initial sync so the
	// FabricEventStream CRD exists before we patch the manifest
	_, err := cluster.KubeClient.Postgresqls(namespace).Create(
		context.TODO(), &pg, metav1.CreateOptions{})
	assert.NoError(t, err)

	err = cluster.syncStreams()
	assert.NoError(t, err)

	// change the stream definition in the manifest
	var updatedSpec acidv1.PostgresSpec
	updatedSpec.Streams = []acidv1.Stream{
		{
			StreamType: "nakadi",
			Database:   "foo",
			Tables: map[string]string{
				"bar": "stream_type_b",
			},
			BatchSize: uint32(250),
		},
	}

	patchData, err := json.Marshal(struct {
		PostgresSpec interface{} `json:"spec"`
	}{&updatedSpec})
	assert.NoError(t, err)

	patchedPg, err := cluster.KubeClient.Postgresqls(namespace).Patch(
		context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec")
	assert.NoError(t, err)

	// adopt the patched spec and sync again; this should update the CRD
	cluster.Postgresql.Spec = patchedPg.Spec
	err = cluster.syncStreams()
	assert.NoError(t, err)

	// the stored CRD must now equal the freshly generated desired stream
	effectiveStream, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), cluster.Name, metav1.GetOptions{})
	assert.NoError(t, err)

	desiredStream := cluster.generateFabricEventStream()
	if !reflect.DeepEqual(desiredStream, effectiveStream) {
		t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", effectiveStream, desiredStream)
	}
}