Enable slot and publication deletion when stream application is removed (#2684)

* refactor syncing publication section
* update createOrUpdateStream function to allow resource deletion when removed from manifest
* add minimal FES CRD to enable FES resources creation for E2E test
* fix bug of removing manifest slots in syncStream
* e2e test: fix typo in major upgrade test
* e2e test: should create and delete FES resource
* e2e test: should not delete manually created resources
* e2e test: enable cluster role for FES with patching instead of deploying in manifest
This commit is contained in:
Ida Novindasari 2024-07-25 12:00:23 +02:00 committed by GitHub
parent 73f72414f6
commit 31f474a95c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 264 additions and 46 deletions

View File

@ -20,6 +20,7 @@ class K8sApi:
self.config = config.load_kube_config()
self.k8s_client = client.ApiClient()
self.rbac_api = client.RbacAuthorizationV1Api()
self.core_v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()

View File

@ -129,7 +129,8 @@ class EndToEndTestCase(unittest.TestCase):
"infrastructure-roles.yaml",
"infrastructure-roles-new.yaml",
"custom-team-membership.yaml",
"e2e-storage-class.yaml"]:
"e2e-storage-class.yaml",
"fes.crd.yaml"]:
result = k8s.create_with_kubectl("manifests/" + filename)
print("stdout: {}, stderr: {}".format(result.stdout, result.stderr))
@ -199,6 +200,7 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", owner_query)), 3,
"Not all additional users found in database", 10, 5)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_additional_pod_capabilities(self):
'''
@ -1203,7 +1205,7 @@ class EndToEndTestCase(unittest.TestCase):
version = p["server_version"][0:2]
return version
self.evantuallyEqual(check_version_14, "14", "Version was not upgrade to 14")
self.eventuallyEqual(check_version_14, "14", "Version was not upgrade to 14")
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_persistent_volume_claim_retention_policy(self):
@ -1989,6 +1991,123 @@ class EndToEndTestCase(unittest.TestCase):
"acid.zalan.do", "v1", "default", "postgresqls", "acid-standby-cluster")
time.sleep(5)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_stream_resources(self):
    '''
    Create and delete fabric event streaming resources.
    '''
    k8s = self.k8s
    self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"},
                         "Operator does not get in sync")
    leader = k8s.get_cluster_leader_pod()

    # patch ClusterRole with CRUD privileges on FES resources
    cluster_role = k8s.api.rbac_api.read_cluster_role("postgres-operator")
    fes_cluster_role_rule = client.V1PolicyRule(
        api_groups=["zalando.org"],
        resources=["fabriceventstreams"],
        verbs=["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"]
    )
    cluster_role.rules.append(fes_cluster_role_rule)
    k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role)

    # create a table in one of the databases of acid-minimal-cluster
    create_stream_table = """
        CREATE TABLE test_table (id int, payload jsonb);
    """
    self.query_database(leader.metadata.name, "foo", create_stream_table)

    # update the manifest with the streams section; also add a manual
    # physical slot so we can later verify the operator leaves it alone
    patch_streaming_config = {
        "spec": {
            "patroni": {
                "slots": {
                    "manual_slot": {
                        "type": "physical"
                    }
                }
            },
            "streams": [
                {
                    "applicationId": "test-app",
                    "batchSize": 100,
                    "database": "foo",
                    "enableRecovery": True,
                    "tables": {
                        "test_table": {
                            "eventType": "test-event",
                            "idColumn": "id",
                            "payloadColumn": "payload",
                            "recoveryEventType": "test-event-dlq"
                        }
                    }
                }
            ]
        }
    }
    k8s.api.custom_objects_api.patch_namespaced_custom_object(
        'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config)
    self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

    # check if publication, slot, and fes resource are created
    get_publication_query = """
        SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app';
    """
    get_slot_query = """
        SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app';
    """
    self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1,
                         "Publication is not created", 10, 5)
    self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1,
                         "Replication slot is not created", 10, 5)
    self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
        "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1,
        "Could not find Fabric Event Stream resource", 10, 5)

    # grant create and ownership of test_table to foo_user, reset search path to default
    grant_permission_foo_user = """
        GRANT CREATE ON DATABASE foo TO foo_user;
        ALTER TABLE test_table OWNER TO foo_user;
        ALTER ROLE foo_user RESET search_path;
    """
    self.query_database(leader.metadata.name, "foo", grant_permission_foo_user)

    # non-postgres user creates a publication
    create_nonstream_publication = """
        CREATE PUBLICATION mypublication FOR TABLE test_table;
    """
    self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user")

    # remove the streams section from the manifest
    patch_streaming_config_removal = {
        "spec": {
            "streams": []
        }
    }
    k8s.api.custom_objects_api.patch_namespaced_custom_object(
        'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal)
    self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

    # check if publication, slot, and fes resource are removed
    self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
        "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0,
        'Could not delete Fabric Event Stream resource', 10, 5)
    self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0,
                         "Publication is not deleted", 10, 5)
    self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0,
                         "Replication slot is not deleted", 10, 5)

    # check that the manually created slot and publication are not deleted
    get_manual_slot_query = """
        SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot';
    """
    get_nonstream_publication_query = """
        SELECT * FROM pg_publication WHERE pubname = 'mypublication';
    """
    self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1,
                         "Slot defined in patroni config is deleted", 10, 5)
    self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1,
                         "Publication defined not in stream section is deleted", 10, 5)
@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_taint_based_eviction(self):
'''
@ -2115,7 +2234,7 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted")
self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted")
self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 7, "Secrets were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config")
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config")
except timeout_decorator.TimeoutError:

23
manifests/fes.crd.yaml Normal file
View File

@ -0,0 +1,23 @@
# Minimal FabricEventStream CRD: registers the zalando.org/v1 resource so the
# e2e tests can exercise FES creation/deletion. The schema accepts any object
# (no validation) — this is intentionally a stub, not the full upstream CRD.
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: fabriceventstreams.zalando.org
spec:
  group: zalando.org
  names:
    kind: FabricEventStream
    listKind: FabricEventStreamList
    plural: fabriceventstreams
    singular: fabriceventstream
    shortNames:
    - fes
    categories:
    - all
  scope: Namespaced
  versions:
  - name: v1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        type: object

View File

@ -1,6 +1,7 @@
package v1
import (
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -89,3 +90,8 @@ type DBAuth struct {
UserKey string `json:"userKey,omitempty"`
PasswordKey string `json:"passwordKey,omitempty"`
}
// Slot bundles a replication slot configuration with the set of stream
// tables that make up the slot's publication.
type Slot struct {
	// Slot holds the slot settings as key/value pairs (e.g. database, plugin, type).
	Slot map[string]string `json:"slot"`
	// Publication maps table names to their stream table definitions.
	Publication map[string]acidv1.StreamTable `json:"publication"`
}

View File

@ -49,9 +49,12 @@ const (
getPublicationsSQL = `SELECT p.pubname, string_agg(pt.schemaname || '.' || pt.tablename, ', ' ORDER BY pt.schemaname, pt.tablename)
FROM pg_publication p
LEFT JOIN pg_publication_tables pt ON pt.pubname = p.pubname
WHERE p.pubowner = 'postgres'::regrole
AND p.pubname LIKE 'fes_%'
GROUP BY p.pubname;`
createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');`
alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;`
dropPublicationSQL = `DROP PUBLICATION "%s";`
globalDefaultPrivilegesSQL = `SET ROLE TO "%s";
ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s";
@ -628,6 +631,14 @@ func (c *Cluster) getPublications() (publications map[string]string, err error)
return dbPublications, err
}
// executeDropPublication drops the publication with the given name.
// The caller is responsible for opening and closing the database connection.
func (c *Cluster) executeDropPublication(pubName string) error {
	c.logger.Infof("dropping publication %q", pubName)
	_, err := c.pgDb.Exec(fmt.Sprintf(dropPublicationSQL, pubName))
	if err != nil {
		return fmt.Errorf("could not execute drop publication: %v", err)
	}
	return nil
}
// executeCreatePublication creates new publication for given tables
// The caller is responsible for opening and closing the database connection.
func (c *Cluster) executeCreatePublication(pubName, tableList string) error {

View File

@ -43,6 +43,16 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) er
return nil
}
// deleteStream removes a single FabricEventStream custom resource from the
// Kubernetes API server.
func (c *Cluster) deleteStream(stream *zalandov1.FabricEventStream) error {
	c.setProcessName("deleting event stream")

	if err := c.KubeClient.FabricEventStreams(stream.Namespace).Delete(context.TODO(), stream.Name, metav1.DeleteOptions{}); err != nil {
		return fmt.Errorf("could not delete event stream %q: %v", stream.Name, err)
	}
	return nil
}
func (c *Cluster) deleteStreams() error {
c.setProcessName("deleting event streams")
@ -61,7 +71,7 @@ func (c *Cluster) deleteStreams() error {
return fmt.Errorf("could not list of FabricEventStreams: %v", err)
}
for _, stream := range streams.Items {
err = c.KubeClient.FabricEventStreams(stream.Namespace).Delete(context.TODO(), stream.Name, metav1.DeleteOptions{})
err := c.deleteStream(&stream)
if err != nil {
errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", stream.Name, err))
}
@ -85,9 +95,10 @@ func gatherApplicationIds(streams []acidv1.Stream) []string {
return appIds
}
func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error {
func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]zalandov1.Slot, slotsToSync *map[string]map[string]string) error {
createPublications := make(map[string]string)
alterPublications := make(map[string]string)
deletePublications := []string{}
defer func() {
if err := c.closeDbConn(); err != nil {
@ -97,7 +108,7 @@ func (c *Cluster) syncPublication(publication, dbName string, tables map[string]
// check for existing publications
if err := c.initDbConnWithName(dbName); err != nil {
return fmt.Errorf("could not init database connection")
return fmt.Errorf("could not init database connection: %v", err)
}
currentPublications, err := c.getPublications()
@ -105,24 +116,35 @@ func (c *Cluster) syncPublication(publication, dbName string, tables map[string]
return fmt.Errorf("could not get current publications: %v", err)
}
tableNames := make([]string, len(tables))
i := 0
for t := range tables {
tableName, schemaName := getTableSchema(t)
tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
i++
}
sort.Strings(tableNames)
tableList := strings.Join(tableNames, ", ")
for slotName, slotAndPublication := range databaseSlotsList {
tables := slotAndPublication.Publication
tableNames := make([]string, len(tables))
i := 0
for t := range tables {
tableName, schemaName := getTableSchema(t)
tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName)
i++
}
sort.Strings(tableNames)
tableList := strings.Join(tableNames, ", ")
currentTables, exists := currentPublications[publication]
if !exists {
createPublications[publication] = tableList
} else if currentTables != tableList {
alterPublications[publication] = tableList
currentTables, exists := currentPublications[slotName]
if !exists {
createPublications[slotName] = tableList
} else if currentTables != tableList {
alterPublications[slotName] = tableList
}
(*slotsToSync)[slotName] = slotAndPublication.Slot
}
if len(createPublications)+len(alterPublications) == 0 {
// check if there is any deletion
for slotName, _ := range currentPublications {
if _, exists := databaseSlotsList[slotName]; !exists {
deletePublications = append(deletePublications, slotName)
}
}
if len(createPublications)+len(alterPublications)+len(deletePublications) == 0 {
return nil
}
@ -136,6 +158,12 @@ func (c *Cluster) syncPublication(publication, dbName string, tables map[string]
return fmt.Errorf("update of publication %q failed: %v", publicationName, err)
}
}
for _, publicationName := range deletePublications {
(*slotsToSync)[publicationName] = nil
if err = c.executeDropPublication(publicationName); err != nil {
return fmt.Errorf("deletion of publication %q failed: %v", publicationName, err)
}
}
return nil
}
@ -279,56 +307,73 @@ func (c *Cluster) syncStreams() error {
return nil
}
slots := make(map[string]map[string]string)
databaseSlots := make(map[string]map[string]zalandov1.Slot)
slotsToSync := make(map[string]map[string]string)
publications := make(map[string]map[string]acidv1.StreamTable)
requiredPatroniConfig := c.Spec.Patroni
if len(requiredPatroniConfig.Slots) > 0 {
slots = requiredPatroniConfig.Slots
for slotName, slotConfig := range requiredPatroniConfig.Slots {
slotsToSync[slotName] = slotConfig
}
}
// gather list of required slots and publications
if err := c.initDbConn(); err != nil {
return fmt.Errorf("could not init database connection")
}
defer func() {
if err := c.closeDbConn(); err != nil {
c.logger.Errorf("could not close database connection: %v", err)
}
}()
listDatabases, err := c.getDatabases()
if err != nil {
return fmt.Errorf("could not get list of databases: %v", err)
}
// get database name with empty list of slot, except template0 and template1
for dbName, _ := range listDatabases {
if dbName != "template0" && dbName != "template1" {
databaseSlots[dbName] = map[string]zalandov1.Slot{}
}
}
// gather list of required slots and publications, group by database
for _, stream := range c.Spec.Streams {
if _, exists := databaseSlots[stream.Database]; !exists {
c.logger.Warningf("database %q does not exist in the cluster", stream.Database)
continue
}
slot := map[string]string{
"database": stream.Database,
"plugin": constants.EventStreamSourcePluginType,
"type": "logical",
}
slotName := getSlotName(stream.Database, stream.ApplicationId)
if _, exists := slots[slotName]; !exists {
slots[slotName] = slot
publications[slotName] = stream.Tables
if _, exists := databaseSlots[stream.Database][slotName]; !exists {
databaseSlots[stream.Database][slotName] = zalandov1.Slot{
Slot: slot,
Publication: stream.Tables,
}
} else {
streamTables := publications[slotName]
slotAndPublication := databaseSlots[stream.Database][slotName]
streamTables := slotAndPublication.Publication
for tableName, table := range stream.Tables {
if _, exists := streamTables[tableName]; !exists {
streamTables[tableName] = table
}
}
publications[slotName] = streamTables
slotAndPublication.Publication = streamTables
databaseSlots[stream.Database][slotName] = slotAndPublication
}
}
// create publications to each created slot
// sync publication in a database
c.logger.Debug("syncing database publications")
for publication, tables := range publications {
// but first check for existing publications
dbName := slots[publication]["database"]
err = c.syncPublication(publication, dbName, tables)
for dbName, databaseSlotsList := range databaseSlots {
err := c.syncPublication(dbName, databaseSlotsList, &slotsToSync)
if err != nil {
c.logger.Warningf("could not sync publication %q in database %q: %v", publication, dbName, err)
c.logger.Warningf("could not sync publications in database %q: %v", dbName, err)
continue
}
slotsToSync[publication] = slots[publication]
}
// no slots to sync = no streams defined or publications created
if len(slotsToSync) > 0 {
requiredPatroniConfig.Slots = slotsToSync
} else {
// try to delete existing stream resources
return c.deleteStreams()
}
c.logger.Debug("syncing logical replication slots")
@ -338,6 +383,7 @@ func (c *Cluster) syncStreams() error {
}
// sync logical replication slots in Patroni config
requiredPatroniConfig.Slots = slotsToSync
configPatched, _, _, err := c.syncPatroniConfig(pods, requiredPatroniConfig, nil)
if err != nil {
c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err)
@ -398,6 +444,18 @@ func (c *Cluster) createOrUpdateStreams() error {
}
}
// check if there is any deletion
for _, stream := range streams.Items {
if !util.SliceContains(appIds, stream.Spec.ApplicationId) {
c.logger.Infof("event streams with applicationId %s do not exist in the manifest, delete it", stream.Spec.ApplicationId)
err := c.deleteStream(&stream)
if err != nil {
return fmt.Errorf("failed deleting event streams with applicationId %s: %v", stream.Spec.ApplicationId, err)
}
c.logger.Infof("event streams %q have been successfully deleted", stream.Name)
}
}
return nil
}

View File

@ -466,7 +466,7 @@ func TestUpdateFabricEventStream(t *testing.T) {
assert.NoError(t, err)
cluster.Postgresql.Spec = pgUpdated.Spec
cluster.syncStreams()
cluster.createOrUpdateStreams()
streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
if len(streamList.Items) > 0 || err != nil {