From 0181a1b5b1e0f454ccb05b15258171c4811b0da8 Mon Sep 17 00:00:00 2001
From: Oleksii Kliukin
Date: Tue, 24 Jul 2018 11:21:45 +0200
Subject: [PATCH] Introduce a repair scan to fix failing clusters (#304)

A repair is a sync scan that acts only on those clusters that indicate
that the last add, update or sync operation on them has failed. It is
supposed to kick in more frequently than the sync scan. The sync scan
still remains useful to fix the consequences of external actions (e.g.
someone deleting a postgres-related service by mistake) that happen
unbeknownst to the operator.

The repair scan is controlled by the new repair_period parameter in the
operator configuration. It has to run at least twice as frequently as
the sync scan to have any effect (a normal sync scan updates both the
last-synced and last-repaired attributes of the controller, since a
repair is just a sync underneath).

A repair scan could be queued for a cluster that is already being
synced if the sync period exceeds the interval between repairs. In that
case the repair event is discarded once the corresponding worker finds
out that the cluster is not failing anymore.

Reviewed by @zerg-junior
---
 docs/administrator.md                          |  9 +++
 docs/reference/operator_parameters.md          |  5 +-
 ...gresql-operator-default-configuration.yaml  |  4 +-
 pkg/cluster/cluster.go                         |  7 ++
 pkg/controller/controller.go                   |  5 +-
 pkg/controller/operator_config.go              |  1 +
 pkg/controller/postgresql.go                   | 70 +++++++++++++++----
 pkg/spec/postgresql.go                         | 10 +++
 pkg/spec/types.go                              |  1 +
 pkg/util/config/config.go                      |  3 +-
 pkg/util/config/crd_config.go                  |  1 +
 11 files changed, 97 insertions(+), 19 deletions(-)

diff --git a/docs/administrator.md b/docs/administrator.md
index 5fbae8fe4..a7dff68ef 100644
--- a/docs/administrator.md
+++ b/docs/administrator.md
@@ -199,3 +199,12 @@ cluster manifest. In the case any of these variables are omitted from the
 manifest, the operator configmap's settings `enable_master_load_balancer` and
 `enable_replica_load_balancer` apply. Note that the operator settings affect
 all Postgresql services running in a namespace watched by the operator.
+
+## Running periodic 'autorepair' scans of Kubernetes objects
+
+The Postgres operator periodically scans all Kubernetes objects belonging to
+each cluster and repairs all discrepancies between them and the definitions
+generated from the current cluster manifest. There are two types of scans: a
+`sync scan`, running every `resync_period` for every cluster, and a
+`repair scan`, running every `repair_period` only for those clusters that
+didn't report success as a result of the last operation applied to them.
diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md
index fd8e797b3..dce1df4cc 100644
--- a/docs/reference/operator_parameters.md
+++ b/docs/reference/operator_parameters.md
@@ -80,7 +80,10 @@ Those are top-level keys, containing both leaf keys and groups.
   are applied. The default is `-1`.
 
 * **resync_period**
-  period between consecutive sync requests. The default is `5m`.
+  period between consecutive sync requests. The default is `30m`.
+
+* **repair_period**
+  period between consecutive repair requests. The default is `5m`.
 
 ## Postgres users
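To make the interplay of `resync_period` and `repair_period` concrete, here is a minimal, self-contained Go sketch of the scheduling decision the controller makes on each list callback. It is not part of the patch; the function and variable names are illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// decideEvent mirrors the scheduling rule introduced by this patch: a full
// SYNC wins once resyncPeriod has elapsed; otherwise a REPAIR runs once
// repairPeriod has elapsed; otherwise nothing is queued.
func decideEvent(sinceSync, sinceRepair, resyncPeriod, repairPeriod time.Duration) string {
	switch {
	case sinceSync >= resyncPeriod:
		return "SYNC"
	case sinceRepair >= repairPeriod:
		return "REPAIR"
	default:
		return "NONE"
	}
}

func main() {
	// With the defaults (resync 30m, repair 5m), a check 6 minutes after the
	// last scan yields a REPAIR; only after 30 minutes does a SYNC fire.
	fmt.Println(decideEvent(6*time.Minute, 6*time.Minute, 30*time.Minute, 5*time.Minute))  // REPAIR
	fmt.Println(decideEvent(31*time.Minute, 1*time.Minute, 30*time.Minute, 5*time.Minute)) // SYNC
}
```

This is also why `repair_period` must be noticeably shorter than `resync_period`: a sync always takes precedence, so an equal or longer repair period would never fire.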
diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml
index 7dcf75091..05fa935e9 100644
--- a/manifests/postgresql-operator-default-configuration.yaml
+++ b/manifests/postgresql-operator-default-configuration.yaml
@@ -8,7 +8,9 @@ configuration:
   workers: 4
   min_instances: -1
   max_instances: -1
-  resync_period: 5m
+  resync_period: 30m
+  repair_period: 5m
+
   #sidecar_docker_images:
   #  example: "exampleimage:exampletag"
   users:
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index f1979ab8a..2548ebc22 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -630,6 +630,13 @@ func (c *Cluster) Delete() {
 	}
 }
 
+// NeedsRepair returns true if the last operation on the cluster failed, together with the reported status.
+func (c *Cluster) NeedsRepair() (bool, spec.PostgresStatus) {
+	c.specMu.RLock()
+	defer c.specMu.RUnlock()
+	return !c.Status.Success(), c.Status
+}
+
 // ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue.
 func (c *Cluster) ReceivePodEvent(event spec.PodEvent) {
 	if err := c.podEventsQueue.Add(event); err != nil {
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 7d1a6ed2f..6615be677 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -48,8 +48,9 @@ type Controller struct {
 	nodesInformer cache.SharedIndexInformer
 	podCh         chan spec.PodEvent
 
-	clusterEventQueues  []*cache.FIFO // [workerID]Queue
-	lastClusterSyncTime int64
+	clusterEventQueues    []*cache.FIFO // [workerID]Queue
+	lastClusterSyncTime   int64
+	lastClusterRepairTime int64
 
 	workerLogs map[uint32]ringlog.RingLogger
 
diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go
index fb448105b..1b7318d1e 100644
--- a/pkg/controller/operator_config.go
+++ b/pkg/controller/operator_config.go
@@ -43,6 +43,7 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *config.OperatorConfigur
 	result.MinInstances = fromCRD.MinInstances
 	result.MaxInstances = fromCRD.MaxInstances
 	result.ResyncPeriod = time.Duration(fromCRD.ResyncPeriod)
+	result.RepairPeriod = time.Duration(fromCRD.RepairPeriod)
 	result.Sidecars = fromCRD.Sidecars
 
 	result.SuperUsername = fromCRD.PostgresUsersConfiguration.SuperUsername
diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go
index 9f42075ed..c037260ab 100644
--- a/pkg/controller/postgresql.go
+++ b/pkg/controller/postgresql.go
@@ -42,9 +42,14 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
 
 // TODO: make a separate function to be called from InitSharedInformers
 // clusterListFunc obtains a list of all PostgreSQL clusters and runs sync when necessary
+// NB: as this function is called directly by the informer, it needs to avoid acquiring locks
+// on individual cluster structures. Therefore, it acts on the manifests obtained from Kubernetes
+// and not on the internal state of the clusters.
 func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) {
-	var list spec.PostgresqlList
-	var activeClustersCnt, failedClustersCnt int
+	var (
+		list  spec.PostgresqlList
+		event spec.EventType
+	)
 
 	req := c.KubeClient.CRDREST.
 		Get().
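Because the informer callback above must not take per-cluster locks, the actual failure check is deferred to the worker, which calls `NeedsRepair` (added in pkg/cluster/cluster.go above). The following standalone sketch shows that read-locked probe with simplified stand-ins for the operator's types; only `ClusterStatusSyncFailed` is modeled here, whereas the patch also treats failed adds and updates as unsuccessful:

```go
package main

import (
	"fmt"
	"sync"
)

// PostgresStatus is a simplified stand-in for spec.PostgresStatus.
type PostgresStatus string

const (
	ClusterStatusRunning    PostgresStatus = "Running"
	ClusterStatusSyncFailed PostgresStatus = "SyncFailed"
)

// Success is reduced to a single failed status for brevity.
func (s PostgresStatus) Success() bool {
	return s != ClusterStatusSyncFailed
}

// Cluster carries only the fields needed for the probe.
type Cluster struct {
	specMu sync.RWMutex
	Status PostgresStatus
}

// NeedsRepair takes only a read lock, so calling it from the repair path
// is cheap and does not block other readers of the cluster spec.
func (c *Cluster) NeedsRepair() (bool, PostgresStatus) {
	c.specMu.RLock()
	defer c.specMu.RUnlock()
	return !c.Status.Success(), c.Status
}

func main() {
	c := &Cluster{Status: ClusterStatusSyncFailed}
	needsRepair, status := c.NeedsRepair()
	fmt.Println(needsRepair, status) // true SyncFailed
}
```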
@@ -61,19 +66,41 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object
 		c.logger.Warningf("could not unmarshal list of clusters: %v", err)
 	}
 
-	timeFromPreviousSync := time.Now().Unix() - atomic.LoadInt64(&c.lastClusterSyncTime)
-	if timeFromPreviousSync < int64(c.opConfig.ResyncPeriod.Seconds()) {
-		c.logger.Infof("not running SYNC, previous sync happened %d seconds ago", timeFromPreviousSync)
-		return &list, err
+	currentTime := time.Now().Unix()
+	timeFromPreviousSync := currentTime - atomic.LoadInt64(&c.lastClusterSyncTime)
+	timeFromPreviousRepair := currentTime - atomic.LoadInt64(&c.lastClusterRepairTime)
+	if timeFromPreviousSync >= int64(c.opConfig.ResyncPeriod.Seconds()) {
+		event = spec.EventSync
+	} else if timeFromPreviousRepair >= int64(c.opConfig.RepairPeriod.Seconds()) {
+		event = spec.EventRepair
 	}
+	if event != "" {
+		c.queueEvents(&list, event)
+	} else {
+		c.logger.Infof("not enough time passed since the last sync (%d seconds) or repair (%d seconds)",
+			timeFromPreviousSync, timeFromPreviousRepair)
+	}
+	return &list, err
+}
+// queueEvents queues a sync or repair event for every cluster with a valid manifest
+func (c *Controller) queueEvents(list *spec.PostgresqlList, event spec.EventType) {
+	var activeClustersCnt, failedClustersCnt, clustersToRepair int
 
 	for i, pg := range list.Items {
 		if pg.Error != nil {
 			failedClustersCnt++
 			continue
 		}
-		c.queueClusterEvent(nil, &list.Items[i], spec.EventSync)
 		activeClustersCnt++
+		// check if that cluster needs repair
+		if event == spec.EventRepair {
+			if pg.Status.Success() {
+				continue
+			} else {
+				clustersToRepair++
+			}
+		}
+		c.queueClusterEvent(nil, &list.Items[i], event)
 	}
 	if len(list.Items) > 0 {
 		if failedClustersCnt > 0 && activeClustersCnt == 0 {
@@ -83,13 +110,18 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object
 		} else {
 			c.logger.Infof("there are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt)
 		}
+		if clustersToRepair > 0 {
+			c.logger.Infof("%d clusters are scheduled for a repair scan", clustersToRepair)
+		}
 	} else {
 		c.logger.Infof("no clusters running")
 	}
-
-	atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
-
-	return &list, err
+	if event == spec.EventRepair || event == spec.EventSync {
+		atomic.StoreInt64(&c.lastClusterRepairTime, time.Now().Unix())
+		if event == spec.EventSync {
+			atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
+		}
+	}
 }
 
 type crdDecoder struct {
@@ -155,7 +187,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
 	lg := c.logger.WithField("worker", event.WorkerID)
 
-	if event.EventType == spec.EventAdd || event.EventType == spec.EventSync {
+	if event.EventType == spec.EventAdd || event.EventType == spec.EventSync || event.EventType == spec.EventRepair {
 		clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta)
 	} else {
 		clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta)
 	}
@@ -171,6 +203,16 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
 
 	defer c.curWorkerCluster.Store(event.WorkerID, nil)
 
+	if event.EventType == spec.EventRepair {
+		runRepair, lastOperationStatus := cl.NeedsRepair()
+		if !runRepair {
+			lg.Debugf("Observed cluster status %s, repair is not required", lastOperationStatus)
+			return
+		}
+		lg.Debugf("Observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
+		event.EventType = spec.EventSync
+	}
+
 	if event.EventType == spec.EventAdd || event.EventType == spec.EventUpdate || event.EventType == spec.EventSync {
 		// handle deprecated parameters by possibly assigning their values to the new ones.
 		if event.OldSpec != nil {
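The worker-side handling above either drops a stale repair request or demotes it to a plain sync. A minimal sketch of that decision, with the event names taken from the patch and the surrounding queue plumbing omitted:

```go
package main

import "fmt"

// EventType mirrors spec.EventType from the patch.
type EventType string

const (
	EventSync   EventType = "SYNC"
	EventRepair EventType = "REPAIR"
)

// maybeDemote reproduces the worker-side check: a queued REPAIR is either
// discarded (the cluster recovered in the meantime) or demoted to a SYNC,
// so the rest of processEvent only ever deals with SYNC events.
func maybeDemote(ev EventType, needsRepair bool) (EventType, bool) {
	if ev != EventRepair {
		return ev, true
	}
	if !needsRepair {
		return ev, false // drop the stale repair request
	}
	return EventSync, true
}

func main() {
	fmt.Println(maybeDemote(EventRepair, false)) // REPAIR false: event is discarded
	fmt.Println(maybeDemote(EventRepair, true))  // SYNC true: repaired via a sync
}
```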
@@ -406,8 +448,8 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *spec.Po
 	if eventType != spec.EventDelete {
 		return
 	}
-
-	for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate} {
+	// A delete event discards all prior requests for that cluster.
+	for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate, spec.EventRepair} {
 		obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid))
 		if err != nil {
 			lg.Warningf("could not get event from the queue: %v", err)
diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go
index 55713632a..4b973e503 100644
--- a/pkg/spec/postgresql.go
+++ b/pkg/spec/postgresql.go
@@ -335,3 +335,13 @@ func (pl *PostgresqlList) UnmarshalJSON(data []byte) error {
 
 	return nil
 }
+
+func (status PostgresStatus) Success() bool {
+	return status != ClusterStatusAddFailed &&
+		status != ClusterStatusUpdateFailed &&
+		status != ClusterStatusSyncFailed
+}
+
+func (status PostgresStatus) String() string {
+	return string(status)
+}
diff --git a/pkg/spec/types.go b/pkg/spec/types.go
index 32d709811..98490e1b0 100644
--- a/pkg/spec/types.go
+++ b/pkg/spec/types.go
@@ -30,6 +30,7 @@ const (
 	EventUpdate EventType = "UPDATE"
 	EventDelete EventType = "DELETE"
 	EventSync   EventType = "SYNC"
+	EventRepair EventType = "REPAIR"
 
 	fileWithNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
 )
diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go
index e9017bfab..683f98a17 100644
--- a/pkg/util/config/config.go
+++ b/pkg/util/config/config.go
@@ -14,7 +14,8 @@ import (
 type CRD struct {
 	ReadyWaitInterval time.Duration `name:"ready_wait_interval" default:"4s"`
 	ReadyWaitTimeout  time.Duration `name:"ready_wait_timeout" default:"30s"`
-	ResyncPeriod      time.Duration `name:"resync_period" default:"5m"`
+	ResyncPeriod      time.Duration `name:"resync_period" default:"30m"`
+	RepairPeriod      time.Duration `name:"repair_period" default:"5m"`
 }
 
 // Resources describes kubernetes resource specific configuration parameters
diff --git a/pkg/util/config/crd_config.go b/pkg/util/config/crd_config.go
index ee3c4b712..cd08ea14e 100644
--- a/pkg/util/config/crd_config.go
+++ b/pkg/util/config/crd_config.go
@@ -119,6 +119,7 @@ type OperatorConfigurationData struct {
 	MinInstances               int32                       `json:"min_instances,omitempty"`
 	MaxInstances               int32                       `json:"max_instances,omitempty"`
 	ResyncPeriod               spec.Duration               `json:"resync_period,omitempty"`
+	RepairPeriod               spec.Duration               `json:"repair_period,omitempty"`
 	Sidecars                   map[string]string           `json:"sidecar_docker_images,omitempty"`
 	PostgresUsersConfiguration PostgresUsersConfiguration  `json:"users"`
 	Kubernetes                 KubernetesMetaConfiguration `json:"kubernetes"`
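Finally, the commit message's claim that a normal sync scan also counts as a repair follows from the timestamp bookkeeping at the end of `queueEvents`. A compact, deterministic sketch of that rule (timestamps are passed in explicitly here instead of reading the clock, purely for illustration):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// recordScan mirrors the bookkeeping in queueEvents: every scan refreshes
// the repair timestamp (a repair is just a narrower sync), while only a
// full sync refreshes the sync timestamp as well.
func recordScan(event string, now int64, lastSyncTime, lastRepairTime *int64) {
	atomic.StoreInt64(lastRepairTime, now)
	if event == "SYNC" {
		atomic.StoreInt64(lastSyncTime, now)
	}
}

func main() {
	var lastSyncTime, lastRepairTime int64
	recordScan("SYNC", 100, &lastSyncTime, &lastRepairTime)
	fmt.Println(lastSyncTime, lastRepairTime) // 100 100: a sync resets both clocks
	recordScan("REPAIR", 400, &lastSyncTime, &lastRepairTime)
	fmt.Println(lastSyncTime, lastRepairTime) // 100 400: a repair resets only its own
}
```

This is the mechanism behind the requirement that `repair_period` be set to a fraction of `resync_period`: each sync pushes the repair clock forward too, so repairs only ever fire in the gaps between syncs.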