235 lines
6.2 KiB
Go
235 lines
6.2 KiB
Go
package controller
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
|
|
"k8s.io/client-go/pkg/api"
|
|
"k8s.io/client-go/pkg/api/meta"
|
|
"k8s.io/client-go/pkg/fields"
|
|
"k8s.io/client-go/pkg/runtime"
|
|
"k8s.io/client-go/pkg/types"
|
|
"k8s.io/client-go/pkg/watch"
|
|
"k8s.io/client-go/tools/cache"
|
|
|
|
"github.com/zalando-incubator/postgres-operator/pkg/cluster"
|
|
"github.com/zalando-incubator/postgres-operator/pkg/spec"
|
|
"github.com/zalando-incubator/postgres-operator/pkg/util"
|
|
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
|
|
)
|
|
|
|
func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) {
|
|
c.logger.Info("Getting list of currently running clusters")
|
|
|
|
req := c.RestClient.Get().
|
|
RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)).
|
|
VersionedParams(&options, api.ParameterCodec).
|
|
FieldsSelectorParam(fields.Everything())
|
|
|
|
object, err := req.Do().Get()
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Can't get list of postgresql objects: %s", err)
|
|
}
|
|
|
|
objList, err := meta.ExtractList(object)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Can't extract list of postgresql objects: %s", err)
|
|
}
|
|
|
|
for _, obj := range objList {
|
|
pg, ok := obj.(*spec.Postgresql)
|
|
if !ok {
|
|
return nil, fmt.Errorf("Can't cast object to postgresql")
|
|
}
|
|
c.queueClusterEvent(nil, pg, spec.EventSync)
|
|
|
|
c.logger.Debugf("Sync of the '%s' cluster has been queued", util.NameFromMeta(pg.Metadata))
|
|
}
|
|
if len(objList) > 0 {
|
|
c.logger.Infof("There are %d clusters currently running", len(objList))
|
|
} else {
|
|
c.logger.Infof("No clusters running")
|
|
}
|
|
|
|
return object, err
|
|
}
|
|
|
|
func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, error) {
|
|
req := c.RestClient.Get().
|
|
RequestURI(fmt.Sprintf(constants.WatchClustersURITemplate, c.opConfig.Namespace)).
|
|
VersionedParams(&options, api.ParameterCodec).
|
|
FieldsSelectorParam(fields.Everything())
|
|
|
|
return req.Watch()
|
|
}
|
|
|
|
func (c *Controller) processEvent(obj interface{}) error {
|
|
var clusterName spec.NamespacedName
|
|
|
|
event, ok := obj.(spec.ClusterEvent)
|
|
if !ok {
|
|
return fmt.Errorf("Can't cast to ClusterEvent")
|
|
}
|
|
logger := c.logger.WithField("worker", event.WorkerID)
|
|
|
|
if event.EventType == spec.EventAdd || event.EventType == spec.EventSync {
|
|
clusterName = util.NameFromMeta(event.NewSpec.Metadata)
|
|
} else {
|
|
clusterName = util.NameFromMeta(event.OldSpec.Metadata)
|
|
}
|
|
|
|
c.clustersMu.RLock()
|
|
cl, clusterFound := c.clusters[clusterName]
|
|
stopCh := c.stopChs[clusterName]
|
|
c.clustersMu.RUnlock()
|
|
|
|
switch event.EventType {
|
|
case spec.EventAdd:
|
|
if clusterFound {
|
|
logger.Debugf("Cluster '%s' already exists", clusterName)
|
|
return nil
|
|
}
|
|
|
|
logger.Infof("Creation of the '%s' cluster started", clusterName)
|
|
|
|
stopCh := make(chan struct{})
|
|
cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger)
|
|
|
|
c.clustersMu.Lock()
|
|
c.clusters[clusterName] = cl
|
|
c.stopChs[clusterName] = stopCh
|
|
c.clustersMu.Unlock()
|
|
|
|
if err := cl.Create(stopCh); err != nil {
|
|
logger.Errorf("Can't create cluster: %s", err)
|
|
return nil
|
|
}
|
|
|
|
logger.Infof("Cluster '%s' has been created", clusterName)
|
|
case spec.EventUpdate:
|
|
logger.Infof("Update of the '%s' cluster started", clusterName)
|
|
|
|
if !clusterFound {
|
|
logger.Warnf("Cluster '%s' does not exist", clusterName)
|
|
return nil
|
|
}
|
|
if err := cl.Update(event.NewSpec); err != nil {
|
|
logger.Errorf("Can't update cluster: %s", err)
|
|
return nil
|
|
}
|
|
logger.Infof("Cluster '%s' has been updated", clusterName)
|
|
case spec.EventDelete:
|
|
logger.Infof("Deletion of the '%s' cluster started", clusterName)
|
|
if !clusterFound {
|
|
logger.Errorf("Unknown cluster: %s", clusterName)
|
|
return nil
|
|
}
|
|
|
|
if err := cl.Delete(); err != nil {
|
|
logger.Errorf("Can't delete cluster '%s': %s", clusterName, err)
|
|
return nil
|
|
}
|
|
close(c.stopChs[clusterName])
|
|
|
|
c.clustersMu.Lock()
|
|
delete(c.clusters, clusterName)
|
|
delete(c.stopChs, clusterName)
|
|
c.clustersMu.Unlock()
|
|
|
|
logger.Infof("Cluster '%s' has been deleted", clusterName)
|
|
case spec.EventSync:
|
|
logger.Infof("Syncing of the '%s' cluster started", clusterName)
|
|
|
|
// no race condition because a cluster is always processed by single worker
|
|
if !clusterFound {
|
|
cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger)
|
|
stopCh = make(chan struct{})
|
|
|
|
c.clustersMu.Lock()
|
|
c.clusters[clusterName] = cl
|
|
c.stopChs[clusterName] = stopCh
|
|
c.clustersMu.Unlock()
|
|
}
|
|
|
|
cl.SyncCluster(stopCh)
|
|
|
|
logger.Infof("Cluster '%s' has been synced", clusterName)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Controller) processClusterEventsQueue(idx int) {
|
|
for {
|
|
c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent))
|
|
}
|
|
}
|
|
|
|
func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) {
|
|
var (
|
|
uid types.UID
|
|
clusterName spec.NamespacedName
|
|
)
|
|
|
|
if old != nil {
|
|
uid = old.Metadata.GetUID()
|
|
clusterName = util.NameFromMeta(old.Metadata)
|
|
} else {
|
|
uid = new.Metadata.GetUID()
|
|
clusterName = util.NameFromMeta(new.Metadata)
|
|
}
|
|
workerId := c.clusterWorkerId(clusterName)
|
|
clusterEvent := spec.ClusterEvent{
|
|
EventType: eventType,
|
|
UID: uid,
|
|
OldSpec: old,
|
|
NewSpec: new,
|
|
WorkerID: workerId,
|
|
}
|
|
//TODO: if we delete cluster, discard all the previous events for the cluster
|
|
|
|
c.clusterEventQueues[workerId].Add(clusterEvent)
|
|
c.logger.WithField("worker", workerId).Infof("%s of the '%s' cluster has been queued for", eventType, clusterName)
|
|
}
|
|
|
|
func (c *Controller) postgresqlAdd(obj interface{}) {
|
|
pg, ok := obj.(*spec.Postgresql)
|
|
if !ok {
|
|
c.logger.Errorf("Can't cast to postgresql spec")
|
|
return
|
|
}
|
|
|
|
// We will not get multiple Add events for the same cluster
|
|
c.queueClusterEvent(nil, pg, spec.EventAdd)
|
|
}
|
|
|
|
func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
|
|
pgOld, ok := prev.(*spec.Postgresql)
|
|
if !ok {
|
|
c.logger.Errorf("Can't cast to postgresql spec")
|
|
}
|
|
pgNew, ok := cur.(*spec.Postgresql)
|
|
if !ok {
|
|
c.logger.Errorf("Can't cast to postgresql spec")
|
|
}
|
|
if pgOld.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion {
|
|
return
|
|
}
|
|
if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
|
|
return
|
|
}
|
|
|
|
c.queueClusterEvent(pgOld, pgNew, spec.EventUpdate)
|
|
}
|
|
|
|
func (c *Controller) postgresqlDelete(obj interface{}) {
|
|
pg, ok := obj.(*spec.Postgresql)
|
|
if !ok {
|
|
c.logger.Errorf("Can't cast to postgresql spec")
|
|
return
|
|
}
|
|
|
|
c.queueClusterEvent(pg, nil, spec.EventDelete)
|
|
}
|