do not use extra labels to list stream CRDs (#2803)
* do not use extra labels to list stream CRDs * add diff on labels for streams + unit test coverage
This commit is contained in:
parent
80ef38f7f0
commit
d44bfabe78
|
|
@ -467,7 +467,9 @@ func (c *Cluster) syncStream(appId string) error {
|
||||||
c.setProcessName("syncing stream with applicationId %s", appId)
|
c.setProcessName("syncing stream with applicationId %s", appId)
|
||||||
c.logger.Debugf("syncing stream with applicationId %s", appId)
|
c.logger.Debugf("syncing stream with applicationId %s", appId)
|
||||||
|
|
||||||
listOptions := metav1.ListOptions{LabelSelector: c.labelsSet(true).String()}
|
listOptions := metav1.ListOptions{
|
||||||
|
LabelSelector: c.labelsSet(false).String(),
|
||||||
|
}
|
||||||
streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions)
|
streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err)
|
return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err)
|
||||||
|
|
@ -492,7 +494,8 @@ func (c *Cluster) syncStream(appId string) error {
|
||||||
}
|
}
|
||||||
if match, reason := c.compareStreams(&stream, desiredStreams); !match {
|
if match, reason := c.compareStreams(&stream, desiredStreams); !match {
|
||||||
c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason)
|
c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason)
|
||||||
desiredStreams.ObjectMeta = stream.ObjectMeta
|
// make sure to keep the old name with randomly generated suffix
|
||||||
|
desiredStreams.ObjectMeta.Name = stream.ObjectMeta.Name
|
||||||
updatedStream, err := c.updateStreams(desiredStreams)
|
updatedStream, err := c.updateStreams(desiredStreams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err)
|
return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err)
|
||||||
|
|
@ -527,6 +530,11 @@ func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.Fab
|
||||||
reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
|
reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(curEventStreams.ObjectMeta.Labels, newEventStreams.ObjectMeta.Labels) {
|
||||||
|
match = false
|
||||||
|
reasons = append(reasons, "new streams labels do not match the current ones")
|
||||||
|
}
|
||||||
|
|
||||||
if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed {
|
if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed {
|
||||||
match = false
|
match = false
|
||||||
reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason))
|
reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason))
|
||||||
|
|
|
||||||
|
|
@ -490,7 +490,7 @@ func TestGenerateFabricEventStream(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
listOptions := metav1.ListOptions{
|
listOptions := metav1.ListOptions{
|
||||||
LabelSelector: cluster.labelsSet(true).String(),
|
LabelSelector: cluster.labelsSet(false).String(),
|
||||||
}
|
}
|
||||||
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
|
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
@ -529,7 +529,8 @@ func newFabricEventStream(streams []zalandov1.EventStream, annotations map[strin
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSyncStreams(t *testing.T) {
|
func TestSyncStreams(t *testing.T) {
|
||||||
pg.Name = fmt.Sprintf("%s-2", pg.Name)
|
newClusterName := fmt.Sprintf("%s-2", pg.Name)
|
||||||
|
pg.Name = newClusterName
|
||||||
var cluster = New(
|
var cluster = New(
|
||||||
Config{
|
Config{
|
||||||
OpConfig: config.Config{
|
OpConfig: config.Config{
|
||||||
|
|
@ -560,7 +561,7 @@ func TestSyncStreams(t *testing.T) {
|
||||||
|
|
||||||
// check that only one stream remains after sync
|
// check that only one stream remains after sync
|
||||||
listOptions := metav1.ListOptions{
|
listOptions := metav1.ListOptions{
|
||||||
LabelSelector: cluster.labelsSet(true).String(),
|
LabelSelector: cluster.labelsSet(false).String(),
|
||||||
}
|
}
|
||||||
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
|
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
@ -812,6 +813,49 @@ func TestDeleteStreams(t *testing.T) {
|
||||||
err = cluster.syncStream(appId)
|
err = cluster.syncStream(appId)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// change specs of streams and patch CRD
|
||||||
|
for i, stream := range pg.Spec.Streams {
|
||||||
|
if stream.ApplicationId == appId {
|
||||||
|
streamTable := stream.Tables["data.bar"]
|
||||||
|
streamTable.EventType = "stream-type-c"
|
||||||
|
stream.Tables["data.bar"] = streamTable
|
||||||
|
stream.BatchSize = k8sutil.UInt32ToPointer(uint32(250))
|
||||||
|
pg.Spec.Streams[i] = stream
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// compare stream returned from API with expected stream
|
||||||
|
listOptions := metav1.ListOptions{
|
||||||
|
LabelSelector: cluster.labelsSet(false).String(),
|
||||||
|
}
|
||||||
|
streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
|
||||||
|
result := cluster.generateFabricEventStream(appId)
|
||||||
|
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
|
||||||
|
t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// change teamId and check that stream is updated
|
||||||
|
pg.Spec.TeamID = "new-team"
|
||||||
|
streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
|
||||||
|
result = cluster.generateFabricEventStream(appId)
|
||||||
|
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
|
||||||
|
t.Errorf("Malformed FabricEventStream after updating teamId, expected %#v, got %#v", streams.Items[0].ObjectMeta.Labels, result.ObjectMeta.Labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
// disable recovery
|
||||||
|
for idx, stream := range pg.Spec.Streams {
|
||||||
|
if stream.ApplicationId == appId {
|
||||||
|
stream.EnableRecovery = util.False()
|
||||||
|
pg.Spec.Streams[idx] = stream
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
|
||||||
|
result = cluster.generateFabricEventStream(appId)
|
||||||
|
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
|
||||||
|
t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result)
|
||||||
|
}
|
||||||
|
|
||||||
// remove streams from manifest
|
// remove streams from manifest
|
||||||
pg.Spec.Streams = nil
|
pg.Spec.Streams = nil
|
||||||
pgUpdated, err := cluster.KubeClient.Postgresqls(namespace).Update(
|
pgUpdated, err := cluster.KubeClient.Postgresqls(namespace).Update(
|
||||||
|
|
@ -822,10 +866,7 @@ func TestDeleteStreams(t *testing.T) {
|
||||||
cluster.cleanupRemovedStreams(appIds)
|
cluster.cleanupRemovedStreams(appIds)
|
||||||
|
|
||||||
// check that streams have been deleted
|
// check that streams have been deleted
|
||||||
listOptions := metav1.ListOptions{
|
streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
|
||||||
LabelSelector: cluster.labelsSet(true).String(),
|
|
||||||
}
|
|
||||||
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
|
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items))
|
assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items))
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue