diff --git a/cmd/main.go b/cmd/main.go index 7fadd611a..2372ad2e0 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -19,6 +19,7 @@ var ( outOfCluster bool version string config spec.ControllerConfig + controllerID string ) func mustParseDuration(d string) time.Duration { diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index cf450ef94..d6dcfb4c8 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -4,6 +4,8 @@ metadata: name: acid-test-cluster # labels: # environment: demo +# annotations: +# "acid.zalan.do/controller": "second-operator" spec: dockerImage: registry.opensource.zalan.do/acid/spilo-cdp-12:1.6-p16 teamId: "acid" diff --git a/manifests/postgres-operator.yaml b/manifests/postgres-operator.yaml index fa8682809..1531f6bb8 100644 --- a/manifests/postgres-operator.yaml +++ b/manifests/postgres-operator.yaml @@ -1,7 +1,7 @@ apiVersion: apps/v1 kind: Deployment metadata: - name: postgres-operator + name: postgres-operator-two spec: replicas: 1 selector: @@ -35,3 +35,6 @@ spec: # In order to use the CRD OperatorConfiguration instead, uncomment these lines and comment out the two lines above # - name: POSTGRES_OPERATOR_CONFIGURATION_OBJECT # value: postgresql-operator-default-configuration + # Define a ID to isoalte controllers from each other + # - name: CONTROLLER_ID + # value: "second-operator" diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 831078f3e..ae009bd59 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -13,6 +13,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/apiserver" "github.com/zalando/postgres-operator/pkg/cluster" "github.com/zalando/postgres-operator/pkg/spec" @@ -36,6 +37,7 @@ type Controller struct { stopCh chan struct{} + controllerID string curWorkerID uint32 //initialized with 0 curWorkerCluster sync.Map clusterWorkers map[spec.NamespacedName]uint32 @@ -68,6 +70,7 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller { config: *controllerConfig, opConfig: &config.Config{}, logger: logger.WithField("pkg", "controller"), + controllerID: os.Getenv("CONTROLLER_ID"), curWorkerCluster: sync.Map{}, clusterWorkers: make(map[spec.NamespacedName]uint32), clusters: make(map[spec.NamespacedName]*cluster.Cluster), @@ -238,6 +241,7 @@ func (c *Controller) initRoleBinding() { func (c *Controller) initController() { c.initClients() + c.controllerID = os.Getenv("CONTROLLER_ID") if configObjectName := os.Getenv("POSTGRES_OPERATOR_CONFIGURATION_OBJECT"); configObjectName != "" { if err := c.createConfigurationCRD(c.opConfig.EnableCRDValidation); err != nil { @@ -411,3 +415,16 @@ func (c *Controller) getEffectiveNamespace(namespaceFromEnvironment, namespaceFr return namespace } + +// hasOwnership returns true if the controller is the "owner" of the postgresql. +// Whether it's owner is determined by the value of 'acid.zalan.do/controller' +// annotation. If the value matches the controllerID then it owns it, or if the +// controllerID is "" and there's no annotation set. +func (c *Controller) hasOwnership(postgresql *acidv1.Postgresql) bool { + if postgresql.Annotations != nil { + if owner, ok := postgresql.Annotations[constants.PostgresqlControllerAnnotationKey]; ok { + return owner == c.controllerID + } + } + return c.controllerID == "" +} diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 8e8f9ae85..346c08a5c 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -40,12 +40,27 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { // clusterListFunc obtains a list of all PostgreSQL clusters func (c *Controller) listClusters(options metav1.ListOptions) (*acidv1.PostgresqlList, error) { + var pgList acidv1.PostgresqlList + // TODO: use the SharedInformer cache instead of quering Kubernetes API directly. list, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.opConfig.WatchedNamespace).List(options) if err != nil { c.logger.Errorf("could not list postgresql objects: %v", err) } - return list, err + if c.controllerID != "" { + c.logger.Debugf("watch only clusters with controllerID %q", c.controllerID) + } + for _, pg := range list.Items { + if pg.Error != "" { + continue + } + if !c.hasOwnership(&pg) { + continue + } + pgList.Items = append(pgList.Items, pg) + } + + return &pgList, err } // clusterListAndSync lists all manifests and decides whether to run the sync or repair. @@ -455,41 +470,48 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. } func (c *Controller) postgresqlAdd(obj interface{}) { - pg, ok := obj.(*acidv1.Postgresql) - if !ok { - c.logger.Errorf("could not cast to postgresql spec") - return + pg := c.postgresqlCheck(obj) + if pg != nil { + // We will not get multiple Add events for the same cluster + c.queueClusterEvent(nil, pg, EventAdd) } - // We will not get multiple Add events for the same cluster - c.queueClusterEvent(nil, pg, EventAdd) + return } func (c *Controller) postgresqlUpdate(prev, cur interface{}) { - pgOld, ok := prev.(*acidv1.Postgresql) - if !ok { - c.logger.Errorf("could not cast to postgresql spec") - } - pgNew, ok := cur.(*acidv1.Postgresql) - if !ok { - c.logger.Errorf("could not cast to postgresql spec") - } - // Avoid the inifinite recursion for status updates - if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { - return + pgOld := c.postgresqlCheck(prev) + pgNew := c.postgresqlCheck(cur) + if pgOld != nil && pgNew != nil { + // Avoid the inifinite recursion for status updates + if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { + return + } + c.queueClusterEvent(pgOld, pgNew, EventUpdate) } - c.queueClusterEvent(pgOld, pgNew, EventUpdate) + return } func (c *Controller) postgresqlDelete(obj interface{}) { + pg := c.postgresqlCheck(obj) + if pg != nil { + c.queueClusterEvent(pg, nil, EventDelete) + } + + return +} + +func (c *Controller) postgresqlCheck(obj interface{}) *acidv1.Postgresql { pg, ok := obj.(*acidv1.Postgresql) if !ok { c.logger.Errorf("could not cast to postgresql spec") - return + return nil } - - c.queueClusterEvent(pg, nil, EventDelete) + if !c.hasOwnership(pg) { + return nil + } + return pg } /* diff --git a/pkg/util/constants/annotations.go b/pkg/util/constants/annotations.go index 0b93fc2e1..fc5a84fa5 100644 --- a/pkg/util/constants/annotations.go +++ b/pkg/util/constants/annotations.go @@ -7,4 +7,5 @@ const ( ElbTimeoutAnnotationValue = "3600" KubeIAmAnnotation = "iam.amazonaws.com/role" VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by" + PostgresqlControllerAnnotationKey = "acid.zalan.do/controller" )