define ownership between operator and postgres clusters

This commit is contained in:
Felix Kunde 2020-01-24 16:45:40 +01:00
parent 7fb163252c
commit fca089bd01
6 changed files with 69 additions and 23 deletions

View File

@ -19,6 +19,7 @@ var (
outOfCluster bool
version string
config spec.ControllerConfig
controllerID string
)
func mustParseDuration(d string) time.Duration {

View File

@ -4,6 +4,8 @@ metadata:
name: acid-test-cluster
# labels:
# environment: demo
# annotations:
# "acid.zalan.do/controller": "second-operator"
spec:
dockerImage: registry.opensource.zalan.do/acid/spilo-cdp-12:1.6-p16
teamId: "acid"

View File

@ -1,7 +1,7 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres-operator
name: postgres-operator-two
spec:
replicas: 1
selector:
@ -35,3 +35,6 @@ spec:
# In order to use the CRD OperatorConfiguration instead, uncomment these lines and comment out the two lines above
# - name: POSTGRES_OPERATOR_CONFIGURATION_OBJECT
# value: postgresql-operator-default-configuration
# Define a ID to isoalte controllers from each other
# - name: CONTROLLER_ID
# value: "second-operator"

View File

@ -13,6 +13,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/apiserver"
"github.com/zalando/postgres-operator/pkg/cluster"
"github.com/zalando/postgres-operator/pkg/spec"
@ -36,6 +37,7 @@ type Controller struct {
stopCh chan struct{}
controllerID string
curWorkerID uint32 //initialized with 0
curWorkerCluster sync.Map
clusterWorkers map[spec.NamespacedName]uint32
@ -68,6 +70,7 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller {
config: *controllerConfig,
opConfig: &config.Config{},
logger: logger.WithField("pkg", "controller"),
controllerID: os.Getenv("CONTROLLER_ID"),
curWorkerCluster: sync.Map{},
clusterWorkers: make(map[spec.NamespacedName]uint32),
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
@ -238,6 +241,7 @@ func (c *Controller) initRoleBinding() {
func (c *Controller) initController() {
c.initClients()
c.controllerID = os.Getenv("CONTROLLER_ID")
if configObjectName := os.Getenv("POSTGRES_OPERATOR_CONFIGURATION_OBJECT"); configObjectName != "" {
if err := c.createConfigurationCRD(c.opConfig.EnableCRDValidation); err != nil {
@ -411,3 +415,16 @@ func (c *Controller) getEffectiveNamespace(namespaceFromEnvironment, namespaceFr
return namespace
}
// hasOwnership returns true if the controller is the "owner" of the postgresql.
// Whether it's owner is determined by the value of 'acid.zalan.do/controller'
// annotation. If the value matches the controllerID then it owns it, or if the
// controllerID is "" and there's no annotation set.
func (c *Controller) hasOwnership(postgresql *acidv1.Postgresql) bool {
if postgresql.Annotations != nil {
if owner, ok := postgresql.Annotations[constants.PostgresqlControllerAnnotationKey]; ok {
return owner == c.controllerID
}
}
return c.controllerID == ""
}

View File

@ -40,12 +40,27 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
// clusterListFunc obtains a list of all PostgreSQL clusters
func (c *Controller) listClusters(options metav1.ListOptions) (*acidv1.PostgresqlList, error) {
var pgList acidv1.PostgresqlList
// TODO: use the SharedInformer cache instead of quering Kubernetes API directly.
list, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.opConfig.WatchedNamespace).List(options)
if err != nil {
c.logger.Errorf("could not list postgresql objects: %v", err)
}
return list, err
if c.controllerID != "" {
c.logger.Debugf("watch only clusters with controllerID %q", c.controllerID)
}
for _, pg := range list.Items {
if pg.Error != "" {
continue
}
if !c.hasOwnership(&pg) {
continue
}
pgList.Items = append(pgList.Items, pg)
}
return &pgList, err
}
// clusterListAndSync lists all manifests and decides whether to run the sync or repair.
@ -455,41 +470,48 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
}
func (c *Controller) postgresqlAdd(obj interface{}) {
pg, ok := obj.(*acidv1.Postgresql)
if !ok {
c.logger.Errorf("could not cast to postgresql spec")
return
pg := c.postgresqlCheck(obj)
if pg != nil {
// We will not get multiple Add events for the same cluster
c.queueClusterEvent(nil, pg, EventAdd)
}
// We will not get multiple Add events for the same cluster
c.queueClusterEvent(nil, pg, EventAdd)
return
}
func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
pgOld, ok := prev.(*acidv1.Postgresql)
if !ok {
c.logger.Errorf("could not cast to postgresql spec")
}
pgNew, ok := cur.(*acidv1.Postgresql)
if !ok {
c.logger.Errorf("could not cast to postgresql spec")
}
// Avoid the inifinite recursion for status updates
if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
return
pgOld := c.postgresqlCheck(prev)
pgNew := c.postgresqlCheck(cur)
if pgOld != nil && pgNew != nil {
// Avoid the inifinite recursion for status updates
if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
return
}
c.queueClusterEvent(pgOld, pgNew, EventUpdate)
}
c.queueClusterEvent(pgOld, pgNew, EventUpdate)
return
}
func (c *Controller) postgresqlDelete(obj interface{}) {
pg := c.postgresqlCheck(obj)
if pg != nil {
c.queueClusterEvent(pg, nil, EventDelete)
}
return
}
func (c *Controller) postgresqlCheck(obj interface{}) *acidv1.Postgresql {
pg, ok := obj.(*acidv1.Postgresql)
if !ok {
c.logger.Errorf("could not cast to postgresql spec")
return
return nil
}
c.queueClusterEvent(pg, nil, EventDelete)
if !c.hasOwnership(pg) {
return nil
}
return pg
}
/*

View File

@ -7,4 +7,5 @@ const (
ElbTimeoutAnnotationValue = "3600"
KubeIAmAnnotation = "iam.amazonaws.com/role"
VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by"
PostgresqlControllerAnnotationKey = "acid.zalan.do/controller"
)