From 1bd12961fd4a9356d10dff94ceab18396fd42680 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Fri, 9 Jul 2021 19:11:28 +0200 Subject: [PATCH] provide event stream API completed codegen manually provide update and compare code for fes add db check for specified databases and tables add unit test --- hack/update-codegen.sh | 23 +- manifests/operator-service-account-rbac.yaml | 14 ++ pkg/apis/acid.zalan.do/v1/crds.go | 47 ++++ pkg/apis/acid.zalan.do/v1/postgresql_type.go | 12 + .../acid.zalan.do/v1/zz_generated.deepcopy.go | 37 +++ pkg/apis/zalando.org/register.go | 6 + .../zalando.org/v1alpha1/fabriceventstream.go | 88 +++++++ pkg/apis/zalando.org/v1alpha1/register.go | 46 ++++ .../v1alpha1/zz_generated.deepcopy.go | 227 ++++++++++++++++++ pkg/cluster/cluster.go | 9 + pkg/cluster/database.go | 38 +++ pkg/cluster/streams.go | 196 +++++++++++++++ pkg/cluster/streams_test.go | 105 ++++++++ .../clientset/versioned/clientset.go | 13 +- .../versioned/fake/clientset_generated.go | 7 + .../clientset/versioned/fake/register.go | 2 + .../clientset/versioned/scheme/register.go | 2 + .../typed/zalando.org/v1alpha1/doc.go | 26 ++ .../zalando.org/v1alpha1/fabriceventstream.go | 184 ++++++++++++++ .../typed/zalando.org/v1alpha1/fake/doc.go | 26 ++ .../v1alpha1/fake/fake_fabriceventstream.go | 136 +++++++++++ .../v1alpha1/fake/fake_zalando.org_client.go | 46 ++++ .../v1alpha1/generated_expansion.go | 27 +++ .../v1alpha1/zalando.org_client.go | 95 ++++++++ .../informers/externalversions/factory.go | 6 + .../informers/externalversions/generic.go | 5 + .../externalversions/zalando.org/interface.go | 52 ++++ .../zalando.org/v1alpha1/fabriceventstream.go | 96 ++++++++ .../zalando.org/v1alpha1/interface.go | 51 ++++ .../v1alpha1/expansion_generated.go | 33 +++ .../zalando.org/v1alpha1/fabriceventstream.go | 105 ++++++++ pkg/util/constants/streams.go | 17 ++ pkg/util/k8sutil/k8sutil.go | 16 +- 33 files changed, 1779 insertions(+), 14 deletions(-) create mode 100644 pkg/apis/zalando.org/register.go create mode 100644 pkg/apis/zalando.org/v1alpha1/fabriceventstream.go create mode 100644 pkg/apis/zalando.org/v1alpha1/register.go create mode 100644 pkg/apis/zalando.org/v1alpha1/zz_generated.deepcopy.go create mode 100644 pkg/cluster/streams.go create mode 100644 pkg/cluster/streams_test.go create mode 100644 pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/doc.go create mode 100644 pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fabriceventstream.go create mode 100644 pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/doc.go create mode 100644 pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/fake_fabriceventstream.go create mode 100644 pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/fake_zalando.org_client.go create mode 100644 pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/generated_expansion.go create mode 100644 pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/zalando.org_client.go create mode 100644 pkg/generated/informers/externalversions/zalando.org/interface.go create mode 100644 pkg/generated/informers/externalversions/zalando.org/v1alpha1/fabriceventstream.go create mode 100644 pkg/generated/informers/externalversions/zalando.org/v1alpha1/interface.go create mode 100644 pkg/generated/listers/zalando.org/v1alpha1/expansion_generated.go create mode 100644 pkg/generated/listers/zalando.org/v1alpha1/fabriceventstream.go create mode 100644 pkg/util/constants/streams.go diff --git a/hack/update-codegen.sh b/hack/update-codegen.sh index 280da9385..18c71f83c 100755 --- a/hack/update-codegen.sh +++ b/hack/update-codegen.sh @@ -1,13 +1,18 @@ #!/usr/bin/env bash +set -eou pipefail -set -o errexit -set -o nounset -set -o pipefail +GOPKG="github.com/zalando/postgres-operator" +SCRIPT_ROOT="$(dirname "${BASH_SOURCE[0]}")/.." -SCRIPT_ROOT=$(dirname ${BASH_SOURCE})/.. -CODEGEN_PKG=${CODEGEN_PKG:-$(cd ${SCRIPT_ROOT}; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ${GOPATH}/src/k8s.io/code-generator)} +rm -rf "${SCRIPT_ROOT}/generated" -bash "${CODEGEN_PKG}/generate-groups.sh" all \ - github.com/zalando/postgres-operator/pkg/generated github.com/zalando/postgres-operator/pkg/apis \ - "acid.zalan.do:v1" \ - --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt +go run k8s.io/code-generator/cmd/deepcopy-gen \ + --input-dirs ${GOPKG}/pkg/apis/acid.zalan.do/v1,${GOPKG}/pkg/apis/zalando.org/v1alpha1 \ + -O zz_generated.deepcopy \ + --bounding-dirs ${GOPKG}/pkg/apis \ + --go-header-file "${SCRIPT_ROOT}/hack/custom-boilerplate.go.txt" \ + -o "${SCRIPT_ROOT}/generated" + +cp -rv "${SCRIPT_ROOT}/generated/${GOPKG}"/* . + +rm -rf "${SCRIPT_ROOT}/generated" \ No newline at end of file diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index f0307f6a0..48da5f06a 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -35,6 +35,20 @@ rules: - get - list - watch +# all verbs allowed for event streams +- apiGroups: + - zalando.org + resources: + - fabriceventstream + verbs: + - create + - delete + - deletecollection + - get + - list + - patch + - update + - watch # to create or get/update CRDs when starting up - apiGroups: - apiextensions.k8s.io diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index a95eeab20..105e14cd7 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -657,6 +657,53 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ }, }, }, + "streams": { + Type: "array", + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "object", + Required: []string{"type"}, + Properties: map[string]apiextv1.JSONSchemaProps{ + "database": { + Type: "string", + }, + "filter": { + Type: "object", + AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "string", + }, + }, + }, + "sqsArn": { + Type: "string", + }, + "tables": { + Type: "object", + AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "string", + }, + }, + }, + "type": { + Type: "string", + Enum: []apiextv1.JSON{ + { + Raw: []byte(`"nakadi"`), + }, + { + Raw: []byte(`"sqs"`), + }, + { + Raw: []byte(`"wal"`), + }, + }, + }, + }, + }, + }, + }, "teamId": { Type: "string", }, diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 7346fb0e5..02eaf6ee6 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -74,6 +74,7 @@ type PostgresSpec struct { ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"` TLS *TLSDescription `json:"tls,omitempty"` AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"` + Streams []Stream `json:"stream,omitempty"` // deprecated json tags InitContainersOld []v1.Container `json:"init_containers,omitempty"` @@ -224,3 +225,14 @@ type ConnectionPooler struct { Resources `json:"resources,omitempty"` } + +type Stream struct { + Type string `json:"type"` + Database string `json:"database,omitempty"` + Tables map[string]string `json:"tables,omitempty"` + Filter map[string]string `json:"filter,omitempty"` + BatchSize uint32 `json:"batchSize,omitempty"` + SqsArn string `json:"sqsArn,omitempty"` + QueueName string `json:"queueName,omitempty"` + User string `json:"user,omitempty"` +} diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 584a72143..a8be4e143 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -722,6 +722,13 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Streams != nil { + in, out := &in.Streams, &out.Streams + *out = make([]Stream, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.InitContainersOld != nil { in, out := &in.InitContainersOld, &out.InitContainersOld *out = make([]corev1.Container, len(*in)) @@ -1125,6 +1132,36 @@ func (in *StandbyDescription) DeepCopy() *StandbyDescription { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Stream) DeepCopyInto(out *Stream) { + *out = *in + if in.Tables != nil { + in, out := &in.Tables, &out.Tables + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Filter != nil { + in, out := &in.Filter, &out.Filter + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Stream. +func (in *Stream) DeepCopy() *Stream { + if in == nil { + return nil + } + out := new(Stream) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TLSDescription) DeepCopyInto(out *TLSDescription) { *out = *in diff --git a/pkg/apis/zalando.org/register.go b/pkg/apis/zalando.org/register.go new file mode 100644 index 000000000..3dbd3f089 --- /dev/null +++ b/pkg/apis/zalando.org/register.go @@ -0,0 +1,6 @@ +package zalando + +const ( + // GroupName is the group name for the operator CRDs + GroupName = "zalando.org" +) diff --git a/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go b/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go new file mode 100644 index 000000000..37f1449c9 --- /dev/null +++ b/pkg/apis/zalando.org/v1alpha1/fabriceventstream.go @@ -0,0 +1,88 @@ +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// FabricEventStream defines FabricEventStream Custom Resource Definition Object. +type FabricEventStream struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec FabricEventStreamSpec `json:"spec"` +} + +// FabricEventStreamSpec defines the specification for the FabricEventStream TPR. +type FabricEventStreamSpec struct { + ApplicationId string `json:"applicationId"` + EventStreams []EventStream `json:"eventStreams"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// FabricEventStreamList defines a list of FabricEventStreams . +type FabricEventStreamList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + + Items []FabricEventStream `json:"items"` +} + +// EventStream defines the source, flow and sink of the event stream +type EventStream struct { + EventStreamFlow EventStreamFlow `json:"flow"` + EventStreamSink EventStreamSink `json:"sink"` + EventStreamSource EventStreamSource `json:"source"` +} + +// EventStreamFlow defines the flow characteristics of the event stream +type EventStreamFlow struct { + Type string `json:"type"` + DataTypeColumn string `json:"dataTypeColumn,omitempty"` + DataOpColumn string `json:"dataOpColumn,omitempty"` + MetadataColumn string `json:"metadataColumn,omitempty"` + DataColumn string `json:"dataColumn,omitempty"` + CallHomeIdColumn string `json:"callHomeIdColumn,omitempty"` + CallHomeUrl string `json:"callHomeUrl,omitempty"` +} + +// EventStreamSink defines the target of the event stream +type EventStreamSink struct { + Type string `json:"type"` + EventType string `json:"eventType,omitempty"` + MaxBatchSize uint32 `json:"maxBatchSize,omitempty"` + QueueName string `json:"queueName,omitempty"` +} + +// EventStreamSource defines the source of the event stream and connection for FES operator +type EventStreamSource struct { + Type string `json:"type"` + Schema string `json:"schema,omitempty" defaults:"public"` + EventStreamTable EventStreamTable `json:"table"` + Filter string `json:"filter,omitempty"` + Connection Connection `json:"jdbcConnection"` +} + +// EventStreamTable defines the name and ID column to be used for streaming +type EventStreamTable struct { + Name string `json:"name"` + IDColumn string `json:"idColumn,omitempty" defaults:"id"` +} + +// Connection to be used for allowing the FES operator to connect to a database +type Connection struct { + Url string `json:"jdbcUrl"` + SlotName string `json:"slotName"` + DBAuth DBAuth `json:"databaseAuthentication"` +} + +// DBAuth specifies the credentials to be used for connecting with the database +type DBAuth struct { + Type string `json:"type"` + Name string `json:"name,omitempty"` + UserKey string `json:"userKey,omitempty"` + PasswordKey string `json:"passwordKey,omitempty"` +} diff --git a/pkg/apis/zalando.org/v1alpha1/register.go b/pkg/apis/zalando.org/v1alpha1/register.go new file mode 100644 index 000000000..07732ae53 --- /dev/null +++ b/pkg/apis/zalando.org/v1alpha1/register.go @@ -0,0 +1,46 @@ +package v1alpha1 + +import ( + "github.com/zalando/postgres-operator/pkg/apis/zalando.org" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/scheme" +) + +// APIVersion of the `fabriceventstream` CRD +const ( + APIVersion = "v1alpha1" +) + +var ( + schemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) + // AddToScheme applies all the stored functions to the scheme. A non-nil error + // indicates that one function failed and the attempt was abandoned. + AddToScheme = schemeBuilder.AddToScheme +) + +func init() { + err := AddToScheme(scheme.Scheme) + if err != nil { + panic(err) + } +} + +// SchemeGroupVersion is the group version used to register these objects. +var SchemeGroupVersion = schema.GroupVersion{Group: zalando.GroupName, Version: APIVersion} + +// Resource takes an unqualified resource and returns a Group-qualified GroupResource. +func Resource(resource string) schema.GroupResource { + return SchemeGroupVersion.WithResource(resource).GroupResource() +} + +// addKnownTypes adds the set of types defined in this package to the supplied scheme. +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &FabricEventStream{}, + &FabricEventStreamList{}, + ) + metav1.AddToGroupVersion(scheme, SchemeGroupVersion) + return nil +} diff --git a/pkg/apis/zalando.org/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/zalando.org/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 000000000..0327279e5 --- /dev/null +++ b/pkg/apis/zalando.org/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,227 @@ +// +build !ignore_autogenerated + +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Connection) DeepCopyInto(out *Connection) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Connection) DeepCopy() *Connection { + if in == nil { + return nil + } + out := new(Connection) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DBAuth) DeepCopyInto(out *DBAuth) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DBAuth) DeepCopy() *DBAuth { + if in == nil { + return nil + } + out := new(DBAuth) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStream) DeepCopyInto(out *EventStream) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStream) DeepCopy() *EventStream { + if in == nil { + return nil + } + out := new(EventStream) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamFlow) DeepCopyInto(out *EventStreamFlow) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamFlow) DeepCopy() *EventStreamFlow { + if in == nil { + return nil + } + out := new(EventStreamFlow) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamSink) DeepCopyInto(out *EventStreamSink) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamSink) DeepCopy() *EventStreamSink { + if in == nil { + return nil + } + out := new(EventStreamSink) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamSource) DeepCopyInto(out *EventStreamSource) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamSource) DeepCopy() *EventStreamSource { + if in == nil { + return nil + } + out := new(EventStreamSource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamTable) DeepCopyInto(out *EventStreamTable) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamTable) DeepCopy() *EventStreamTable { + if in == nil { + return nil + } + out := new(EventStreamTable) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FabricEventStreamSpec) DeepCopyInto(out *FabricEventStreamSpec) { + *out = *in + if in.EventStreams != nil { + in, out := &in.EventStreams, &out.EventStreams + *out = make([]EventStream, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FabricEventStreamSpec. +func (in *FabricEventStreamSpec) DeepCopy() *FabricEventStreamSpec { + if in == nil { + return nil + } + out := new(FabricEventStreamSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FabricEventStream) DeepCopyInto(out *FabricEventStream) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FabricEventStream. +func (in *FabricEventStream) DeepCopy() *FabricEventStream { + if in == nil { + return nil + } + out := new(FabricEventStream) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FabricEventStream) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FabricEventStreamList) DeepCopyInto(out *FabricEventStreamList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]FabricEventStream, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FabricEventStreamList. +func (in *FabricEventStreamList) DeepCopy() *FabricEventStreamList { + if in == nil { + return nil + } + out := new(FabricEventStreamList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FabricEventStreamList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index c9abb10fd..3165534e8 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -361,6 +361,10 @@ func (c *Cluster) Create() error { // something fails, report warning c.createConnectionPooler(c.installLookupFunction) + if len(c.Spec.Streams) > 0 { + c.createStreams() + } + return nil } @@ -855,6 +859,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed = true } + if err := c.syncStreams(); err != nil { + c.logger.Errorf("could not sync streams: %v", err) + updateFailed = true + } + if !updateFailed { // Major version upgrade must only fire after success of earlier operations and should stay last if err := c.majorVersionUpgrade(); err != nil { diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index ba4cf223a..6e0a8ae30 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -33,6 +33,8 @@ const ( getExtensionsSQL = `SELECT e.extname, n.nspname FROM pg_catalog.pg_extension e LEFT JOIN pg_catalog.pg_namespace n ON n.oid = e.extnamespace ORDER BY 1;` + tableExistsSQL = `SELECT TRUE FROM pg_tables WHERE tablename = $1 AND schemaname = $2;` + createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` createDatabaseSchemaSQL = `SET ROLE TO "%s"; CREATE SCHEMA IF NOT EXISTS "%s" AUTHORIZATION "%s"` alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` @@ -506,6 +508,42 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi return nil } +// getExtension returns the list of current database extensions +// The caller is responsible for opening and closing the database connection +func (c *Cluster) tableExists(tableName, schemaName string) (bool, error) { + var ( + rows *sql.Rows + exists bool + err error + ) + + if rows, err = c.pgDb.Query(tableExistsSQL, tableName, schemaName); err != nil { + return false, fmt.Errorf("could not check table for existence: %v", err) + } + + defer func() { + if err2 := rows.Close(); err2 != nil { + if err != nil { + err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err) + } else { + err = fmt.Errorf("error when closing query cursor: %v", err2) + } + } + }() + + for rows.Next() { + if err = rows.Scan(&exists); err != nil { + return false, fmt.Errorf("error when processing row: %v", err) + } + } + + if exists { + return true, nil + } else { + return false, fmt.Errorf("table %s not found", schemaName+"."+tableName) + } +} + // Creates a connection pool credentials lookup function in every database to // perform remote authentication. func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string, role PostgresRole) error { diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go new file mode 100644 index 000000000..6f1b7efec --- /dev/null +++ b/pkg/cluster/streams.go @@ -0,0 +1,196 @@ +package cluster + +import ( + "context" + "fmt" + "reflect" + "strings" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + zalandov1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var outboxTableNameTemplate config.StringTemplate = "{table}_{eventtype}_outbox" + +func (c *Cluster) createStreams() error { + c.setProcessName("creating streams") + + fes := c.generateFabricEventStream() + _, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("could not create event stream custom resource: %v", err) + } + + return nil +} + +func (c *Cluster) syncStreamDbResources() error { + + for _, stream := range c.Spec.Streams { + if err := c.initDbConnWithName(stream.Database); err != nil { + return fmt.Errorf("could not init connection to database %s specified for event stream: %v", stream.Database, err) + } + + for table, eventType := range stream.Tables { + tableName, schemaName := getTableSchema(table) + if exists, err := c.tableExists(tableName, schemaName); !exists { + return fmt.Errorf("could not find table %s specified for event stream: %v", table, err) + } + // check if outbox table exists and if not, create it + outboxTable := outboxTableNameTemplate.Format("table", tableName, "eventtype", eventType) + if exists, err := c.tableExists(outboxTable, schemaName); !exists { + return fmt.Errorf("could not find outbox table %s specified for event stream: %v", outboxTable, err) + } + } + } + + return nil +} + +func (c *Cluster) generateFabricEventStream() *zalandov1alpha1.FabricEventStream { + eventStreams := make([]zalandov1alpha1.EventStream, 0) + + for _, stream := range c.Spec.Streams { + for table, eventType := range stream.Tables { + streamSource := c.getEventStreamSource(stream, table, eventType) + streamFlow := getEventStreamFlow(stream) + streamSink := getEventStreamSink(stream, eventType) + + eventStreams = append(eventStreams, zalandov1alpha1.EventStream{ + EventStreamFlow: streamFlow, + EventStreamSink: streamSink, + EventStreamSource: streamSource}) + } + } + + return &zalandov1alpha1.FabricEventStream{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.Name + constants.FESsuffix, + Namespace: c.Namespace, + Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)), + }, + Spec: zalandov1alpha1.FabricEventStreamSpec{ + ApplicationId: "", + EventStreams: eventStreams, + }, + } +} + +func (c *Cluster) getEventStreamSource(stream acidv1.Stream, table, eventType string) zalandov1alpha1.EventStreamSource { + streamFilter := stream.Filter[table] + _, schema := getTableSchema(table) + return zalandov1alpha1.EventStreamSource{ + Type: constants.EventStreamSourcePGType, + Schema: schema, + EventStreamTable: getOutboxTable(table, eventType), + Filter: streamFilter, + Connection: c.getStreamConnection(stream.Database, stream.User), + } +} + +func getEventStreamFlow(stream acidv1.Stream) zalandov1alpha1.EventStreamFlow { + switch stream.Type { + case "nakadi": + return zalandov1alpha1.EventStreamFlow{ + Type: constants.EventStreamFlowPgNakadiType, + DataTypeColumn: constants.EventStreamFlowDataTypeColumn, + DataOpColumn: constants.EventStreamFlowDataOpColumn, + MetadataColumn: constants.EventStreamFlowMetadataColumn, + DataColumn: constants.EventStreamFlowDataColumn} + case "sqs": + return zalandov1alpha1.EventStreamFlow{ + Type: constants.EventStreamFlowPgApiType, + CallHomeIdColumn: "id", + CallHomeUrl: stream.SqsArn} + } + + return zalandov1alpha1.EventStreamFlow{} +} + +func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1alpha1.EventStreamSink { + switch stream.Type { + case "nakadi": + return zalandov1alpha1.EventStreamSink{ + Type: constants.EventStreamSinkNakadiType, + EventType: eventType, + MaxBatchSize: stream.BatchSize} + case "sqs": + return zalandov1alpha1.EventStreamSink{ + Type: constants.EventStreamSinkSqsType, + QueueName: stream.QueueName} + } + + return zalandov1alpha1.EventStreamSink{} +} + +func getTableSchema(fullTableName string) (tableName, schemaName string) { + schemaName = "public" + tableName = fullTableName + if strings.Contains(fullTableName, ".") { + schemaName = strings.Split(fullTableName, ".")[0] + tableName = strings.Split(fullTableName, ".")[1] + } + + return tableName, schemaName +} + +func getOutboxTable(tableName, eventType string) zalandov1alpha1.EventStreamTable { + return zalandov1alpha1.EventStreamTable{ + Name: outboxTableNameTemplate.Format("table", tableName, "eventtype", eventType), + IDColumn: "id", + } +} + +func (c *Cluster) getStreamConnection(database, user string) zalandov1alpha1.Connection { + return zalandov1alpha1.Connection{ + Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user), + SlotName: constants.EventStreamSourceSlotName, + DBAuth: zalandov1alpha1.DBAuth{ + Type: constants.EventStreamSourceAuthType, + Name: c.credentialSecretNameForCluster(user, c.ClusterName), + UserKey: "username", + PasswordKey: "password", + }, + } +} + +func (c *Cluster) syncStreams() error { + + c.setProcessName("syncing streams") + + effectiveStreams, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Get(context.TODO(), c.Name+constants.FESsuffix, metav1.GetOptions{}) + if err != nil { + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("error during reading of event streams: %v", err) + } + + c.logger.Infof("event streams do not exist") + err := c.createStreams() + if err != nil { + return fmt.Errorf("could not create missing streams: %v", err) + } + } else { + c.syncStreamDbResources() + desiredStreams := c.generateFabricEventStream() + if reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { + c.updateStreams(desiredStreams) + } + } + + return nil +} + +func (c *Cluster) updateStreams(newEventStreams *zalandov1alpha1.FabricEventStream) error { + c.setProcessName("updating event streams") + + _, err := c.KubeClient.FabricEventStreamsGetter.FabricEventStreams(c.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("could not update event stream custom resource: %v", err) + } + + return nil +} diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go new file mode 100644 index 000000000..aebd29ff6 --- /dev/null +++ b/pkg/cluster/streams_test.go @@ -0,0 +1,105 @@ +package cluster + +import ( + "reflect" + + "context" + "testing" + + "github.com/stretchr/testify/assert" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + fakezalandov1alpha1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func newFakeK8sStreamClient() (k8sutil.KubernetesClient, *fake.Clientset) { + zalandoClientSet := fakezalandov1alpha1.NewSimpleClientset() + clientSet := fake.NewSimpleClientset() + + return k8sutil.KubernetesClient{ + FabricEventStreamsGetter: zalandoClientSet.ZalandoV1alpha1(), + }, clientSet +} + +func TestGenerateFabricEventStream(t *testing.T) { + client, _ := newFakeK8sStreamClient() + clusterName := "acid-test-cluster" + namespace := "default" + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + Spec: acidv1.PostgresSpec{ + Databases: map[string]string{ + "foo": "foo_user", + }, + Streams: []acidv1.Stream{ + { + Type: "nakadi", + Database: "foo", + Tables: map[string]string{ + "bar": "stream_type_a", + }, + BatchSize: uint32(100), + User: "foo_user", + }, + { + Type: "wal", + Database: "foo", + Tables: map[string]string{ + "bar": "stream_type_a", + }, + BatchSize: uint32(100), + User: "zalando", + }, + { + Type: "sqs", + Database: "foo", + SqsArn: "arn:aws:sqs:eu-central-1:111122223333", + QueueName: "foo-queue", + User: "foo_user", + }, + }, + Users: map[string]acidv1.UserFlags{ + "foo_user": {}, + "zalando": {}, + }, + Volume: acidv1.Volume{ + Size: "1Gi", + }, + }, + } + + var cluster = New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + PodRoleLabel: "spilo-role", + }, + }, + }, client, pg, logger, eventRecorder) + + cluster.syncStreams() + + streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), cluster.Name+constants.FESsuffix, metav1.GetOptions{}) + assert.NoError(t, err) + + result := cluster.generateFabricEventStream() + if !reflect.DeepEqual(result, streamCRD) { + t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streamCRD, result) + } +} diff --git a/pkg/generated/clientset/versioned/clientset.go b/pkg/generated/clientset/versioned/clientset.go index ab4a88735..73a3863db 100644 --- a/pkg/generated/clientset/versioned/clientset.go +++ b/pkg/generated/clientset/versioned/clientset.go @@ -28,6 +28,7 @@ import ( "fmt" acidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1" + zalandov1alpha1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1" discovery "k8s.io/client-go/discovery" rest "k8s.io/client-go/rest" flowcontrol "k8s.io/client-go/util/flowcontrol" @@ -36,13 +37,15 @@ import ( type Interface interface { Discovery() discovery.DiscoveryInterface AcidV1() acidv1.AcidV1Interface + ZalandoV1alpha1() zalandov1alpha1.ZalandoV1alpha1Interface } // Clientset contains the clients for groups. Each group has exactly one // version included in a Clientset. type Clientset struct { *discovery.DiscoveryClient - acidV1 *acidv1.AcidV1Client + acidV1 *acidv1.AcidV1Client + zalandoV1alpha1 *zalandov1alpha1.ZalandoV1alpha1Client } // AcidV1 retrieves the AcidV1Client @@ -50,6 +53,11 @@ func (c *Clientset) AcidV1() acidv1.AcidV1Interface { return c.acidV1 } +// ZalandoV1alpha1 retrieves the ZalandoV1alpha1Client +func (c *Clientset) ZalandoV1alpha1() zalandov1alpha1.ZalandoV1alpha1Interface { + return c.zalandoV1alpha1 +} + // Discovery retrieves the DiscoveryClient func (c *Clientset) Discovery() discovery.DiscoveryInterface { if c == nil { @@ -72,6 +80,7 @@ func NewForConfig(c *rest.Config) (*Clientset, error) { var cs Clientset var err error cs.acidV1, err = acidv1.NewForConfig(&configShallowCopy) + cs.zalandoV1alpha1, err = zalandov1alpha1.NewForConfig(&configShallowCopy) if err != nil { return nil, err } @@ -88,6 +97,7 @@ func NewForConfig(c *rest.Config) (*Clientset, error) { func NewForConfigOrDie(c *rest.Config) *Clientset { var cs Clientset cs.acidV1 = acidv1.NewForConfigOrDie(c) + cs.zalandoV1alpha1 = zalandov1alpha1.NewForConfigOrDie(c) cs.DiscoveryClient = discovery.NewDiscoveryClientForConfigOrDie(c) return &cs @@ -97,6 +107,7 @@ func NewForConfigOrDie(c *rest.Config) *Clientset { func New(c rest.Interface) *Clientset { var cs Clientset cs.acidV1 = acidv1.New(c) + cs.zalandoV1alpha1 = zalandov1alpha1.New(c) cs.DiscoveryClient = discovery.NewDiscoveryClient(c) return &cs diff --git a/pkg/generated/clientset/versioned/fake/clientset_generated.go b/pkg/generated/clientset/versioned/fake/clientset_generated.go index 6ae5db2d3..3e38777fc 100644 --- a/pkg/generated/clientset/versioned/fake/clientset_generated.go +++ b/pkg/generated/clientset/versioned/fake/clientset_generated.go @@ -28,6 +28,8 @@ import ( clientset "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned" acidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1" fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake" + zalandov1alpha1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1" + fakezalandov1alpha1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" @@ -86,3 +88,8 @@ var _ clientset.Interface = &Clientset{} func (c *Clientset) AcidV1() acidv1.AcidV1Interface { return &fakeacidv1.FakeAcidV1{Fake: &c.Fake} } + +// ZalandoV1alpha1 retrieves the ZalandoV1alpha1Client +func (c *Clientset) ZalandoV1alpha1() zalandov1alpha1.ZalandoV1alpha1Interface { + return &fakezalandov1alpha1.FakeZalandoV1alpha1{Fake: &c.Fake} +} diff --git a/pkg/generated/clientset/versioned/fake/register.go b/pkg/generated/clientset/versioned/fake/register.go index c4d383aab..b022f3e76 100644 --- a/pkg/generated/clientset/versioned/fake/register.go +++ b/pkg/generated/clientset/versioned/fake/register.go @@ -26,6 +26,7 @@ package fake import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + zalandov1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -38,6 +39,7 @@ var codecs = serializer.NewCodecFactory(scheme) var localSchemeBuilder = runtime.SchemeBuilder{ acidv1.AddToScheme, + zalandov1alpha1.AddToScheme, } // AddToScheme adds all types of this clientset into the given scheme. This allows composition diff --git a/pkg/generated/clientset/versioned/scheme/register.go b/pkg/generated/clientset/versioned/scheme/register.go index 8be969eb5..5fc5886ea 100644 --- a/pkg/generated/clientset/versioned/scheme/register.go +++ b/pkg/generated/clientset/versioned/scheme/register.go @@ -26,6 +26,7 @@ package scheme import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + zalandov1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -38,6 +39,7 @@ var Codecs = serializer.NewCodecFactory(Scheme) var ParameterCodec = runtime.NewParameterCodec(Scheme) var localSchemeBuilder = runtime.SchemeBuilder{ acidv1.AddToScheme, + zalandov1alpha1.AddToScheme, } // AddToScheme adds all types of this clientset into the given scheme. This allows composition diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/doc.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/doc.go new file mode 100644 index 000000000..a9896a348 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/doc.go @@ -0,0 +1,26 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +// This package has the automatically generated typed clients. +package v1alpha1 diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fabriceventstream.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fabriceventstream.go new file mode 100644 index 000000000..ec93fe65a --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fabriceventstream.go @@ -0,0 +1,184 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" + scheme "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// FabricEventStreamsGetter has a method to return a FabricEventStreamInterface. +// A group's client should implement this interface. +type FabricEventStreamsGetter interface { + FabricEventStreams(namespace string) FabricEventStreamInterface +} + +// FabricEventStreamInterface has methods to work with FabricEventStream resources. +type FabricEventStreamInterface interface { + Create(ctx context.Context, fabricEventStream *v1alpha1.FabricEventStream, opts v1.CreateOptions) (*v1alpha1.FabricEventStream, error) + Update(ctx context.Context, fabricEventStream *v1alpha1.FabricEventStream, opts v1.UpdateOptions) (*v1alpha1.FabricEventStream, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.FabricEventStream, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.FabricEventStreamList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.FabricEventStream, err error) + FabricEventStreamExpansion +} + +// fabricEventStreams implements FabricEventStreamInterface +type fabricEventStreams struct { + client rest.Interface + ns string +} + +// newFabricEventStreams returns a FabricEventStreams +func newFabricEventStreams(c *ZalandoV1alpha1Client, namespace string) *fabricEventStreams { + return &fabricEventStreams{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the fabricEventStream, and returns the corresponding fabricEventStream object, and an error if there is any. +func (c *fabricEventStreams) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.FabricEventStream, err error) { + result = &v1alpha1.FabricEventStream{} + err = c.client.Get(). + Namespace(c.ns). + Resource("fabriceventstreams"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of FabricEventStreams that match those selectors. +func (c *fabricEventStreams) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.FabricEventStreamList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.FabricEventStreamList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("fabriceventstreams"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested fabricEventStreams. +func (c *fabricEventStreams) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("fabriceventstreams"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a fabricEventStream and creates it. Returns the server's representation of the fabricEventStream, and an error, if there is any. +func (c *fabricEventStreams) Create(ctx context.Context, fabricEventStream *v1alpha1.FabricEventStream, opts v1.CreateOptions) (result *v1alpha1.FabricEventStream, err error) { + result = &v1alpha1.FabricEventStream{} + err = c.client.Post(). + Namespace(c.ns). + Resource("fabriceventstreams"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(fabricEventStream). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a fabricEventStream and updates it. Returns the server's representation of the fabricEventStream, and an error, if there is any. +func (c *fabricEventStreams) Update(ctx context.Context, fabricEventStream *v1alpha1.FabricEventStream, opts v1.UpdateOptions) (result *v1alpha1.FabricEventStream, err error) { + result = &v1alpha1.FabricEventStream{} + err = c.client.Put(). + Namespace(c.ns). + Resource("fabriceventstreams"). + Name(fabricEventStream.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(fabricEventStream). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the fabricEventStream and deletes it. Returns an error if one occurs. +func (c *fabricEventStreams) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("fabriceventstreams"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *fabricEventStreams) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("fabriceventstreams"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched fabricEventStream. +func (c *fabricEventStreams) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.FabricEventStream, err error) { + result = &v1alpha1.FabricEventStream{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("fabriceventstreams"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/doc.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/doc.go new file mode 100644 index 000000000..c5fd1c04b --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/doc.go @@ -0,0 +1,26 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +// Package fake has the automatically generated clients. +package fake diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/fake_fabriceventstream.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/fake_fabriceventstream.go new file mode 100644 index 000000000..9fd2485de --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/fake_fabriceventstream.go @@ -0,0 +1,136 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeFabricEventStreams implements FabricEventStreamInterface +type FakeFabricEventStreams struct { + Fake *FakeZalandoV1alpha1 + ns string +} + +var fabriceventstreamsResource = schema.GroupVersionResource{Group: "zalando.org", Version: "v1alpha1", Resource: "fabriceventstreams"} + +var fabriceventstreamsKind = schema.GroupVersionKind{Group: "zalando.org", Version: "v1alpha1", Kind: "FabricEventStream"} + +// Get takes name of the fabricEventStream, and returns the corresponding fabricEventStream object, and an error if there is any. +func (c *FakeFabricEventStreams) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.FabricEventStream, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(fabriceventstreamsResource, c.ns, name), &v1alpha1.FabricEventStream{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.FabricEventStream), err +} + +// List takes label and field selectors, and returns the list of FabricEventStreams that match those selectors. +func (c *FakeFabricEventStreams) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.FabricEventStreamList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(fabriceventstreamsResource, fabriceventstreamsKind, c.ns, opts), &v1alpha1.FabricEventStreamList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.FabricEventStreamList{ListMeta: obj.(*v1alpha1.FabricEventStreamList).ListMeta} + for _, item := range obj.(*v1alpha1.FabricEventStreamList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested fabricEventStreams. +func (c *FakeFabricEventStreams) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(fabriceventstreamsResource, c.ns, opts)) + +} + +// Create takes the representation of a fabricEventStream and creates it. Returns the server's representation of the fabricEventStream, and an error, if there is any. +func (c *FakeFabricEventStreams) Create(ctx context.Context, fabricEventStream *v1alpha1.FabricEventStream, opts v1.CreateOptions) (result *v1alpha1.FabricEventStream, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(fabriceventstreamsResource, c.ns, fabricEventStream), &v1alpha1.FabricEventStream{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.FabricEventStream), err +} + +// Update takes the representation of a fabricEventStream and updates it. Returns the server's representation of the fabricEventStream, and an error, if there is any. +func (c *FakeFabricEventStreams) Update(ctx context.Context, fabricEventStream *v1alpha1.FabricEventStream, opts v1.UpdateOptions) (result *v1alpha1.FabricEventStream, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(fabriceventstreamsResource, c.ns, fabricEventStream), &v1alpha1.FabricEventStream{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.FabricEventStream), err +} + +// Delete takes name of the fabricEventStream and deletes it. Returns an error if one occurs. +func (c *FakeFabricEventStreams) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(fabriceventstreamsResource, c.ns, name), &v1alpha1.FabricEventStream{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeFabricEventStreams) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(fabriceventstreamsResource, c.ns, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.FabricEventStreamList{}) + return err +} + +// Patch applies the patch and returns the patched fabricEventStream. +func (c *FakeFabricEventStreams) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.FabricEventStream, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(fabriceventstreamsResource, c.ns, name, pt, data, subresources...), &v1alpha1.FabricEventStream{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.FabricEventStream), err +} diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/fake_zalando.org_client.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/fake_zalando.org_client.go new file mode 100644 index 000000000..f8583cae9 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/fake/fake_zalando.org_client.go @@ -0,0 +1,46 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1" + rest "k8s.io/client-go/rest" + testing "k8s.io/client-go/testing" +) + +type FakeZalandoV1alpha1 struct { + *testing.Fake +} + +func (c *FakeZalandoV1alpha1) FabricEventStreams(namespace string) v1alpha1.FabricEventStreamInterface { + return &FakeFabricEventStreams{c, namespace} +} + +// RESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *FakeZalandoV1alpha1) RESTClient() rest.Interface { + var ret *rest.RESTClient + return ret +} diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/generated_expansion.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/generated_expansion.go new file mode 100644 index 000000000..89c4f1d5d --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/generated_expansion.go @@ -0,0 +1,27 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +type FabricEventStreamExpansion interface{} diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/zalando.org_client.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/zalando.org_client.go new file mode 100644 index 000000000..56436b2f4 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1/zalando.org_client.go @@ -0,0 +1,95 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" + "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme" + rest "k8s.io/client-go/rest" +) + +type ZalandoV1alpha1Interface interface { + RESTClient() rest.Interface + FabricEventStreamsGetter +} + +// ZalandoV1alpha1Client is used to interact with features provided by the zalando.org group. +type ZalandoV1alpha1Client struct { + restClient rest.Interface +} + +func (c *ZalandoV1alpha1Client) FabricEventStreams(namespace string) FabricEventStreamInterface { + return newFabricEventStreams(c, namespace) +} + +// NewForConfig creates a new ZalandoV1alpha1Client for the given config. +func NewForConfig(c *rest.Config) (*ZalandoV1alpha1Client, error) { + config := *c + if err := setConfigDefaults(&config); err != nil { + return nil, err + } + client, err := rest.RESTClientFor(&config) + if err != nil { + return nil, err + } + return &ZalandoV1alpha1Client{client}, nil +} + +// NewForConfigOrDie creates a new ZalandoV1alpha1Client for the given config and +// panics if there is an error in the config. +func NewForConfigOrDie(c *rest.Config) *ZalandoV1alpha1Client { + client, err := NewForConfig(c) + if err != nil { + panic(err) + } + return client +} + +// New creates a new ZalandoV1alpha1Client for the given RESTClient. +func New(c rest.Interface) *ZalandoV1alpha1Client { + return &ZalandoV1alpha1Client{c} +} + +func setConfigDefaults(config *rest.Config) error { + gv := v1alpha1.SchemeGroupVersion + config.GroupVersion = &gv + config.APIPath = "/apis" + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + + if config.UserAgent == "" { + config.UserAgent = rest.DefaultKubernetesUserAgent() + } + + return nil +} + +// RESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *ZalandoV1alpha1Client) RESTClient() rest.Interface { + if c == nil { + return nil + } + return c.restClient +} diff --git a/pkg/generated/informers/externalversions/factory.go b/pkg/generated/informers/externalversions/factory.go index e4b1efdc6..b3938ec74 100644 --- a/pkg/generated/informers/externalversions/factory.go +++ b/pkg/generated/informers/externalversions/factory.go @@ -32,6 +32,7 @@ import ( versioned "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned" acidzalando "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do" internalinterfaces "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/internalinterfaces" + zalandoorg "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/zalando.org" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -179,8 +180,13 @@ type SharedInformerFactory interface { WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Acid() acidzalando.Interface + Zalando() zalandoorg.Interface } func (f *sharedInformerFactory) Acid() acidzalando.Interface { return acidzalando.New(f, f.namespace, f.tweakListOptions) } + +func (f *sharedInformerFactory) Zalando() zalandoorg.Interface { + return zalandoorg.New(f, f.namespace, f.tweakListOptions) +} diff --git a/pkg/generated/informers/externalversions/generic.go b/pkg/generated/informers/externalversions/generic.go index 5fd693558..23515c041 100644 --- a/pkg/generated/informers/externalversions/generic.go +++ b/pkg/generated/informers/externalversions/generic.go @@ -28,6 +28,7 @@ import ( "fmt" v1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + v1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" schema "k8s.io/apimachinery/pkg/runtime/schema" cache "k8s.io/client-go/tools/cache" ) @@ -64,6 +65,10 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource case v1.SchemeGroupVersion.WithResource("postgresqls"): return &genericInformer{resource: resource.GroupResource(), informer: f.Acid().V1().Postgresqls().Informer()}, nil + // Group=zalando.org, Version=v1alpha1 + case v1alpha1.SchemeGroupVersion.WithResource("fabriceventstreams"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Zalando().V1alpha1().FabricEventStreams().Informer()}, nil + } return nil, fmt.Errorf("no informer found for %v", resource) diff --git a/pkg/generated/informers/externalversions/zalando.org/interface.go b/pkg/generated/informers/externalversions/zalando.org/interface.go new file mode 100644 index 000000000..c8cb52a09 --- /dev/null +++ b/pkg/generated/informers/externalversions/zalando.org/interface.go @@ -0,0 +1,52 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package zalando + +import ( + internalinterfaces "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/zalando.org/v1alpha1" +) + +// Interface provides access to each of this group's versions. +type Interface interface { + // V1alpha1 provides access to shared informers for resources in V1alpha1. + V1alpha1() v1alpha1.Interface +} + +type group struct { + factory internalinterfaces.SharedInformerFactory + namespace string + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// New returns a new Interface. +func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { + return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} +} + +// V1alpha1 returns a new v1alpha1.Interface. +func (g *group) V1alpha1() v1alpha1.Interface { + return v1alpha1.New(g.factory, g.namespace, g.tweakListOptions) +} diff --git a/pkg/generated/informers/externalversions/zalando.org/v1alpha1/fabriceventstream.go b/pkg/generated/informers/externalversions/zalando.org/v1alpha1/fabriceventstream.go new file mode 100644 index 000000000..7d29d716d --- /dev/null +++ b/pkg/generated/informers/externalversions/zalando.org/v1alpha1/fabriceventstream.go @@ -0,0 +1,96 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + zalandoorgv1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" + versioned "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned" + internalinterfaces "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/zalando/postgres-operator/pkg/generated/listers/zalando.org/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// FabricEventStreamInformer provides access to a shared informer and lister for +// FabricEventStreams. +type FabricEventStreamInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.FabricEventStreamLister +} + +type fabricEventStreamInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewFabricEventStreamInformer constructs a new informer for FabricEventStream type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFabricEventStreamInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredFabricEventStreamInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredFabricEventStreamInformer constructs a new informer for FabricEventStream type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredFabricEventStreamInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ZalandoV1alpha1().FabricEventStreams(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ZalandoV1alpha1().FabricEventStreams(namespace).Watch(context.TODO(), options) + }, + }, + &zalandoorgv1alpha1.FabricEventStream{}, + resyncPeriod, + indexers, + ) +} + +func (f *fabricEventStreamInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredFabricEventStreamInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *fabricEventStreamInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&zalandoorgv1alpha1.FabricEventStream{}, f.defaultInformer) +} + +func (f *fabricEventStreamInformer) Lister() v1alpha1.FabricEventStreamLister { + return v1alpha1.NewFabricEventStreamLister(f.Informer().GetIndexer()) +} diff --git a/pkg/generated/informers/externalversions/zalando.org/v1alpha1/interface.go b/pkg/generated/informers/externalversions/zalando.org/v1alpha1/interface.go new file mode 100644 index 000000000..7815569a2 --- /dev/null +++ b/pkg/generated/informers/externalversions/zalando.org/v1alpha1/interface.go @@ -0,0 +1,51 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + internalinterfaces "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/internalinterfaces" +) + +// Interface provides access to all the informers in this group version. +type Interface interface { + // FabricEventStreams returns a FabricEventStreamInformer. + FabricEventStreams() FabricEventStreamInformer +} + +type version struct { + factory internalinterfaces.SharedInformerFactory + namespace string + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// New returns a new Interface. +func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { + return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} +} + +// FabricEventStreams returns a FabricEventStreamInformer. +func (v *version) FabricEventStreams() FabricEventStreamInformer { + return &fabricEventStreamInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} diff --git a/pkg/generated/listers/zalando.org/v1alpha1/expansion_generated.go b/pkg/generated/listers/zalando.org/v1alpha1/expansion_generated.go new file mode 100644 index 000000000..16438da8b --- /dev/null +++ b/pkg/generated/listers/zalando.org/v1alpha1/expansion_generated.go @@ -0,0 +1,33 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +// FabricEventStreamListerExpansion allows custom methods to be added to +// FabricEventStreamLister. +type FabricEventStreamListerExpansion interface{} + +// FabricEventStreamNamespaceListerExpansion allows custom methods to be added to +// FabricEventStreamNamespaceLister. +type FabricEventStreamNamespaceListerExpansion interface{} diff --git a/pkg/generated/listers/zalando.org/v1alpha1/fabriceventstream.go b/pkg/generated/listers/zalando.org/v1alpha1/fabriceventstream.go new file mode 100644 index 000000000..244d11bbf --- /dev/null +++ b/pkg/generated/listers/zalando.org/v1alpha1/fabriceventstream.go @@ -0,0 +1,105 @@ +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// FabricEventStreamLister helps list FabricEventStreams. +// All objects returned here must be treated as read-only. +type FabricEventStreamLister interface { + // List lists all FabricEventStreams in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.FabricEventStream, err error) + // FabricEventStreams returns an object that can list and get FabricEventStreams. + FabricEventStreams(namespace string) FabricEventStreamNamespaceLister + FabricEventStreamListerExpansion +} + +// fabricEventStreamLister implements the FabricEventStreamLister interface. +type fabricEventStreamLister struct { + indexer cache.Indexer +} + +// NewFabricEventStreamLister returns a new FabricEventStreamLister. +func NewFabricEventStreamLister(indexer cache.Indexer) FabricEventStreamLister { + return &fabricEventStreamLister{indexer: indexer} +} + +// List lists all FabricEventStreams in the indexer. +func (s *fabricEventStreamLister) List(selector labels.Selector) (ret []*v1alpha1.FabricEventStream, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.FabricEventStream)) + }) + return ret, err +} + +// FabricEventStreams returns an object that can list and get FabricEventStreams. +func (s *fabricEventStreamLister) FabricEventStreams(namespace string) FabricEventStreamNamespaceLister { + return fabricEventStreamNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// FabricEventStreamNamespaceLister helps list and get FabricEventStreams. +// All objects returned here must be treated as read-only. +type FabricEventStreamNamespaceLister interface { + // List lists all FabricEventStreams in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.FabricEventStream, err error) + // Get retrieves the FabricEventStream from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.FabricEventStream, error) + FabricEventStreamNamespaceListerExpansion +} + +// fabricEventStreamNamespaceLister implements the FabricEventStreamNamespaceLister +// interface. +type fabricEventStreamNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all FabricEventStreams in the indexer for a given namespace. +func (s fabricEventStreamNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.FabricEventStream, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.FabricEventStream)) + }) + return ret, err +} + +// Get retrieves the FabricEventStream from the indexer for a given namespace and name. +func (s fabricEventStreamNamespaceLister) Get(name string) (*v1alpha1.FabricEventStream, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("fabriceventstream"), name) + } + return obj.(*v1alpha1.FabricEventStream), nil +} diff --git a/pkg/util/constants/streams.go b/pkg/util/constants/streams.go new file mode 100644 index 000000000..c8360c11b --- /dev/null +++ b/pkg/util/constants/streams.go @@ -0,0 +1,17 @@ +package constants + +// PostgreSQL specific constants +const ( + FESsuffix = "-event-streams" + EventStreamSourcePGType = "PostgresLogicalReplication" + EventStreamSourceSlotName = "fes" + EventStreamSourceAuthType = "DatabaseAuthenticationSecret" + EventStreamFlowPgNakadiType = "PostgresWalToNakadiDataEvent" + EventStreamFlowPgApiType = "PostgresWalToApiCallHomeEvent" + EventStreamFlowDataTypeColumn = "data_type" + EventStreamFlowDataOpColumn = "data_op" + EventStreamFlowMetadataColumn = "metadata" + EventStreamFlowDataColumn = "data" + EventStreamSinkNakadiType = "Nakadi" + EventStreamSinkSqsType = "Sqs" +) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index dd6ec1e8b..67192dd21 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -12,8 +12,9 @@ import ( clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1" apiacidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" - acidv1client "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned" + zalandoclient "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned" acidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1" + zalandov1alpha1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/zalando.org/v1alpha1" "github.com/zalando/postgres-operator/pkg/spec" apiappsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -58,9 +59,11 @@ type KubernetesClient struct { acidv1.OperatorConfigurationsGetter acidv1.PostgresTeamsGetter acidv1.PostgresqlsGetter + zalandov1alpha1.FabricEventStreamsGetter - RESTClient rest.Interface - AcidV1ClientSet *acidv1client.Clientset + RESTClient rest.Interface + AcidV1ClientSet *zalandoclient.Clientset + ZalandoV1Alpha1ClientSet *zalandoclient.Clientset } type mockSecret struct { @@ -158,14 +161,19 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { kubeClient.CustomResourceDefinitionsGetter = apiextClient.ApiextensionsV1() - kubeClient.AcidV1ClientSet = acidv1client.NewForConfigOrDie(cfg) + kubeClient.AcidV1ClientSet = zalandoclient.NewForConfigOrDie(cfg) if err != nil { return kubeClient, fmt.Errorf("could not create acid.zalan.do clientset: %v", err) } + kubeClient.ZalandoV1Alpha1ClientSet = zalandoclient.NewForConfigOrDie(cfg) + if err != nil { + return kubeClient, fmt.Errorf("could not create zalando.org clientset: %v", err) + } kubeClient.OperatorConfigurationsGetter = kubeClient.AcidV1ClientSet.AcidV1() kubeClient.PostgresTeamsGetter = kubeClient.AcidV1ClientSet.AcidV1() kubeClient.PostgresqlsGetter = kubeClient.AcidV1ClientSet.AcidV1() + kubeClient.FabricEventStreamsGetter = kubeClient.ZalandoV1Alpha1ClientSet.ZalandoV1alpha1() return kubeClient, nil }