diff --git a/docs/administrator.md b/docs/administrator.md
index 5fbae8fe4..a7dff68ef 100644
--- a/docs/administrator.md
+++ b/docs/administrator.md
@@ -199,3 +199,14 @@ cluster manifest. In the case any of these variables are omitted from the
 manifest, the operator configmap's settings `enable_master_load_balancer` and
 `enable_replica_load_balancer` apply. Note that the operator settings affect
 all Postgresql services running in a namespace watched by the operator.
+
+## Running periodic 'autorepair' scans of Kubernetes objects
+
+The Postgres operator periodically scans all Kubernetes objects belonging to
+each cluster and repairs any discrepancies between them and the definitions
+generated from the current cluster manifest. There are two types of scans: a
+`sync scan`, which runs every `resync_period` for every cluster, and a
+`repair scan`, which runs every `repair_period` but only touches clusters that
+did not report success for the last operation applied to them. A sync scan
+also resets the repair timer, so a repair scan is only scheduled when no sync
+has happened within the last `repair_period`.
diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md
index fd8e797b3..dce1df4cc 100644
--- a/docs/reference/operator_parameters.md
+++ b/docs/reference/operator_parameters.md
@@ -80,7 +80,10 @@ Those are top-level keys, containing both leaf keys and groups.
   are applied. The default is `-1`.
 
 * **resync_period**
-  period between consecutive sync requests. The default is
-  `5m`.
+  period between consecutive sync requests. The default is `30m`.
+
+* **repair_period**
+  period between consecutive repair requests. Repairs only touch clusters
+  whose last operation did not succeed. The default is `5m`.
 
 ## Postgres users
diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml
index 7dcf75091..05fa935e9 100644
--- a/manifests/postgresql-operator-default-configuration.yaml
+++ b/manifests/postgresql-operator-default-configuration.yaml
@@ -8,7 +8,9 @@ configuration:
   workers: 4
   min_instances: -1
   max_instances: -1
-  resync_period: 5m
+  resync_period: 30m
+  repair_period: 5m
+
   #sidecar_docker_images:
   # example: "exampleimage:exampletag"
   users:
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index f1979ab8a..2548ebc22 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -630,6 +630,14 @@ func (c *Cluster) Delete() {
 	}
 }
 
+// NeedsRepair returns true if the last operation on the cluster has failed,
+// together with the status that was observed.
+func (c *Cluster) NeedsRepair() (bool, spec.PostgresStatus) {
+	c.specMu.RLock()
+	defer c.specMu.RUnlock()
+	return !c.Status.Success(), c.Status
+}
+
 // ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue.
 func (c *Cluster) ReceivePodEvent(event spec.PodEvent) {
 	if err := c.podEventsQueue.Add(event); err != nil {
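For readers skimming the diff, here is a minimal, self-contained sketch of the status gate that `NeedsRepair` relies on. The `Cluster` struct is trimmed to the two relevant fields, and the concrete status strings (`"Running"`, `"CreateFailed"`, and so on) are assumptions for illustration, not necessarily the operator's actual values:

```go
package main

import (
	"fmt"
	"sync"
)

// PostgresStatus mirrors the operator's status string type; the three
// *Failed values below are the ones Success() treats as failures.
type PostgresStatus string

const (
	ClusterStatusRunning      PostgresStatus = "Running"
	ClusterStatusAddFailed    PostgresStatus = "CreateFailed"
	ClusterStatusUpdateFailed PostgresStatus = "UpdateFailed"
	ClusterStatusSyncFailed   PostgresStatus = "SyncFailed"
)

// Success returns false only for the failure statuses.
func (s PostgresStatus) Success() bool {
	return s != ClusterStatusAddFailed &&
		s != ClusterStatusUpdateFailed &&
		s != ClusterStatusSyncFailed
}

// Cluster is a stand-in for pkg/cluster.Cluster: just a status guarded
// by the same read-write mutex pattern the real struct uses.
type Cluster struct {
	specMu sync.RWMutex
	Status PostgresStatus
}

// NeedsRepair reports whether the last operation failed, returning the
// observed status alongside so the caller can log it.
func (c *Cluster) NeedsRepair() (bool, PostgresStatus) {
	c.specMu.RLock()
	defer c.specMu.RUnlock()
	return !c.Status.Success(), c.Status
}

func main() {
	healthy := &Cluster{Status: ClusterStatusRunning}
	broken := &Cluster{Status: ClusterStatusSyncFailed}
	for _, c := range []*Cluster{healthy, broken} {
		repair, status := c.NeedsRepair()
		fmt.Printf("status=%s needsRepair=%v\n", status, repair)
	}
}
```

Taking only a read lock keeps the repair check cheap: many repair scans can inspect the status concurrently without blocking each other.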
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 7d1a6ed2f..6615be677 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -48,8 +48,9 @@ type Controller struct {
 	nodesInformer cache.SharedIndexInformer
 	podCh         chan spec.PodEvent
 
-	clusterEventQueues  []*cache.FIFO // [workerID]Queue
-	lastClusterSyncTime int64
+	clusterEventQueues    []*cache.FIFO // [workerID]Queue
+	lastClusterSyncTime   int64
+	lastClusterRepairTime int64
 
 	workerLogs map[uint32]ringlog.RingLogger
 
diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go
index fb448105b..1b7318d1e 100644
--- a/pkg/controller/operator_config.go
+++ b/pkg/controller/operator_config.go
@@ -43,6 +43,7 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *config.OperatorConfigur
 	result.MinInstances = fromCRD.MinInstances
 	result.MaxInstances = fromCRD.MaxInstances
 	result.ResyncPeriod = time.Duration(fromCRD.ResyncPeriod)
+	result.RepairPeriod = time.Duration(fromCRD.RepairPeriod)
 
 	result.Sidecars = fromCRD.Sidecars
 	result.SuperUsername = fromCRD.PostgresUsersConfiguration.SuperUsername
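The two timestamps added to `Controller` above drive the scheduling decision implemented in `pkg/controller/postgresql.go` below. The following condensed sketch (the names `scheduler`, `nextEvent`, and `markDone` are invented for the example) shows how a due sync takes precedence over a repair, and how completing a sync also resets the repair clock:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type EventType string

const (
	EventSync   EventType = "SYNC"
	EventRepair EventType = "REPAIR"
)

// scheduler holds the two atomically updated unix timestamps the
// controller keeps (lastClusterSyncTime / lastClusterRepairTime).
type scheduler struct {
	resyncPeriod, repairPeriod time.Duration
	lastSync, lastRepair       int64
}

// nextEvent mirrors the decision in clusterListFunc: a sync wins if it
// is due; otherwise a repair may be due; otherwise "" means do nothing.
func (s *scheduler) nextEvent(now time.Time) EventType {
	current := now.Unix()
	sinceSync := current - atomic.LoadInt64(&s.lastSync)
	sinceRepair := current - atomic.LoadInt64(&s.lastRepair)
	switch {
	case sinceSync >= int64(s.resyncPeriod.Seconds()):
		return EventSync
	case sinceRepair >= int64(s.repairPeriod.Seconds()):
		return EventRepair
	default:
		return ""
	}
}

// markDone mirrors the timestamp bookkeeping: a sync also resets the
// repair clock, so repairs never fire right after a sync scan.
func (s *scheduler) markDone(event EventType, now time.Time) {
	atomic.StoreInt64(&s.lastRepair, now.Unix())
	if event == EventSync {
		atomic.StoreInt64(&s.lastSync, now.Unix())
	}
}

func main() {
	s := &scheduler{resyncPeriod: 30 * time.Minute, repairPeriod: 5 * time.Minute}
	start := time.Now()
	s.markDone(EventSync, start) // pretend a sync just finished
	for _, offset := range []time.Duration{time.Minute, 6 * time.Minute, 31 * time.Minute} {
		fmt.Printf("after %v: %q\n", offset, s.nextEvent(start.Add(offset)))
	}
}
```

With the defaults above, the example prints nothing to do after 1 minute, a repair after 6 minutes, and a full sync after 31 minutes.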
diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go
index 9f42075ed..c037260ab 100644
--- a/pkg/controller/postgresql.go
+++ b/pkg/controller/postgresql.go
@@ -42,9 +42,14 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
 
 // TODO: make a separate function to be called from InitSharedInformers
 // clusterListFunc obtains a list of all PostgreSQL clusters and runs sync when necessary
+// NB: as this function is called directly by the informer, it must avoid acquiring locks
+// on individual cluster structures. Therefore, it acts on the manifests obtained from
+// Kubernetes and not on the internal state of the clusters.
 func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) {
-	var list spec.PostgresqlList
-	var activeClustersCnt, failedClustersCnt int
+	var (
+		list  spec.PostgresqlList
+		event spec.EventType
+	)
 
 	req := c.KubeClient.CRDREST.
 		Get().
@@ -61,19 +66,41 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object
 		c.logger.Warningf("could not unmarshal list of clusters: %v", err)
 	}
 
-	timeFromPreviousSync := time.Now().Unix() - atomic.LoadInt64(&c.lastClusterSyncTime)
-	if timeFromPreviousSync < int64(c.opConfig.ResyncPeriod.Seconds()) {
-		c.logger.Infof("not running SYNC, previous sync happened %d seconds ago", timeFromPreviousSync)
-		return &list, err
+	currentTime := time.Now().Unix()
+	timeFromPreviousSync := currentTime - atomic.LoadInt64(&c.lastClusterSyncTime)
+	timeFromPreviousRepair := currentTime - atomic.LoadInt64(&c.lastClusterRepairTime)
+	if timeFromPreviousSync >= int64(c.opConfig.ResyncPeriod.Seconds()) {
+		event = spec.EventSync
+	} else if timeFromPreviousRepair >= int64(c.opConfig.RepairPeriod.Seconds()) {
+		event = spec.EventRepair
 	}
+	if event != "" {
+		c.queueEvents(&list, event)
+	} else {
+		c.logger.Infof("not enough time has passed since the last sync (%d seconds ago) or repair (%d seconds ago)",
+			timeFromPreviousSync, timeFromPreviousRepair)
+	}
+	return &list, err
+}
 
+// queueEvents queues a sync or repair event for every cluster with a valid manifest
+func (c *Controller) queueEvents(list *spec.PostgresqlList, event spec.EventType) {
+	var activeClustersCnt, failedClustersCnt, clustersToRepair int
 	for i, pg := range list.Items {
 		if pg.Error != nil {
 			failedClustersCnt++
 			continue
 		}
-		c.queueClusterEvent(nil, &list.Items[i], spec.EventSync)
 		activeClustersCnt++
+		// check if the cluster needs repair
+		if event == spec.EventRepair {
+			if pg.Status.Success() {
+				continue
+			} else {
+				clustersToRepair++
+			}
+		}
+		c.queueClusterEvent(nil, &list.Items[i], event)
 	}
 	if len(list.Items) > 0 {
 		if failedClustersCnt > 0 && activeClustersCnt == 0 {
@@ -83,13 +110,18 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object
 		} else {
 			c.logger.Infof("there are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt)
 		}
+		if clustersToRepair > 0 {
+			c.logger.Infof("%d clusters are scheduled for a repair scan", clustersToRepair)
+		}
 	} else {
 		c.logger.Infof("no clusters running")
 	}
-
-	atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
-
-	return &list, err
+	if event == spec.EventRepair || event == spec.EventSync {
+		atomic.StoreInt64(&c.lastClusterRepairTime, time.Now().Unix())
+		if event == spec.EventSync {
+			atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
+		}
+	}
 }
 
 type crdDecoder struct {
@@ -155,7 +187,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
 
 	lg := c.logger.WithField("worker", event.WorkerID)
 
-	if event.EventType == spec.EventAdd || event.EventType == spec.EventSync {
+	if event.EventType == spec.EventAdd || event.EventType == spec.EventSync || event.EventType == spec.EventRepair {
 		clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta)
 	} else {
 		clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta)
@@ -171,6 +203,16 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
 
 	defer c.curWorkerCluster.Store(event.WorkerID, nil)
 
+	if event.EventType == spec.EventRepair {
+		runRepair, lastOperationStatus := cl.NeedsRepair()
+		if !runRepair {
+			lg.Debugf("observed cluster status %s, repair is not required", lastOperationStatus)
+			return
+		}
+		lg.Debugf("observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
+		event.EventType = spec.EventSync
+	}
+
 	if event.EventType == spec.EventAdd || event.EventType == spec.EventUpdate || event.EventType == spec.EventSync {
 		// handle deprecated parameters by possibly assigning their values to the new ones.
 		if event.OldSpec != nil {
@@ -406,8 +448,8 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *spec.Po
 	if eventType != spec.EventDelete {
 		return
 	}
-
-	for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate} {
+	// A delete event discards all prior requests for that cluster.
+	for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate, spec.EventRepair} {
 		obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid))
 		if err != nil {
 			lg.Warningf("could not get event from the queue: %v", err)
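The sweep over event types at the end of the diff above works because each worker queue holds at most one entry per (event type, cluster UID) pair. Below is a toy sketch with a plain map in place of `cache.FIFO`; the key format in `queueKey` is an assumption for illustration, not the operator's actual `queueClusterKey` implementation:

```go
package main

import "fmt"

type EventType string

const (
	EventAdd    EventType = "ADD"
	EventUpdate EventType = "UPDATE"
	EventDelete EventType = "DELETE"
	EventSync   EventType = "SYNC"
	EventRepair EventType = "REPAIR"
)

// queueKey mimics the one-slot-per-(event type, cluster UID) keying,
// which is what lets a DELETE discard everything else still pending.
func queueKey(ev EventType, uid string) string {
	return fmt.Sprintf("%s-%s", ev, uid)
}

func main() {
	// a toy stand-in for the per-worker cache.FIFO keyed queue
	queue := map[string]string{
		queueKey(EventSync, "uid-1"):   "pending sync",
		queueKey(EventRepair, "uid-1"): "pending repair",
		queueKey(EventSync, "uid-2"):   "pending sync",
	}

	// on DELETE for uid-1, drop every pending request for that cluster;
	// note that EventRepair is now part of the list being swept
	for _, ev := range []EventType{EventAdd, EventSync, EventUpdate, EventRepair} {
		delete(queue, queueKey(ev, "uid-1"))
	}

	fmt.Println("left in queue:", len(queue)) // only uid-2's sync remains
}
```

Had `EventRepair` been left out of that slice, a pending repair could outlive the cluster it was queued for and run against a deleted object.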
diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go
index 55713632a..4b973e503 100644
--- a/pkg/spec/postgresql.go
+++ b/pkg/spec/postgresql.go
@@ -335,3 +335,15 @@ func (pl *PostgresqlList) UnmarshalJSON(data []byte) error {
 
 	return nil
 }
+
+// Success returns true unless the status indicates a failed operation on the cluster.
+func (status PostgresStatus) Success() bool {
+	return status != ClusterStatusAddFailed &&
+		status != ClusterStatusUpdateFailed &&
+		status != ClusterStatusSyncFailed
+}
+
+// String implements fmt.Stringer for logging purposes.
+func (status PostgresStatus) String() string {
+	return string(status)
+}
diff --git a/pkg/spec/types.go b/pkg/spec/types.go
index 32d709811..98490e1b0 100644
--- a/pkg/spec/types.go
+++ b/pkg/spec/types.go
@@ -30,6 +30,7 @@ const (
 	EventUpdate EventType = "UPDATE"
 	EventDelete EventType = "DELETE"
 	EventSync   EventType = "SYNC"
+	EventRepair EventType = "REPAIR"
 
 	fileWithNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
 )
diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go
index e9017bfab..683f98a17 100644
--- a/pkg/util/config/config.go
+++ b/pkg/util/config/config.go
@@ -14,7 +14,8 @@ import (
 type CRD struct {
 	ReadyWaitInterval time.Duration `name:"ready_wait_interval" default:"4s"`
 	ReadyWaitTimeout  time.Duration `name:"ready_wait_timeout" default:"30s"`
-	ResyncPeriod      time.Duration `name:"resync_period" default:"5m"`
+	ResyncPeriod      time.Duration `name:"resync_period" default:"30m"`
+	RepairPeriod      time.Duration `name:"repair_period" default:"5m"`
 }
 
 // Resources describes kubernetes resource specific configuration parameters
diff --git a/pkg/util/config/crd_config.go b/pkg/util/config/crd_config.go
index ee3c4b712..cd08ea14e 100644
--- a/pkg/util/config/crd_config.go
+++ b/pkg/util/config/crd_config.go
@@ -119,6 +119,7 @@ type OperatorConfigurationData struct {
 	MinInstances               int32                       `json:"min_instances,omitempty"`
 	MaxInstances               int32                       `json:"max_instances,omitempty"`
 	ResyncPeriod               spec.Duration               `json:"resync_period,omitempty"`
+	RepairPeriod               spec.Duration               `json:"repair_period,omitempty"`
 	Sidecars                   map[string]string           `json:"sidecar_docker_images,omitempty"`
 	PostgresUsersConfiguration PostgresUsersConfiguration  `json:"users"`
 	Kubernetes                 KubernetesMetaConfiguration `json:"kubernetes"`
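Finally, the `spec.Duration` fields above are what let the CRD carry human-readable values such as `30m` and `5m`. Here is a hedged sketch of how such a wrapper could unmarshal from JSON; the real `spec.Duration` implementation may differ in details:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Duration illustrates the wrapper pattern (assumed implementation):
// a string like "30m" in the CRD becomes a time.Duration in Go.
type Duration time.Duration

func (d *Duration) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	parsed, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	*d = Duration(parsed)
	return nil
}

// OperatorConfigurationData is trimmed to the two periods this change touches.
type OperatorConfigurationData struct {
	ResyncPeriod Duration `json:"resync_period,omitempty"`
	RepairPeriod Duration `json:"repair_period,omitempty"`
}

func main() {
	raw := []byte(`{"resync_period": "30m", "repair_period": "5m"}`)
	var cfg OperatorConfigurationData
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println("resync:", time.Duration(cfg.ResyncPeriod))
	fmt.Println("repair:", time.Duration(cfg.RepairPeriod))
}
```

This matches the conversion seen in `importConfigurationFromCRD`, where `time.Duration(fromCRD.RepairPeriod)` turns the CRD value into the plain duration the controller compares against.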