remove stream resources after drop from Postgres manifest (#2563)
* remove stream resources after drop from Postgres manifest
This commit is contained in:
		
							parent
							
								
									7cdc23fff0
								
							
						
					
					
						commit
						37d6993439
					
				|  | @ -1082,7 +1082,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// streams
 | 	// streams
 | ||||||
| 	if len(newSpec.Spec.Streams) > 0 { | 	if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) { | ||||||
| 		if err := c.syncStreams(); err != nil { | 		if err := c.syncStreams(); err != nil { | ||||||
| 			c.logger.Errorf("could not sync streams: %v", err) | 			c.logger.Errorf("could not sync streams: %v", err) | ||||||
| 			updateFailed = true | 			updateFailed = true | ||||||
|  |  | ||||||
|  | @ -327,7 +327,8 @@ func (c *Cluster) syncStreams() error { | ||||||
| 	if len(slotsToSync) > 0 { | 	if len(slotsToSync) > 0 { | ||||||
| 		requiredPatroniConfig.Slots = slotsToSync | 		requiredPatroniConfig.Slots = slotsToSync | ||||||
| 	} else { | 	} else { | ||||||
| 		return nil | 		// try to delete existing stream resources
 | ||||||
|  | 		return c.deleteStreams() | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	c.logger.Debug("syncing logical replication slots") | 	c.logger.Debug("syncing logical replication slots") | ||||||
|  |  | ||||||
|  | @ -455,4 +455,21 @@ func TestUpdateFabricEventStream(t *testing.T) { | ||||||
| 	if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { | 	if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { | ||||||
| 		t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result) | 		t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	mockClient := k8sutil.NewMockKubernetesClient() | ||||||
|  | 	cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter | ||||||
|  | 
 | ||||||
|  | 	// remove streams from manifest
 | ||||||
|  | 	pgPatched.Spec.Streams = nil | ||||||
|  | 	pgUpdated, err := cluster.KubeClient.Postgresqls(namespace).Update( | ||||||
|  | 		context.TODO(), pgPatched, metav1.UpdateOptions{}) | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 	cluster.Postgresql.Spec = pgUpdated.Spec | ||||||
|  | 	cluster.syncStreams() | ||||||
|  | 
 | ||||||
|  | 	streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) | ||||||
|  | 	if len(streamList.Items) > 0 || err != nil { | ||||||
|  | 		t.Errorf("stream resource has not been removed or unexpected error %v", err) | ||||||
|  | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -16,8 +16,9 @@ import ( | ||||||
| 	"github.com/zalando/postgres-operator/pkg/spec" | 	"github.com/zalando/postgres-operator/pkg/spec" | ||||||
| 	apiappsv1 "k8s.io/api/apps/v1" | 	apiappsv1 "k8s.io/api/apps/v1" | ||||||
| 	v1 "k8s.io/api/core/v1" | 	v1 "k8s.io/api/core/v1" | ||||||
|  | 	apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" | ||||||
| 	apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" | 	apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" | ||||||
| 	apiextv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" | 	apiextv1client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" | ||||||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
|  | @ -59,7 +60,7 @@ type KubernetesClient struct { | ||||||
| 	appsv1.DeploymentsGetter | 	appsv1.DeploymentsGetter | ||||||
| 	rbacv1.RoleBindingsGetter | 	rbacv1.RoleBindingsGetter | ||||||
| 	policyv1.PodDisruptionBudgetsGetter | 	policyv1.PodDisruptionBudgetsGetter | ||||||
| 	apiextv1.CustomResourceDefinitionsGetter | 	apiextv1client.CustomResourceDefinitionsGetter | ||||||
| 	clientbatchv1.CronJobsGetter | 	clientbatchv1.CronJobsGetter | ||||||
| 	acidv1.OperatorConfigurationsGetter | 	acidv1.OperatorConfigurationsGetter | ||||||
| 	acidv1.PostgresTeamsGetter | 	acidv1.PostgresTeamsGetter | ||||||
|  | @ -71,6 +72,13 @@ type KubernetesClient struct { | ||||||
| 	Zalandov1ClientSet *zalandoclient.Clientset | 	Zalandov1ClientSet *zalandoclient.Clientset | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | type mockCustomResourceDefinition struct { | ||||||
|  | 	apiextv1client.CustomResourceDefinitionInterface | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type MockCustomResourceDefinitionsGetter struct { | ||||||
|  | } | ||||||
|  | 
 | ||||||
| type mockSecret struct { | type mockSecret struct { | ||||||
| 	corev1.SecretInterface | 	corev1.SecretInterface | ||||||
| } | } | ||||||
|  | @ -240,6 +248,18 @@ func (client *KubernetesClient) SetFinalizer(clusterName spec.NamespacedName, pg | ||||||
| 	return updatedPg, nil | 	return updatedPg, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *mockCustomResourceDefinition) Get(ctx context.Context, name string, options metav1.GetOptions) (*apiextv1.CustomResourceDefinition, error) { | ||||||
|  | 	return &apiextv1.CustomResourceDefinition{}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *mockCustomResourceDefinition) Create(ctx context.Context, crd *apiextv1.CustomResourceDefinition, options metav1.CreateOptions) (*apiextv1.CustomResourceDefinition, error) { | ||||||
|  | 	return &apiextv1.CustomResourceDefinition{}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (mock *MockCustomResourceDefinitionsGetter) CustomResourceDefinitions() apiextv1client.CustomResourceDefinitionInterface { | ||||||
|  | 	return &mockCustomResourceDefinition{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (c *mockSecret) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) { | func (c *mockSecret) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) { | ||||||
| 	oldFormatSecret := &v1.Secret{} | 	oldFormatSecret := &v1.Secret{} | ||||||
| 	oldFormatSecret.Name = "testcluster" | 	oldFormatSecret.Name = "testcluster" | ||||||
|  | @ -444,6 +464,8 @@ func NewMockKubernetesClient() KubernetesClient { | ||||||
| 		ConfigMapsGetter:  &MockConfigMapsGetter{}, | 		ConfigMapsGetter:  &MockConfigMapsGetter{}, | ||||||
| 		DeploymentsGetter: &MockDeploymentGetter{}, | 		DeploymentsGetter: &MockDeploymentGetter{}, | ||||||
| 		ServicesGetter:    &MockServiceGetter{}, | 		ServicesGetter:    &MockServiceGetter{}, | ||||||
|  | 
 | ||||||
|  | 		CustomResourceDefinitionsGetter: &MockCustomResourceDefinitionsGetter{}, | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue