Parallel cluster processing

Run operations on multiple clusters in parallel. Each cluster is handled by a
worker that creates, updates, syncs or deletes it. A worker acquires a
per-cluster lock before operating, so subsequent operations on the same
cluster wait until the current one finishes. The size of the worker pool is
configurable via the `workers` parameter in the configmap and defaults to 4.
Cluster-related tasks are assigned to workers by hashing the cluster name, so
tasks for the same cluster always go to the same worker. Workers do not block
one another, but a single worker can become a bottleneck if too many clusters
are assigned to it; for large-scale deployments it may therefore be necessary
to raise `workers` above the default.
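
The name-to-worker assignment is a CRC32 hash of the namespaced cluster name
taken modulo the pool size (see `clusterWorkerId` in the diff below). A
minimal standalone sketch of the scheme, using invented cluster names:

package main

import (
    "fmt"
    "hash/crc32"
)

// workerID mirrors clusterWorkerId from this commit: hash the namespaced
// cluster name and reduce it modulo the worker pool size, so every task for
// a given cluster lands on the same worker's queue.
func workerID(clusterName string, workers uint32) uint32 {
    return crc32.ChecksumIEEE([]byte(clusterName)) % workers
}

func main() {
    // Hypothetical cluster names; 4 is the operator's default pool size.
    for _, name := range []string{"default/acid-testcluster", "default/acid-minimal"} {
        fmt.Printf("%s -> worker %d\n", name, workerID(name, 4))
    }
}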
Murat Kabilov 2017-04-27 11:13:44 +02:00
parent a9c6c4861c
commit 2370659c69
13 changed files with 304 additions and 143 deletions

View File

@@ -28,3 +28,4 @@ data:
   resync_period_pod: 5m
   super_username: postgres
   teams_api_url: https://teams.example.com/api/
+  workers: "4"

View File

@@ -39,7 +39,7 @@ type Config struct {
     RestClient          *rest.RESTClient
     EtcdClient          etcdclient.KeysAPI
     TeamsAPIClient      *teams.TeamsAPI
-    OpConfig            *config.Config
+    OpConfig            config.Config
     InfrastructureRoles map[string]spec.PgUser // inherited from the controller
 }
@@ -60,15 +60,18 @@ type Cluster struct {
     pgUsers              map[string]spec.PgUser
     podEvents            chan spec.PodEvent
     podSubscribers       map[spec.NamespacedName]chan spec.PodEvent
+    podSubscribersMu     sync.RWMutex
     pgDb                 *sql.DB
     mu                   sync.Mutex
    masterLess           bool
     podDispatcherRunning bool
+    deleteOptions        *v1.DeleteOptions
 }

-func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Logger) *Cluster {
+func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
     lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name)
     kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)}
+    orphanDependents := true

     cluster := &Cluster{
         Config: cfg,
@@ -80,6 +83,7 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Logger) *Cluster {
         kubeResources:        kubeResources,
         masterLess:           false,
         podDispatcherRunning: false,
+        deleteOptions:        &v1.DeleteOptions{OrphanDependents: &orphanDependents},
     }

     return cluster
@@ -89,12 +93,12 @@ func (c *Cluster) ClusterName() spec.NamespacedName {
     return util.NameFromMeta(c.Metadata)
 }

-func (c *Cluster) TeamName() string {
+func (c *Cluster) teamName() string {
     // TODO: check Teams API for the actual name (in case the user passes an integer Id).
     return c.Spec.TeamId
 }

-func (c *Cluster) SetStatus(status spec.PostgresStatus) {
+func (c *Cluster) setStatus(status spec.PostgresStatus) {
     c.Status = status
     b, err := json.Marshal(status)
     if err != nil {
@@ -154,11 +158,25 @@ func (c *Cluster) etcdKeyExists(keyName string) (bool, error) {
 }

 func (c *Cluster) Create(stopCh <-chan struct{}) error {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+
+    var err error
+
     if !c.podDispatcherRunning {
         go c.podEventsDispatcher(stopCh)
         c.podDispatcherRunning = true
     }
+
+    defer func() {
+        if err == nil {
+            c.setStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running?
+        } else {
+            c.setStatus(spec.ClusterStatusAddFailed)
+        }
+    }()
+
+    c.setStatus(spec.ClusterStatusCreating)
+
     keyExist, err := c.etcdKeyExists(fmt.Sprintf("/%s/%s", c.OpConfig.EtcdScope, c.Metadata.Name))
     if err != nil {
         c.logger.Warnf("Can't check etcd key: %s", err)
@@ -180,38 +198,36 @@ func (c *Cluster) Create(stopCh <-chan struct{}) error {
         c.logger.Infof("Service '%s' has been successfully created", util.NameFromMeta(service.ObjectMeta))
     }

-    if err := c.initUsers(); err != nil {
+    if err = c.initUsers(); err != nil {
         return err
-    } else {
-        c.logger.Infof("User secrets have been initialized")
     }
+    c.logger.Infof("User secrets have been initialized")

-    if err := c.applySecrets(); err != nil {
+    if err = c.applySecrets(); err != nil {
         return fmt.Errorf("Can't create Secrets: %s", err)
-    } else {
-        c.logger.Infof("Secrets have been successfully created")
     }
+    c.logger.Infof("Secrets have been successfully created")

     ss, err := c.createStatefulSet()
     if err != nil {
         return fmt.Errorf("Can't create StatefulSet: %s", err)
-    } else {
-        c.logger.Infof("StatefulSet '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta))
     }
+    c.logger.Infof("StatefulSet '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta))

     c.logger.Info("Waiting for cluster being ready")
-    if err := c.waitStatefulsetPodsReady(); err != nil {
+    if err = c.waitStatefulsetPodsReady(); err != nil {
         c.logger.Errorf("Failed to create cluster: %s", err)
         return err
     }
+    c.logger.Infof("Pods are ready")

     if !c.masterLess {
-        if err := c.initDbConn(); err != nil {
+        if err = c.initDbConn(); err != nil {
             return fmt.Errorf("Can't init db connection: %s", err)
         }
-        if err := c.createUsers(); err != nil {
+        if err = c.createUsers(); err != nil {
             return fmt.Errorf("Can't create users: %s", err)
         } else {
             c.logger.Infof("Users have been successfully created")
@@ -323,13 +339,18 @@ func compareResoucesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resource
 }

 func (c *Cluster) Update(newSpec *spec.Postgresql) error {
-    c.logger.Infof("Cluster update from version %s to %s",
+    c.mu.Lock()
+    defer c.mu.Unlock()
+
+    c.setStatus(spec.ClusterStatusUpdating)
+    c.logger.Debugf("Cluster update from version %s to %s",
         c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion)

     newService := c.genService(newSpec.Spec.AllowedSourceRanges)
     if match, reason := c.sameServiceWith(newService); !match {
         c.logServiceChanges(c.Service, newService, true, reason)
         if err := c.updateService(newService); err != nil {
+            c.setStatus(spec.ClusterStatusUpdateFailed)
             return fmt.Errorf("Can't update Service: %s", err)
         } else {
             c.logger.Infof("Service '%s' has been updated", util.NameFromMeta(c.Service.ObjectMeta))
@@ -348,8 +369,10 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
         c.logStatefulSetChanges(c.Statefulset, newStatefulSet, true, reason)
         //TODO: mind the case of updating allowedSourceRanges
         if err := c.updateStatefulSet(newStatefulSet); err != nil {
+            c.setStatus(spec.ClusterStatusUpdateFailed)
             return fmt.Errorf("Can't upate StatefulSet: %s", err)
         }
+        //TODO: if there is a change in numberOfInstances, make sure Pods have been created/deleted
         c.logger.Infof("StatefulSet '%s' has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta))
     }
@@ -363,15 +386,20 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
         c.logger.Infof("Rolling update is needed")
         // TODO: wait for actual streaming to the replica
         if err := c.recreatePods(); err != nil {
+            c.setStatus(spec.ClusterStatusUpdateFailed)
             return fmt.Errorf("Can't recreate Pods: %s", err)
         }
         c.logger.Infof("Rolling update has been finished")
     }
+    c.setStatus(spec.ClusterStatusRunning)

     return nil
 }

 func (c *Cluster) Delete() error {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+
     if err := c.deleteEndpoint(); err != nil {
         c.logger.Errorf("Can't delete Endpoint: %s", err)
     }

View File

@@ -26,8 +26,6 @@ func (c *Cluster) pgConnectionString() string {

 func (c *Cluster) initDbConn() error {
     //TODO: concurrent safe?
     if c.pgDb == nil {
-        c.mu.Lock()
-        defer c.mu.Unlock()
         if c.pgDb == nil {
             conn, err := sql.Open("postgres", c.pgConnectionString())
             if err != nil {

View File

@@ -72,7 +72,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error {
     }
     for _, pvc := range pvcs {
         c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta))
-        if err := c.KubeClient.PersistentVolumeClaims(ns).Delete(pvc.Name, deleteOptions); err != nil {
+        if err := c.KubeClient.PersistentVolumeClaims(ns).Delete(pvc.Name, c.deleteOptions); err != nil {
             c.logger.Warningf("Can't delete PersistentVolumeClaim: %s", err)
         }
     }
@@ -86,17 +86,10 @@ func (c *Cluster) deletePersistenVolumeClaims() error {
 }

 func (c *Cluster) deletePod(podName spec.NamespacedName) error {
-    ch := make(chan spec.PodEvent)
-    if _, ok := c.podSubscribers[podName]; ok {
-        panic("Pod '" + podName.String() + "' is already subscribed")
-    }
-    c.podSubscribers[podName] = ch
-    defer func() {
-        close(ch)
-        delete(c.podSubscribers, podName)
-    }()
+    ch := c.registerPodSubscriber(podName)
+    defer c.unregisterPodSubscriber(podName)

-    if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, deleteOptions); err != nil {
+    if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, c.deleteOptions); err != nil {
         return err
     }
@@ -108,6 +101,9 @@ func (c *Cluster) deletePod(podName spec.NamespacedName) error {
 }

 func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
+    c.podSubscribersMu.Lock()
+    defer c.podSubscribersMu.Unlock()
+
     if _, ok := c.podSubscribers[podName]; !ok {
         panic("Subscriber for Pod '" + podName.String() + "' is not found")
     }
@@ -117,26 +113,25 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
 }

 func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan spec.PodEvent {
+    c.podSubscribersMu.Lock()
+    defer c.podSubscribersMu.Unlock()
+
     ch := make(chan spec.PodEvent)
     if _, ok := c.podSubscribers[podName]; ok {
         panic("Pod '" + podName.String() + "' is already subscribed")
     }
     c.podSubscribers[podName] = ch

     return ch
 }

 func (c *Cluster) recreatePod(pod v1.Pod) error {
     podName := util.NameFromMeta(pod.ObjectMeta)
-    orphanDependents := false
-    deleteOptions := &v1.DeleteOptions{
-        OrphanDependents: &orphanDependents,
-    }

     ch := c.registerPodSubscriber(podName)
     defer c.unregisterPodSubscriber(podName)

-    if err := c.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil {
+    if err := c.KubeClient.Pods(pod.Namespace).Delete(pod.Name, c.deleteOptions); err != nil {
         return fmt.Errorf("Can't delete Pod: %s", err)
     }
@@ -156,10 +151,11 @@ func (c *Cluster) podEventsDispatcher(stopCh <-chan struct{}) {
     for {
         select {
         case event := <-c.podEvents:
-            if subscriber, ok := c.podSubscribers[event.PodName]; ok {
+            c.podSubscribersMu.RLock()
+            subscriber, ok := c.podSubscribers[event.PodName]
+            c.podSubscribersMu.RUnlock()
+            if ok {
                 go func() { subscriber <- event }() //TODO: is it a right way to do nonblocking send to the channel?
-            } else {
-                c.logger.Debugf("Skipping event for an unwatched Pod '%s'", event.PodName)
             }
         case <-stopCh:
             return
@@ -183,7 +179,7 @@ func (c *Cluster) recreatePods() error {
     var masterPod v1.Pod
     for _, pod := range pods.Items {
-        role := c.PodSpiloRole(&pod)
+        role := c.podSpiloRole(&pod)

         if role == constants.PodRoleMaster {
             masterPod = pod

View File

@@ -11,12 +11,7 @@ import (
     "github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
 )

-var (
-    deleteOptions    = &v1.DeleteOptions{OrphanDependents: &orphanDependents}
-    orphanDependents = false
-)
-
-func (c *Cluster) LoadResources() error {
+func (c *Cluster) loadResources() error {
     ns := c.Metadata.Namespace
     listOptions := v1.ListOptions{
         LabelSelector: c.labelsSet().String(),
@@ -152,7 +147,7 @@ func (c *Cluster) deleteStatefulSet() error {
         return fmt.Errorf("There is no StatefulSet in the cluster")
     }

-    err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(c.Statefulset.Name, deleteOptions)
+    err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(c.Statefulset.Name, c.deleteOptions)
     if err != nil {
         return err
     }
@@ -217,7 +212,7 @@ func (c *Cluster) deleteService() error {
     if c.Service == nil {
         return fmt.Errorf("There is no Service in the cluster")
     }

-    err := c.KubeClient.Services(c.Service.Namespace).Delete(c.Service.Name, deleteOptions)
+    err := c.KubeClient.Services(c.Service.Namespace).Delete(c.Service.Name, c.deleteOptions)
     if err != nil {
         return err
     }
@@ -256,7 +251,7 @@ func (c *Cluster) deleteEndpoint() error {
     if c.Endpoint == nil {
         return fmt.Errorf("There is no Endpoint in the cluster")
     }

-    err := c.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, deleteOptions)
+    err := c.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, c.deleteOptions)
     if err != nil {
         return err
     }
@@ -300,7 +295,7 @@ func (c *Cluster) applySecrets() error {

 func (c *Cluster) deleteSecret(secret *v1.Secret) error {
     c.logger.Debugf("Deleting Secret '%s'", util.NameFromMeta(secret.ObjectMeta))
-    err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, deleteOptions)
+    err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions)
     if err != nil {
         return err
     }

View File

@@ -7,8 +7,14 @@ import (
 )

 func (c *Cluster) SyncCluster(stopCh <-chan struct{}) {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+
+    c.loadResources()
+
     if !c.podDispatcherRunning {
         go c.podEventsDispatcher(stopCh)
+        c.podDispatcherRunning = true
     }

     c.logger.Debugf("Syncing Secrets")

View File

@@ -110,7 +110,7 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent) error {
     for {
         select {
         case podEvent := <-podEvents:
-            role := c.PodSpiloRole(podEvent.CurPod)
+            role := c.podSpiloRole(podEvent.CurPod)
             // We cannot assume any role of the newly created pod. Normally, for a multi-pod cluster
             // we should observe the 'replica' value, but it could be that some pods are not allowed
             // to promote, therefore, the new pod could be a master as well.
@@ -127,7 +127,7 @@ func (c *Cluster) waitForPodDeletion(podEvents chan spec.PodEvent) error {
     for {
         select {
         case podEvent := <-podEvents:
-            if podEvent.EventType == spec.PodEventDelete {
+            if podEvent.EventType == spec.EventDelete {
                 return nil
             }
         case <-time.After(c.OpConfig.PodDeletionWaitTimeout):
@@ -233,7 +233,7 @@ func (c *Cluster) dnsName() string {
     return strings.ToLower(fmt.Sprintf(
         c.OpConfig.DNSNameFormat,
         c.Spec.ClusterName,
-        c.TeamName(),
+        c.teamName(),
         c.OpConfig.DbHostedZone))
 }
@@ -266,6 +266,6 @@ func (c *Cluster) deleteEtcdKey() error {
     return nil
 }

-func (c *Cluster) PodSpiloRole(pod *v1.Pod) string {
+func (c *Cluster) podSpiloRole(pod *v1.Pod) string {
     return pod.Labels[c.OpConfig.PodRoleLabel]
 }

View File

@@ -1,6 +1,7 @@
 package controller

 import (
+    "fmt"
     "sync"

     "github.com/Sirupsen/logrus"
@@ -28,14 +29,17 @@ type Controller struct {
     Config
     opConfig *config.Config
     logger   *logrus.Entry
-    clusters  map[spec.NamespacedName]*cluster.Cluster
-    stopChMap map[spec.NamespacedName]chan struct{}

     waitCluster sync.WaitGroup
+    clustersMu  sync.RWMutex
+    clusters    map[spec.NamespacedName]*cluster.Cluster
+    stopChs     map[spec.NamespacedName]chan struct{}

     postgresqlInformer cache.SharedIndexInformer
     podInformer        cache.SharedIndexInformer
+    podCh              chan spec.PodEvent

-    podCh chan spec.PodEvent
+    clusterEventQueues []*cache.FIFO
 }

@@ -47,12 +51,12 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller {
     controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger)

     return &Controller{
         Config:    *controllerConfig,
         opConfig:  operatorConfig,
         logger:    logger.WithField("pkg", "controller"),
         clusters:  make(map[spec.NamespacedName]*cluster.Cluster),
-        stopChMap: make(map[spec.NamespacedName]chan struct{}),
+        stopChs:   make(map[spec.NamespacedName]chan struct{}),
         podCh:     make(chan spec.PodEvent),
     }
 }
@@ -64,6 +68,10 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
     go c.runInformers(stopCh)

+    for i := range c.clusterEventQueues {
+        go c.processClusterEventsQueue(i)
+    }
+
     c.logger.Info("Started working in background")
 }
@@ -117,6 +125,18 @@ func (c *Controller) initController() {
     if err := c.initEtcdClient(c.opConfig.EtcdHost); err != nil {
         c.logger.Fatalf("Can't get etcd client: %s", err)
     }
+
+    c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers)
+    for i := range c.clusterEventQueues {
+        c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) {
+            e, ok := obj.(spec.ClusterEvent)
+            if !ok {
+                return "", fmt.Errorf("Can't cast to ClusterEvent")
+            }
+
+            return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil
+        })
+    }
 }

 func (c *Controller) runInformers(stopCh <-chan struct{}) {

View File

@@ -65,7 +65,7 @@ func (c *Controller) podAdd(obj interface{}) {
         ClusterName: c.PodClusterName(pod),
         PodName:     util.NameFromMeta(pod.ObjectMeta),
         CurPod:      pod,
-        EventType:   spec.PodEventAdd,
+        EventType:   spec.EventAdd,
     }

     c.podCh <- podEvent
@@ -87,7 +87,7 @@ func (c *Controller) podUpdate(prev, cur interface{}) {
         PodName:   util.NameFromMeta(curPod.ObjectMeta),
         PrevPod:   prevPod,
         CurPod:    curPod,
-        EventType: spec.PodEventUpdate,
+        EventType: spec.EventUpdate,
     }

     c.podCh <- podEvent
@@ -103,7 +103,7 @@ func (c *Controller) podDelete(obj interface{}) {
         ClusterName: c.PodClusterName(pod),
         PodName:     util.NameFromMeta(pod.ObjectMeta),
         CurPod:      pod,
-        EventType:   spec.PodEventDelete,
+        EventType:   spec.EventDelete,
     }

     c.podCh <- podEvent
@@ -114,7 +114,11 @@ func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) {
     for {
         select {
         case event := <-c.podCh:
-            if subscriber, ok := c.clusters[event.ClusterName]; ok {
+            c.clustersMu.RLock()
+            subscriber, ok := c.clusters[event.ClusterName]
+            c.clustersMu.RUnlock()
+            if ok {
                 c.logger.Debugf("Sending %s event of Pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName)
                 go subscriber.ReceivePodEvent(event)
             }

View File

@@ -8,7 +8,9 @@ import (
     "k8s.io/client-go/pkg/api/meta"
     "k8s.io/client-go/pkg/fields"
     "k8s.io/client-go/pkg/runtime"
+    "k8s.io/client-go/pkg/types"
     "k8s.io/client-go/pkg/watch"
+    "k8s.io/client-go/tools/cache"

     "github.bus.zalan.do/acid/postgres-operator/pkg/cluster"
     "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
@@ -35,24 +37,17 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e
         return nil, fmt.Errorf("Can't extract list of postgresql objects: %s", err)
     }

-    clusterConfig := c.makeClusterConfig()
     for _, obj := range objList {
         pg, ok := obj.(*spec.Postgresql)
         if !ok {
             return nil, fmt.Errorf("Can't cast object to postgresql")
         }
-        clusterName := util.NameFromMeta(pg.Metadata)
+        c.queueClusterEvent(nil, pg, spec.EventSync)

-        cl := cluster.New(clusterConfig, *pg, c.logger.Logger)
-        stopCh := make(chan struct{})
-        c.stopChMap[clusterName] = stopCh
-        c.clusters[clusterName] = cl
-        cl.LoadResources()
-        cl.ListResources()
-        cl.SyncCluster(stopCh)
+        c.logger.Debugf("Sync of the '%s' cluster has been queued", util.NameFromMeta(pg.Metadata))
     }

-    if len(c.clusters) > 0 {
-        c.logger.Infof("There are %d clusters currently running", len(c.clusters))
+    if len(objList) > 0 {
+        c.logger.Infof("There are %d clusters currently running", len(objList))
     } else {
         c.logger.Infof("No clusters running")
     }
@@ -60,6 +55,135 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e

     return object, err
 }

+func (c *Controller) processEvent(obj interface{}) error {
+    var clusterName spec.NamespacedName
+
+    event, ok := obj.(spec.ClusterEvent)
+    if !ok {
+        return fmt.Errorf("Can't cast to ClusterEvent")
+    }
+    logger := c.logger.WithField("worker", event.WorkerID)
+
+    if event.EventType == spec.EventAdd || event.EventType == spec.EventSync {
+        clusterName = util.NameFromMeta(event.NewSpec.Metadata)
+    } else {
+        clusterName = util.NameFromMeta(event.OldSpec.Metadata)
+    }
+
+    c.clustersMu.RLock()
+    cl, clusterFound := c.clusters[clusterName]
+    stopCh := c.stopChs[clusterName]
+    c.clustersMu.RUnlock()
+
+    switch event.EventType {
+    case spec.EventAdd:
+        if clusterFound {
+            logger.Debugf("Cluster '%s' already exists", clusterName)
+            return nil
+        }
+
+        logger.Infof("Creation of the '%s' cluster started", clusterName)
+
+        stopCh := make(chan struct{})
+        cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger)
+
+        c.clustersMu.Lock()
+        c.clusters[clusterName] = cl
+        c.stopChs[clusterName] = stopCh
+        c.clustersMu.Unlock()
+
+        if err := cl.Create(stopCh); err != nil {
+            logger.Errorf("Can't create cluster: %s", err)
+            return nil
+        }
+
+        logger.Infof("Cluster '%s' has been created", clusterName)
+    case spec.EventUpdate:
+        logger.Infof("Update of the '%s' cluster started", clusterName)
+
+        if !clusterFound {
+            logger.Warnf("Cluster '%s' does not exist", clusterName)
+            return nil
+        }
+        if err := cl.Update(event.NewSpec); err != nil {
+            logger.Errorf("Can't update cluster: %s", err)
+            return nil
+        }
+        logger.Infof("Cluster '%s' has been updated", clusterName)
+    case spec.EventDelete:
+        logger.Infof("Deletion of the '%s' cluster started", clusterName)
+        if !clusterFound {
+            logger.Errorf("Unknown cluster: %s", clusterName)
+            return nil
+        }
+
+        if err := cl.Delete(); err != nil {
+            logger.Errorf("Can't delete cluster '%s': %s", clusterName, err)
+            return nil
+        }
+        close(c.stopChs[clusterName])
+
+        c.clustersMu.Lock()
+        delete(c.clusters, clusterName)
+        delete(c.stopChs, clusterName)
+        c.clustersMu.Unlock()
+
+        logger.Infof("Cluster '%s' has been deleted", clusterName)
+    case spec.EventSync:
+        logger.Infof("Syncing of the '%s' cluster started", clusterName)
+
+        // no race condition because a cluster is always processed by single worker
+        if !clusterFound {
+            cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger)
+            stopCh = make(chan struct{})
+
+            c.clustersMu.Lock()
+            c.clusters[clusterName] = cl
+            c.stopChs[clusterName] = stopCh
+            c.clustersMu.Unlock()
+        }
+
+        cl.SyncCluster(stopCh)
+
+        logger.Infof("Cluster '%s' has been synced", clusterName)
+    }
+
+    return nil
+}
+
+func (c *Controller) processClusterEventsQueue(idx int) {
+    for {
+        c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent))
+    }
+}
+
+func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) {
+    var (
+        uid         types.UID
+        clusterName spec.NamespacedName
+    )
+
+    if old != nil {
+        uid = old.Metadata.GetUID()
+        clusterName = util.NameFromMeta(old.Metadata)
+    } else {
+        uid = new.Metadata.GetUID()
+        clusterName = util.NameFromMeta(new.Metadata)
+    }
+    workerId := c.clusterWorkerId(clusterName)
+    clusterEvent := spec.ClusterEvent{
+        EventType: eventType,
+        UID:       uid,
+        OldSpec:   old,
+        NewSpec:   new,
+        WorkerID:  workerId,
+    }
+    //TODO: if we delete cluster, discard all the previous events for the cluster
+    c.clusterEventQueues[workerId].Add(clusterEvent)
+
+    c.logger.WithField("worker", workerId).Infof("%s of the '%s' cluster has been queued for", eventType, clusterName)
+}
+
 func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, error) {
     return c.RestClient.Get().
         Prefix("watch").
@@ -77,34 +201,12 @@ func (c *Controller) postgresqlAdd(obj interface{}) {
         return
     }

-    clusterName := util.NameFromMeta(pg.Metadata)
-
-    _, ok = c.clusters[clusterName]
-    if ok {
-        c.logger.Infof("Cluster '%s' already exists", clusterName)
-        return
-    }
-
-    c.logger.Infof("Creation of a new Postgresql cluster '%s' started", clusterName)
-    cl := cluster.New(c.makeClusterConfig(), *pg, c.logger.Logger)
-    c.clusters[clusterName] = cl
-    stopCh := make(chan struct{})
-    c.stopChMap[clusterName] = stopCh
-
-    cl.SetStatus(spec.ClusterStatusCreating)
-    if err := cl.Create(stopCh); err != nil {
-        c.logger.Errorf("Can't create cluster: %s", err)
-        cl.SetStatus(spec.ClusterStatusAddFailed)
-        return
-    }
-
-    cl.SetStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running?
-    c.logger.Infof("Postgresql cluster '%s' has been created", clusterName)
+    // We will not get multiple Add events for the same cluster
+    c.queueClusterEvent(nil, pg, spec.EventAdd)
 }

 func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
-    pgPrev, ok := prev.(*spec.Postgresql)
+    pgOld, ok := prev.(*spec.Postgresql)
     if !ok {
         c.logger.Errorf("Can't cast to postgresql spec")
     }
@@ -112,51 +214,22 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
     if !ok {
         c.logger.Errorf("Can't cast to postgresql spec")
     }

-    clusterName := util.NameFromMeta(pgNew.Metadata)
-
-    //TODO: Do not update cluster which is currently creating
-    if pgPrev.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion {
-        c.logger.Infof("Skipping update with no resource version change")
+    if pgOld.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion {
         return
     }
-
-    pgCluster := c.clusters[clusterName] // current
-    if reflect.DeepEqual(pgPrev.Spec, pgNew.Spec) {
-        c.logger.Infof("Skipping update with no spec change")
+    if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
         return
     }
-
-    pgCluster.SetStatus(spec.ClusterStatusUpdating)
-    if err := pgCluster.Update(pgNew); err != nil {
-        pgCluster.SetStatus(spec.ClusterStatusUpdateFailed)
-        c.logger.Errorf("Can't update cluster: %s", err)
-    } else {
-        c.logger.Infof("Cluster has been updated")
-    }
-    pgCluster.SetStatus(spec.ClusterStatusRunning)
+    c.queueClusterEvent(pgOld, pgNew, spec.EventUpdate)
 }

 func (c *Controller) postgresqlDelete(obj interface{}) {
-    pgCur, ok := obj.(*spec.Postgresql)
+    pg, ok := obj.(*spec.Postgresql)
     if !ok {
         c.logger.Errorf("Can't cast to postgresql spec")
         return
     }
-    clusterName := util.NameFromMeta(pgCur.Metadata)
-    pgCluster, ok := c.clusters[clusterName]
-    if !ok {
-        c.logger.Errorf("Unknown cluster: %s", clusterName)
-        return
-    }
-    c.logger.Infof("Starting deletion of the '%s' cluster", util.NameFromMeta(pgCur.Metadata))
-    if err := pgCluster.Delete(); err != nil {
-        c.logger.Errorf("Can't delete cluster '%s': %s", clusterName, err)
-        return
-    }
-
-    close(c.stopChMap[clusterName])
-    delete(c.clusters, clusterName)
-    c.logger.Infof("Cluster '%s' has been successfully deleted", clusterName)
+    c.queueClusterEvent(pg, nil, spec.EventDelete)
 }

View File

@@ -2,24 +2,31 @@ package controller

 import (
     "fmt"
+    "hash/crc32"

     "k8s.io/client-go/pkg/api/v1"
     extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1"

     "github.bus.zalan.do/acid/postgres-operator/pkg/cluster"
     "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
+    "github.bus.zalan.do/acid/postgres-operator/pkg/util/config"
     "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
     "github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
 )

 func (c *Controller) makeClusterConfig() cluster.Config {
+    infrastructureRoles := make(map[string]spec.PgUser)
+    for k, v := range c.InfrastructureRoles {
+        infrastructureRoles[k] = v
+    }
+
     return cluster.Config{
         KubeClient:     c.KubeClient,
         RestClient:     c.RestClient,
         EtcdClient:     c.EtcdClient,
         TeamsAPIClient: c.TeamsAPIClient,
-        OpConfig:       c.opConfig,
-        InfrastructureRoles: c.InfrastructureRoles,
+        OpConfig:       config.Copy(c.opConfig),
+        InfrastructureRoles: infrastructureRoles,
     }
 }
@@ -55,6 +62,10 @@ func thirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource {
     }
 }

+func (c *Controller) clusterWorkerId(clusterName spec.NamespacedName) uint32 {
+    return crc32.ChecksumIEEE([]byte(clusterName.String())) % c.opConfig.Workers
+}
+
 func (c *Controller) createTPR() error {
     TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor)
     tpr := thirdPartyResource(TPRName)

View File

@@ -5,22 +5,31 @@ import (
     "k8s.io/client-go/pkg/types"
 )

-type PodEventType string
+type EventType string

 type NamespacedName types.NamespacedName

 const (
-    PodEventAdd    PodEventType = "ADD"
-    PodEventUpdate PodEventType = "UPDATE"
-    PodEventDelete PodEventType = "DELETE"
+    EventAdd    EventType = "ADD"
+    EventUpdate EventType = "UPDATE"
+    EventDelete EventType = "DELETE"
+    EventSync   EventType = "SYNC"
 )

+type ClusterEvent struct {
+    UID       types.UID
+    EventType EventType
+    OldSpec   *Postgresql
+    NewSpec   *Postgresql
+    WorkerID  uint32
+}
+
 type PodEvent struct {
     ClusterName NamespacedName
     PodName     NamespacedName
     PrevPod     *v1.Pod
     CurPod      *v1.Pod
-    EventType   PodEventType
+    EventType   EventType
 }

 type PgUser struct {
@@ -31,9 +40,17 @@ type PgUser struct {
 }

 func (p NamespacedName) String() string {
+    if p.Namespace == "" && p.Name == "" {
+        return ""
+    }
+
     return types.NamespacedName(p).String()
 }

+func (p NamespacedName) MarshalJSON() ([]byte, error) {
+    return []byte("\"" + p.String() + "\""), nil
+}
+
 func (n *NamespacedName) Decode(value string) error {
     name := types.NewNamespacedNameFromString(value)
     if value != "" && name == (types.NamespacedName{}) {

View File

@@ -53,6 +53,7 @@ type Config struct {
     KubeIAMRole   string `name:"kube_iam_role"`
     DebugLogging  bool   `name:"debug_logging" default:"false"`
     DNSNameFormat string `name:"dns_name_format" default:"%s.%s.%s"`
+    Workers       uint32 `name:"workers" default:"4"`
 }

 func (c Config) MustMarshal() string {
@@ -83,3 +84,14 @@ func NewFromMap(m map[string]string) *Config {

     return &cfg
 }
+
+func Copy(c *Config) Config {
+    cfg := *c
+
+    cfg.ClusterLabels = make(map[string]string, len(c.ClusterLabels))
+    for k, v := range c.ClusterLabels {
+        cfg.ClusterLabels[k] = v
+    }
+
+    return cfg
+}