remove streams delete and extend unit tests (#2737)
This commit is contained in:
		
							parent
							
								
									4929dd204c
								
							
						
					
					
						commit
						301462c415
					
				|  | @ -453,15 +453,6 @@ func (c *Cluster) syncStream(appId string) error { | ||||||
| 		if stream.Spec.ApplicationId != appId { | 		if stream.Spec.ApplicationId != appId { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		if streamExists { |  | ||||||
| 			c.logger.Warningf("more than one event stream with applicationId %s found, delete it", appId) |  | ||||||
| 			if err = c.KubeClient.FabricEventStreams(stream.ObjectMeta.Namespace).Delete(context.TODO(), stream.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil { |  | ||||||
| 				c.logger.Errorf("could not delete event stream %q with applicationId %s: %v", stream.ObjectMeta.Name, appId, err) |  | ||||||
| 			} else { |  | ||||||
| 				c.logger.Infof("redundant event stream %q with applicationId %s has been successfully deleted", stream.ObjectMeta.Name, appId) |  | ||||||
| 			} |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 		streamExists = true | 		streamExists = true | ||||||
| 		desiredStreams := c.generateFabricEventStream(appId) | 		desiredStreams := c.generateFabricEventStream(appId) | ||||||
| 		if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { | 		if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { | ||||||
|  | @ -484,6 +475,7 @@ func (c *Cluster) syncStream(appId string) error { | ||||||
| 			c.Streams[appId] = updatedStream | 			c.Streams[appId] = updatedStream | ||||||
| 			c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId) | 			c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId) | ||||||
| 		} | 		} | ||||||
|  | 		break | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if !streamExists { | 	if !streamExists { | ||||||
|  |  | ||||||
|  | @ -90,7 +90,7 @@ var ( | ||||||
| 			Namespace: namespace, | 			Namespace: namespace, | ||||||
| 			Labels: map[string]string{ | 			Labels: map[string]string{ | ||||||
| 				"application":  "spilo", | 				"application":  "spilo", | ||||||
| 				"cluster-name": fmt.Sprintf("%s-2", clusterName), | 				"cluster-name": clusterName, | ||||||
| 				"team":         "acid", | 				"team":         "acid", | ||||||
| 			}, | 			}, | ||||||
| 			OwnerReferences: []metav1.OwnerReference{ | 			OwnerReferences: []metav1.OwnerReference{ | ||||||
|  | @ -494,14 +494,13 @@ func TestSyncStreams(t *testing.T) { | ||||||
| 			OpConfig: config.Config{ | 			OpConfig: config.Config{ | ||||||
| 				PodManagementPolicy: "ordered_ready", | 				PodManagementPolicy: "ordered_ready", | ||||||
| 				Resources: config.Resources{ | 				Resources: config.Resources{ | ||||||
| 					ClusterLabels:         map[string]string{"application": "spilo"}, | 					ClusterLabels:        map[string]string{"application": "spilo"}, | ||||||
| 					ClusterNameLabel:      "cluster-name", | 					ClusterNameLabel:     "cluster-name", | ||||||
| 					DefaultCPURequest:     "300m", | 					DefaultCPURequest:    "300m", | ||||||
| 					DefaultCPULimit:       "300m", | 					DefaultCPULimit:      "300m", | ||||||
| 					DefaultMemoryRequest:  "300Mi", | 					DefaultMemoryRequest: "300Mi", | ||||||
| 					DefaultMemoryLimit:    "300Mi", | 					DefaultMemoryLimit:   "300Mi", | ||||||
| 					EnableOwnerReferences: util.True(), | 					PodRoleLabel:         "spilo-role", | ||||||
| 					PodRoleLabel:          "spilo-role", |  | ||||||
| 				}, | 				}, | ||||||
| 			}, | 			}, | ||||||
| 		}, client, pg, logger, eventRecorder) | 		}, client, pg, logger, eventRecorder) | ||||||
|  | @ -514,33 +513,17 @@ func TestSyncStreams(t *testing.T) { | ||||||
| 	err = cluster.syncStream(appId) | 	err = cluster.syncStream(appId) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 
 | 
 | ||||||
| 	// create a second stream with same spec but with different name
 | 	// sync the stream again
 | ||||||
| 	createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create( | 	err = cluster.syncStream(appId) | ||||||
| 		context.TODO(), fes, metav1.CreateOptions{}) |  | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equal(t, createdStream.Spec.ApplicationId, appId) |  | ||||||
| 
 | 
 | ||||||
| 	// check that two streams exist
 | 	// check that only one stream remains after sync
 | ||||||
| 	listOptions := metav1.ListOptions{ | 	listOptions := metav1.ListOptions{ | ||||||
| 		LabelSelector: cluster.labelsSet(true).String(), | 		LabelSelector: cluster.labelsSet(true).String(), | ||||||
| 	} | 	} | ||||||
| 	streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) | 	streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equalf(t, 2, len(streams.Items), "unexpected number of streams found: got %d, but expected only 2", len(streams.Items)) |  | ||||||
| 
 |  | ||||||
| 	// sync the stream which should remove the redundant stream
 |  | ||||||
| 	err = cluster.syncStream(appId) |  | ||||||
| 	assert.NoError(t, err) |  | ||||||
| 
 |  | ||||||
| 	// check that only one stream remains after sync
 |  | ||||||
| 	streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) |  | ||||||
| 	assert.NoError(t, err) |  | ||||||
| 	assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items)) | 	assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items)) | ||||||
| 
 |  | ||||||
| 	// check owner references
 |  | ||||||
| 	if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) { |  | ||||||
| 		t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences) |  | ||||||
| 	} |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func TestSameStreams(t *testing.T) { | func TestSameStreams(t *testing.T) { | ||||||
|  | @ -663,13 +646,14 @@ func TestUpdateStreams(t *testing.T) { | ||||||
| 			OpConfig: config.Config{ | 			OpConfig: config.Config{ | ||||||
| 				PodManagementPolicy: "ordered_ready", | 				PodManagementPolicy: "ordered_ready", | ||||||
| 				Resources: config.Resources{ | 				Resources: config.Resources{ | ||||||
| 					ClusterLabels:        map[string]string{"application": "spilo"}, | 					ClusterLabels:         map[string]string{"application": "spilo"}, | ||||||
| 					ClusterNameLabel:     "cluster-name", | 					ClusterNameLabel:      "cluster-name", | ||||||
| 					DefaultCPURequest:    "300m", | 					DefaultCPURequest:     "300m", | ||||||
| 					DefaultCPULimit:      "300m", | 					DefaultCPULimit:       "300m", | ||||||
| 					DefaultMemoryRequest: "300Mi", | 					DefaultMemoryRequest:  "300Mi", | ||||||
| 					DefaultMemoryLimit:   "300Mi", | 					DefaultMemoryLimit:    "300Mi", | ||||||
| 					PodRoleLabel:         "spilo-role", | 					EnableOwnerReferences: util.True(), | ||||||
|  | 					PodRoleLabel:          "spilo-role", | ||||||
| 				}, | 				}, | ||||||
| 			}, | 			}, | ||||||
| 		}, client, pg, logger, eventRecorder) | 		}, client, pg, logger, eventRecorder) | ||||||
|  | @ -678,10 +662,31 @@ func TestUpdateStreams(t *testing.T) { | ||||||
| 		context.TODO(), &pg, metav1.CreateOptions{}) | 		context.TODO(), &pg, metav1.CreateOptions{}) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 
 | 
 | ||||||
| 	// create the stream
 | 	// create stream with different owner reference
 | ||||||
|  | 	fes.ObjectMeta.Name = fmt.Sprintf("%s-12345", pg.Name) | ||||||
|  | 	fes.ObjectMeta.Labels["cluster-name"] = pg.Name | ||||||
|  | 	createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create( | ||||||
|  | 		context.TODO(), fes, metav1.CreateOptions{}) | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 	assert.Equal(t, createdStream.Spec.ApplicationId, appId) | ||||||
|  | 
 | ||||||
|  | 	// sync the stream which should update the owner reference
 | ||||||
| 	err = cluster.syncStream(appId) | 	err = cluster.syncStream(appId) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 
 | 
 | ||||||
|  | 	// check that only one stream exists after sync
 | ||||||
|  | 	listOptions := metav1.ListOptions{ | ||||||
|  | 		LabelSelector: cluster.labelsSet(true).String(), | ||||||
|  | 	} | ||||||
|  | 	streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 	assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items)) | ||||||
|  | 
 | ||||||
|  | 	// compare owner references
 | ||||||
|  | 	if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) { | ||||||
|  | 		t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	// change specs of streams and patch CRD
 | 	// change specs of streams and patch CRD
 | ||||||
| 	for i, stream := range pg.Spec.Streams { | 	for i, stream := range pg.Spec.Streams { | ||||||
| 		if stream.ApplicationId == appId { | 		if stream.ApplicationId == appId { | ||||||
|  | @ -694,10 +699,7 @@ func TestUpdateStreams(t *testing.T) { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// compare stream returned from API with expected stream
 | 	// compare stream returned from API with expected stream
 | ||||||
| 	listOptions := metav1.ListOptions{ | 	streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) | ||||||
| 		LabelSelector: cluster.labelsSet(true).String(), |  | ||||||
| 	} |  | ||||||
| 	streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) |  | ||||||
| 	result := cluster.generateFabricEventStream(appId) | 	result := cluster.generateFabricEventStream(appId) | ||||||
| 	if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { | 	if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { | ||||||
| 		t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result) | 		t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result) | ||||||
|  | @ -716,23 +718,6 @@ func TestUpdateStreams(t *testing.T) { | ||||||
| 	if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { | 	if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { | ||||||
| 		t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result) | 		t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result) | ||||||
| 	} | 	} | ||||||
| 
 |  | ||||||
| 	mockClient := k8sutil.NewMockKubernetesClient() |  | ||||||
| 	cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter |  | ||||||
| 
 |  | ||||||
| 	// remove streams from manifest
 |  | ||||||
| 	pg.Spec.Streams = nil |  | ||||||
| 	pgUpdated, err := cluster.KubeClient.Postgresqls(namespace).Update( |  | ||||||
| 		context.TODO(), &pg, metav1.UpdateOptions{}) |  | ||||||
| 	assert.NoError(t, err) |  | ||||||
| 
 |  | ||||||
| 	appIds := getDistinctApplicationIds(pgUpdated.Spec.Streams) |  | ||||||
| 	cluster.cleanupRemovedStreams(appIds) |  | ||||||
| 
 |  | ||||||
| 	streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) |  | ||||||
| 	if len(streams.Items) > 0 || err != nil { |  | ||||||
| 		t.Errorf("stream resource has not been removed or unexpected error %v", err) |  | ||||||
| 	} |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.PostgresSpec, listOptions metav1.ListOptions) (streams *zalandov1.FabricEventStreamList) { | func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.PostgresSpec, listOptions metav1.ListOptions) (streams *zalandov1.FabricEventStreamList) { | ||||||
|  | @ -752,3 +737,68 @@ func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.Postg | ||||||
| 
 | 
 | ||||||
| 	return streams | 	return streams | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func TestDeleteStreams(t *testing.T) { | ||||||
|  | 	pg.Name = fmt.Sprintf("%s-4", pg.Name) | ||||||
|  | 	var cluster = New( | ||||||
|  | 		Config{ | ||||||
|  | 			OpConfig: config.Config{ | ||||||
|  | 				PodManagementPolicy: "ordered_ready", | ||||||
|  | 				Resources: config.Resources{ | ||||||
|  | 					ClusterLabels:        map[string]string{"application": "spilo"}, | ||||||
|  | 					ClusterNameLabel:     "cluster-name", | ||||||
|  | 					DefaultCPURequest:    "300m", | ||||||
|  | 					DefaultCPULimit:      "300m", | ||||||
|  | 					DefaultMemoryRequest: "300Mi", | ||||||
|  | 					DefaultMemoryLimit:   "300Mi", | ||||||
|  | 					PodRoleLabel:         "spilo-role", | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		}, client, pg, logger, eventRecorder) | ||||||
|  | 
 | ||||||
|  | 	_, err := cluster.KubeClient.Postgresqls(namespace).Create( | ||||||
|  | 		context.TODO(), &pg, metav1.CreateOptions{}) | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 	// create the stream
 | ||||||
|  | 	err = cluster.syncStream(appId) | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 	// remove streams from manifest
 | ||||||
|  | 	pg.Spec.Streams = nil | ||||||
|  | 	pgUpdated, err := cluster.KubeClient.Postgresqls(namespace).Update( | ||||||
|  | 		context.TODO(), &pg, metav1.UpdateOptions{}) | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 	appIds := getDistinctApplicationIds(pgUpdated.Spec.Streams) | ||||||
|  | 	cluster.cleanupRemovedStreams(appIds) | ||||||
|  | 
 | ||||||
|  | 	// check that streams have been deleted
 | ||||||
|  | 	listOptions := metav1.ListOptions{ | ||||||
|  | 		LabelSelector: cluster.labelsSet(true).String(), | ||||||
|  | 	} | ||||||
|  | 	streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 	assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items)) | ||||||
|  | 
 | ||||||
|  | 	// create stream to test deleteStreams code
 | ||||||
|  | 	fes.ObjectMeta.Name = fmt.Sprintf("%s-12345", pg.Name) | ||||||
|  | 	fes.ObjectMeta.Labels["cluster-name"] = pg.Name | ||||||
|  | 	_, err = cluster.KubeClient.FabricEventStreams(namespace).Create( | ||||||
|  | 		context.TODO(), fes, metav1.CreateOptions{}) | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 	// sync it once to cluster struct
 | ||||||
|  | 	err = cluster.syncStream(appId) | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 	// we need a mock client because deleteStreams checks for CRD existance
 | ||||||
|  | 	mockClient := k8sutil.NewMockKubernetesClient() | ||||||
|  | 	cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter | ||||||
|  | 	cluster.deleteStreams() | ||||||
|  | 
 | ||||||
|  | 	// check that streams have been deleted
 | ||||||
|  | 	streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 	assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items)) | ||||||
|  | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue