diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 10df7974c..a135c5767 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -2,6 +2,7 @@ package cluster import ( "context" + "encoding/json" "fmt" "reflect" "sort" @@ -13,6 +14,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) func (c *Cluster) createStreams(appId string) (*zalandov1.FabricEventStream, error) { @@ -29,8 +31,12 @@ func (c *Cluster) createStreams(appId string) (*zalandov1.FabricEventStream, err func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) error { c.setProcessName("updating event streams") - - if _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}); err != nil { + patch, err := json.Marshal(newEventStreams) + if err != nil { + return fmt.Errorf("could not marshal new event stream CRD %q: %v", newEventStreams.Name, err) + } + if _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Patch( + context.TODO(), newEventStreams.Name, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil { return err } diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 7030c914e..f71178823 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -294,6 +294,21 @@ func TestSameStreams(t *testing.T) { }, } + stream3 := zalandov1.EventStream{ + EventStreamFlow: zalandov1.EventStreamFlow{}, + EventStreamRecovery: zalandov1.EventStreamRecovery{ + Type: constants.EventStreamRecoveryNoneType, + }, + EventStreamSink: zalandov1.EventStreamSink{ + EventType: "stream-type-b", + }, + EventStreamSource: zalandov1.EventStreamSource{ + EventStreamTable: zalandov1.EventStreamTable{ + Name: "bar", + }, + }, + } + tests := []struct { subTest string streamsA []zalandov1.EventStream @@ -336,6 +351,13 @@ func TestSameStreams(t *testing.T) { match: false, reason: "number of defined streams is different", }, + { + subTest: "event stream recovery specs differ", + streamsA: []zalandov1.EventStream{stream2}, + streamsB: []zalandov1.EventStream{stream3}, + match: false, + reason: "event stream specs differ", + }, } for _, tt := range tests { @@ -409,6 +431,28 @@ func TestUpdateFabricEventStream(t *testing.T) { result := cluster.generateFabricEventStream(appId) if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { - t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streams.Items[0], result) + t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result) + } + + // disable recovery + for _, stream := range pg.Spec.Streams { + if stream.ApplicationId == appId { + stream.EnableRecovery = util.False() + } + } + patchData, err = specPatch(pg.Spec) + assert.NoError(t, err) + + pgPatched, err = cluster.KubeClient.Postgresqls(namespace).Patch( + context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec") + assert.NoError(t, err) + + cluster.Postgresql.Spec = pgPatched.Spec + err = cluster.createOrUpdateStreams() + assert.NoError(t, err) + + result = cluster.generateFabricEventStream(appId) + if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { + t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result) } }