patch event stream CRD instead of update (#2535)
This commit is contained in:
parent
569fc57fc8
commit
3d9d0258f0
|
|
@ -2,6 +2,7 @@ package cluster
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
@ -13,6 +14,7 @@ import (
|
||||||
"github.com/zalando/postgres-operator/pkg/util/constants"
|
"github.com/zalando/postgres-operator/pkg/util/constants"
|
||||||
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
|
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *Cluster) createStreams(appId string) (*zalandov1.FabricEventStream, error) {
|
func (c *Cluster) createStreams(appId string) (*zalandov1.FabricEventStream, error) {
|
||||||
|
|
@ -29,8 +31,12 @@ func (c *Cluster) createStreams(appId string) (*zalandov1.FabricEventStream, err
|
||||||
|
|
||||||
func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) error {
|
func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) error {
|
||||||
c.setProcessName("updating event streams")
|
c.setProcessName("updating event streams")
|
||||||
|
patch, err := json.Marshal(newEventStreams)
|
||||||
if _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}); err != nil {
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not marshal new event stream CRD %q: %v", newEventStreams.Name, err)
|
||||||
|
}
|
||||||
|
if _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Patch(
|
||||||
|
context.TODO(), newEventStreams.Name, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -294,6 +294,21 @@ func TestSameStreams(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stream3 := zalandov1.EventStream{
|
||||||
|
EventStreamFlow: zalandov1.EventStreamFlow{},
|
||||||
|
EventStreamRecovery: zalandov1.EventStreamRecovery{
|
||||||
|
Type: constants.EventStreamRecoveryNoneType,
|
||||||
|
},
|
||||||
|
EventStreamSink: zalandov1.EventStreamSink{
|
||||||
|
EventType: "stream-type-b",
|
||||||
|
},
|
||||||
|
EventStreamSource: zalandov1.EventStreamSource{
|
||||||
|
EventStreamTable: zalandov1.EventStreamTable{
|
||||||
|
Name: "bar",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
subTest string
|
subTest string
|
||||||
streamsA []zalandov1.EventStream
|
streamsA []zalandov1.EventStream
|
||||||
|
|
@ -336,6 +351,13 @@ func TestSameStreams(t *testing.T) {
|
||||||
match: false,
|
match: false,
|
||||||
reason: "number of defined streams is different",
|
reason: "number of defined streams is different",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
subTest: "event stream recovery specs differ",
|
||||||
|
streamsA: []zalandov1.EventStream{stream2},
|
||||||
|
streamsB: []zalandov1.EventStream{stream3},
|
||||||
|
match: false,
|
||||||
|
reason: "event stream specs differ",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
|
@ -409,6 +431,28 @@ func TestUpdateFabricEventStream(t *testing.T) {
|
||||||
|
|
||||||
result := cluster.generateFabricEventStream(appId)
|
result := cluster.generateFabricEventStream(appId)
|
||||||
if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match {
|
if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match {
|
||||||
t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streams.Items[0], result)
|
t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// disable recovery
|
||||||
|
for _, stream := range pg.Spec.Streams {
|
||||||
|
if stream.ApplicationId == appId {
|
||||||
|
stream.EnableRecovery = util.False()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
patchData, err = specPatch(pg.Spec)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
pgPatched, err = cluster.KubeClient.Postgresqls(namespace).Patch(
|
||||||
|
context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
cluster.Postgresql.Spec = pgPatched.Spec
|
||||||
|
err = cluster.createOrUpdateStreams()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
result = cluster.generateFabricEventStream(appId)
|
||||||
|
if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match {
|
||||||
|
t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue