diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index bfa5f73f0..363ffe7c6 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -28,3 +28,4 @@ data: resync_period_pod: 5m super_username: postgres teams_api_url: https://teams.example.com/api/ + workers: "4" diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 4cbeb0166..56e5a3f7a 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -39,7 +39,7 @@ type Config struct { RestClient *rest.RESTClient EtcdClient etcdclient.KeysAPI TeamsAPIClient *teams.TeamsAPI - OpConfig *config.Config + OpConfig config.Config InfrastructureRoles map[string]spec.PgUser // inherited from the controller } @@ -60,15 +60,18 @@ type Cluster struct { pgUsers map[string]spec.PgUser podEvents chan spec.PodEvent podSubscribers map[spec.NamespacedName]chan spec.PodEvent + podSubscribersMu sync.RWMutex pgDb *sql.DB mu sync.Mutex masterLess bool podDispatcherRunning bool + deleteOptions *v1.DeleteOptions } -func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Logger) *Cluster { +func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} + orphanDependents := true cluster := &Cluster{ Config: cfg, @@ -80,6 +83,7 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Logger) *Cluster { kubeResources: kubeResources, masterLess: false, podDispatcherRunning: false, + deleteOptions: &v1.DeleteOptions{OrphanDependents: &orphanDependents}, } return cluster @@ -89,12 +93,12 @@ func (c *Cluster) ClusterName() spec.NamespacedName { return util.NameFromMeta(c.Metadata) } -func (c *Cluster) TeamName() string { +func (c *Cluster) teamName() string { // TODO: check Teams API for the actual name (in case the user passes an integer Id). return c.Spec.TeamId } -func (c *Cluster) SetStatus(status spec.PostgresStatus) { +func (c *Cluster) setStatus(status spec.PostgresStatus) { c.Status = status b, err := json.Marshal(status) if err != nil { @@ -154,11 +158,25 @@ func (c *Cluster) etcdKeyExists(keyName string) (bool, error) { } func (c *Cluster) Create(stopCh <-chan struct{}) error { + c.mu.Lock() + defer c.mu.Unlock() + var err error + if !c.podDispatcherRunning { go c.podEventsDispatcher(stopCh) c.podDispatcherRunning = true } + defer func() { + if err == nil { + c.setStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running? + } else { + c.setStatus(spec.ClusterStatusAddFailed) + } + }() + + c.setStatus(spec.ClusterStatusCreating) + keyExist, err := c.etcdKeyExists(fmt.Sprintf("/%s/%s", c.OpConfig.EtcdScope, c.Metadata.Name)) if err != nil { c.logger.Warnf("Can't check etcd key: %s", err) @@ -180,38 +198,36 @@ func (c *Cluster) Create(stopCh <-chan struct{}) error { c.logger.Infof("Service '%s' has been successfully created", util.NameFromMeta(service.ObjectMeta)) } - if err := c.initUsers(); err != nil { + if err = c.initUsers(); err != nil { return err - } else { - c.logger.Infof("User secrets have been initialized") } + c.logger.Infof("User secrets have been initialized") - if err := c.applySecrets(); err != nil { + if err = c.applySecrets(); err != nil { return fmt.Errorf("Can't create Secrets: %s", err) - } else { - c.logger.Infof("Secrets have been successfully created") } + c.logger.Infof("Secrets have been successfully created") ss, err := c.createStatefulSet() if err != nil { return fmt.Errorf("Can't create StatefulSet: %s", err) - } else { - c.logger.Infof("StatefulSet '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta)) } + c.logger.Infof("StatefulSet '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta)) c.logger.Info("Waiting for cluster being ready") - if err := c.waitStatefulsetPodsReady(); err != nil { + if err = c.waitStatefulsetPodsReady(); err != nil { c.logger.Errorf("Failed to create cluster: %s", err) return err } + c.logger.Infof("Pods are ready") if !c.masterLess { - if err := c.initDbConn(); err != nil { + if err = c.initDbConn(); err != nil { return fmt.Errorf("Can't init db connection: %s", err) } - if err := c.createUsers(); err != nil { + if err = c.createUsers(); err != nil { return fmt.Errorf("Can't create users: %s", err) } else { c.logger.Infof("Users have been successfully created") @@ -323,13 +339,18 @@ func compareResoucesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resource } func (c *Cluster) Update(newSpec *spec.Postgresql) error { - c.logger.Infof("Cluster update from version %s to %s", + c.mu.Lock() + defer c.mu.Unlock() + + c.setStatus(spec.ClusterStatusUpdating) + c.logger.Debugf("Cluster update from version %s to %s", c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion) newService := c.genService(newSpec.Spec.AllowedSourceRanges) if match, reason := c.sameServiceWith(newService); !match { c.logServiceChanges(c.Service, newService, true, reason) if err := c.updateService(newService); err != nil { + c.setStatus(spec.ClusterStatusUpdateFailed) return fmt.Errorf("Can't update Service: %s", err) } else { c.logger.Infof("Service '%s' has been updated", util.NameFromMeta(c.Service.ObjectMeta)) @@ -348,8 +369,10 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { c.logStatefulSetChanges(c.Statefulset, newStatefulSet, true, reason) //TODO: mind the case of updating allowedSourceRanges if err := c.updateStatefulSet(newStatefulSet); err != nil { + c.setStatus(spec.ClusterStatusUpdateFailed) return fmt.Errorf("Can't upate StatefulSet: %s", err) } + //TODO: if there is a change in numberOfInstances, make sure Pods have been created/deleted c.logger.Infof("StatefulSet '%s' has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta)) } @@ -363,15 +386,20 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { c.logger.Infof("Rolling update is needed") // TODO: wait for actual streaming to the replica if err := c.recreatePods(); err != nil { + c.setStatus(spec.ClusterStatusUpdateFailed) return fmt.Errorf("Can't recreate Pods: %s", err) } c.logger.Infof("Rolling update has been finished") } + c.setStatus(spec.ClusterStatusRunning) return nil } func (c *Cluster) Delete() error { + c.mu.Lock() + defer c.mu.Unlock() + if err := c.deleteEndpoint(); err != nil { c.logger.Errorf("Can't delete Endpoint: %s", err) } diff --git a/pkg/cluster/pg.go b/pkg/cluster/pg.go index fc159ed87..090479ad2 100644 --- a/pkg/cluster/pg.go +++ b/pkg/cluster/pg.go @@ -26,8 +26,6 @@ func (c *Cluster) pgConnectionString() string { func (c *Cluster) initDbConn() error { //TODO: concurrent safe? if c.pgDb == nil { - c.mu.Lock() - defer c.mu.Unlock() if c.pgDb == nil { conn, err := sql.Open("postgres", c.pgConnectionString()) if err != nil { diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 3b35ccbf5..ee8666249 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -72,7 +72,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error { } for _, pvc := range pvcs { c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta)) - if err := c.KubeClient.PersistentVolumeClaims(ns).Delete(pvc.Name, deleteOptions); err != nil { + if err := c.KubeClient.PersistentVolumeClaims(ns).Delete(pvc.Name, c.deleteOptions); err != nil { c.logger.Warningf("Can't delete PersistentVolumeClaim: %s", err) } } @@ -86,17 +86,10 @@ func (c *Cluster) deletePersistenVolumeClaims() error { } func (c *Cluster) deletePod(podName spec.NamespacedName) error { - ch := make(chan spec.PodEvent) - if _, ok := c.podSubscribers[podName]; ok { - panic("Pod '" + podName.String() + "' is already subscribed") - } - c.podSubscribers[podName] = ch - defer func() { - close(ch) - delete(c.podSubscribers, podName) - }() + ch := c.registerPodSubscriber(podName) + defer c.unregisterPodSubscriber(podName) - if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, deleteOptions); err != nil { + if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, c.deleteOptions); err != nil { return err } @@ -108,6 +101,9 @@ func (c *Cluster) deletePod(podName spec.NamespacedName) error { } func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) { + c.podSubscribersMu.Lock() + defer c.podSubscribersMu.Unlock() + if _, ok := c.podSubscribers[podName]; !ok { panic("Subscriber for Pod '" + podName.String() + "' is not found") } @@ -117,26 +113,25 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) { } func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan spec.PodEvent { + c.podSubscribersMu.Lock() + defer c.podSubscribersMu.Unlock() + ch := make(chan spec.PodEvent) if _, ok := c.podSubscribers[podName]; ok { panic("Pod '" + podName.String() + "' is already subscribed") } c.podSubscribers[podName] = ch + return ch } func (c *Cluster) recreatePod(pod v1.Pod) error { podName := util.NameFromMeta(pod.ObjectMeta) - orphanDependents := false - deleteOptions := &v1.DeleteOptions{ - OrphanDependents: &orphanDependents, - } - ch := c.registerPodSubscriber(podName) defer c.unregisterPodSubscriber(podName) - if err := c.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil { + if err := c.KubeClient.Pods(pod.Namespace).Delete(pod.Name, c.deleteOptions); err != nil { return fmt.Errorf("Can't delete Pod: %s", err) } @@ -156,10 +151,11 @@ func (c *Cluster) podEventsDispatcher(stopCh <-chan struct{}) { for { select { case event := <-c.podEvents: - if subscriber, ok := c.podSubscribers[event.PodName]; ok { + c.podSubscribersMu.RLock() + subscriber, ok := c.podSubscribers[event.PodName] + c.podSubscribersMu.RUnlock() + if ok { go func() { subscriber <- event }() //TODO: is it a right way to do nonblocking send to the channel? - } else { - c.logger.Debugf("Skipping event for an unwatched Pod '%s'", event.PodName) } case <-stopCh: return @@ -183,7 +179,7 @@ func (c *Cluster) recreatePods() error { var masterPod v1.Pod for _, pod := range pods.Items { - role := c.PodSpiloRole(&pod) + role := c.podSpiloRole(&pod) if role == constants.PodRoleMaster { masterPod = pod diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index c6442841a..b6eeef252 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -11,12 +11,7 @@ import ( "github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil" ) -var ( - deleteOptions = &v1.DeleteOptions{OrphanDependents: &orphanDependents} - orphanDependents = false -) - -func (c *Cluster) LoadResources() error { +func (c *Cluster) loadResources() error { ns := c.Metadata.Namespace listOptions := v1.ListOptions{ LabelSelector: c.labelsSet().String(), @@ -152,7 +147,7 @@ func (c *Cluster) deleteStatefulSet() error { return fmt.Errorf("There is no StatefulSet in the cluster") } - err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(c.Statefulset.Name, deleteOptions) + err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(c.Statefulset.Name, c.deleteOptions) if err != nil { return err } @@ -217,7 +212,7 @@ func (c *Cluster) deleteService() error { if c.Service == nil { return fmt.Errorf("There is no Service in the cluster") } - err := c.KubeClient.Services(c.Service.Namespace).Delete(c.Service.Name, deleteOptions) + err := c.KubeClient.Services(c.Service.Namespace).Delete(c.Service.Name, c.deleteOptions) if err != nil { return err } @@ -256,7 +251,7 @@ func (c *Cluster) deleteEndpoint() error { if c.Endpoint == nil { return fmt.Errorf("There is no Endpoint in the cluster") } - err := c.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, deleteOptions) + err := c.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, c.deleteOptions) if err != nil { return err } @@ -300,7 +295,7 @@ func (c *Cluster) applySecrets() error { func (c *Cluster) deleteSecret(secret *v1.Secret) error { c.logger.Debugf("Deleting Secret '%s'", util.NameFromMeta(secret.ObjectMeta)) - err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, deleteOptions) + err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions) if err != nil { return err } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 1b5e62fec..254a9c310 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -7,8 +7,14 @@ import ( ) func (c *Cluster) SyncCluster(stopCh <-chan struct{}) { + c.mu.Lock() + defer c.mu.Unlock() + + c.loadResources() + if !c.podDispatcherRunning { go c.podEventsDispatcher(stopCh) + c.podDispatcherRunning = true } c.logger.Debugf("Syncing Secrets") diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 96900b01a..56596cc8b 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -110,7 +110,7 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent) error { for { select { case podEvent := <-podEvents: - role := c.PodSpiloRole(podEvent.CurPod) + role := c.podSpiloRole(podEvent.CurPod) // We cannot assume any role of the newly created pod. Normally, for a multi-pod cluster // we should observe the 'replica' value, but it could be that some pods are not allowed // to promote, therefore, the new pod could be a master as well. @@ -127,7 +127,7 @@ func (c *Cluster) waitForPodDeletion(podEvents chan spec.PodEvent) error { for { select { case podEvent := <-podEvents: - if podEvent.EventType == spec.PodEventDelete { + if podEvent.EventType == spec.EventDelete { return nil } case <-time.After(c.OpConfig.PodDeletionWaitTimeout): @@ -233,7 +233,7 @@ func (c *Cluster) dnsName() string { return strings.ToLower(fmt.Sprintf( c.OpConfig.DNSNameFormat, c.Spec.ClusterName, - c.TeamName(), + c.teamName(), c.OpConfig.DbHostedZone)) } @@ -266,6 +266,6 @@ func (c *Cluster) deleteEtcdKey() error { return nil } -func (c *Cluster) PodSpiloRole(pod *v1.Pod) string { +func (c *Cluster) podSpiloRole(pod *v1.Pod) string { return pod.Labels[c.OpConfig.PodRoleLabel] } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5c9273870..b822be9d1 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -1,6 +1,7 @@ package controller import ( + "fmt" "sync" "github.com/Sirupsen/logrus" @@ -28,14 +29,17 @@ type Controller struct { Config opConfig *config.Config logger *logrus.Entry - clusters map[spec.NamespacedName]*cluster.Cluster - stopChMap map[spec.NamespacedName]chan struct{} waitCluster sync.WaitGroup + clustersMu sync.RWMutex + clusters map[spec.NamespacedName]*cluster.Cluster + stopChs map[spec.NamespacedName]chan struct{} + postgresqlInformer cache.SharedIndexInformer podInformer cache.SharedIndexInformer + podCh chan spec.PodEvent - podCh chan spec.PodEvent + clusterEventQueues []*cache.FIFO } func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { @@ -47,12 +51,12 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger) return &Controller{ - Config: *controllerConfig, - opConfig: operatorConfig, - logger: logger.WithField("pkg", "controller"), - clusters: make(map[spec.NamespacedName]*cluster.Cluster), - stopChMap: make(map[spec.NamespacedName]chan struct{}), - podCh: make(chan spec.PodEvent), + Config: *controllerConfig, + opConfig: operatorConfig, + logger: logger.WithField("pkg", "controller"), + clusters: make(map[spec.NamespacedName]*cluster.Cluster), + stopChs: make(map[spec.NamespacedName]chan struct{}), + podCh: make(chan spec.PodEvent), } } @@ -64,6 +68,10 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { go c.runInformers(stopCh) + for i := range c.clusterEventQueues { + go c.processClusterEventsQueue(i) + } + c.logger.Info("Started working in background") } @@ -117,6 +125,18 @@ func (c *Controller) initController() { if err := c.initEtcdClient(c.opConfig.EtcdHost); err != nil { c.logger.Fatalf("Can't get etcd client: %s", err) } + + c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers) + for i := range c.clusterEventQueues { + c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) { + e, ok := obj.(spec.ClusterEvent) + if !ok { + return "", fmt.Errorf("Can't cast to ClusterEvent") + } + + return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil + }) + } } func (c *Controller) runInformers(stopCh <-chan struct{}) { diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index ecda7936f..54b92e2c1 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -65,7 +65,7 @@ func (c *Controller) podAdd(obj interface{}) { ClusterName: c.PodClusterName(pod), PodName: util.NameFromMeta(pod.ObjectMeta), CurPod: pod, - EventType: spec.PodEventAdd, + EventType: spec.EventAdd, } c.podCh <- podEvent @@ -87,7 +87,7 @@ func (c *Controller) podUpdate(prev, cur interface{}) { PodName: util.NameFromMeta(curPod.ObjectMeta), PrevPod: prevPod, CurPod: curPod, - EventType: spec.PodEventUpdate, + EventType: spec.EventUpdate, } c.podCh <- podEvent @@ -103,7 +103,7 @@ func (c *Controller) podDelete(obj interface{}) { ClusterName: c.PodClusterName(pod), PodName: util.NameFromMeta(pod.ObjectMeta), CurPod: pod, - EventType: spec.PodEventDelete, + EventType: spec.EventDelete, } c.podCh <- podEvent @@ -114,7 +114,11 @@ func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { for { select { case event := <-c.podCh: - if subscriber, ok := c.clusters[event.ClusterName]; ok { + c.clustersMu.RLock() + subscriber, ok := c.clusters[event.ClusterName] + c.clustersMu.RUnlock() + + if ok { c.logger.Debugf("Sending %s event of Pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) go subscriber.ReceivePodEvent(event) } diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index e9b80a4d2..23956e6d3 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -8,7 +8,9 @@ import ( "k8s.io/client-go/pkg/api/meta" "k8s.io/client-go/pkg/fields" "k8s.io/client-go/pkg/runtime" + "k8s.io/client-go/pkg/types" "k8s.io/client-go/pkg/watch" + "k8s.io/client-go/tools/cache" "github.bus.zalan.do/acid/postgres-operator/pkg/cluster" "github.bus.zalan.do/acid/postgres-operator/pkg/spec" @@ -35,24 +37,17 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e return nil, fmt.Errorf("Can't extract list of postgresql objects: %s", err) } - clusterConfig := c.makeClusterConfig() for _, obj := range objList { pg, ok := obj.(*spec.Postgresql) if !ok { return nil, fmt.Errorf("Can't cast object to postgresql") } - clusterName := util.NameFromMeta(pg.Metadata) - cl := cluster.New(clusterConfig, *pg, c.logger.Logger) + c.queueClusterEvent(nil, pg, spec.EventSync) - stopCh := make(chan struct{}) - c.stopChMap[clusterName] = stopCh - c.clusters[clusterName] = cl - cl.LoadResources() - cl.ListResources() - cl.SyncCluster(stopCh) + c.logger.Debugf("Sync of the '%s' cluster has been queued", util.NameFromMeta(pg.Metadata)) } - if len(c.clusters) > 0 { - c.logger.Infof("There are %d clusters currently running", len(c.clusters)) + if len(objList) > 0 { + c.logger.Infof("There are %d clusters currently running", len(objList)) } else { c.logger.Infof("No clusters running") } @@ -60,6 +55,135 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e return object, err } +func (c *Controller) processEvent(obj interface{}) error { + var clusterName spec.NamespacedName + + event, ok := obj.(spec.ClusterEvent) + if !ok { + return fmt.Errorf("Can't cast to ClusterEvent") + } + logger := c.logger.WithField("worker", event.WorkerID) + + if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { + clusterName = util.NameFromMeta(event.NewSpec.Metadata) + } else { + clusterName = util.NameFromMeta(event.OldSpec.Metadata) + } + + c.clustersMu.RLock() + cl, clusterFound := c.clusters[clusterName] + stopCh := c.stopChs[clusterName] + c.clustersMu.RUnlock() + + switch event.EventType { + case spec.EventAdd: + if clusterFound { + logger.Debugf("Cluster '%s' already exists", clusterName) + return nil + } + + logger.Infof("Creation of the '%s' cluster started", clusterName) + + stopCh := make(chan struct{}) + cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + + c.clustersMu.Lock() + c.clusters[clusterName] = cl + c.stopChs[clusterName] = stopCh + c.clustersMu.Unlock() + + if err := cl.Create(stopCh); err != nil { + logger.Errorf("Can't create cluster: %s", err) + return nil + } + + logger.Infof("Cluster '%s' has been created", clusterName) + case spec.EventUpdate: + logger.Infof("Update of the '%s' cluster started", clusterName) + + if !clusterFound { + logger.Warnf("Cluster '%s' does not exist", clusterName) + return nil + } + if err := cl.Update(event.NewSpec); err != nil { + logger.Errorf("Can't update cluster: %s", err) + return nil + } + logger.Infof("Cluster '%s' has been updated", clusterName) + case spec.EventDelete: + logger.Infof("Deletion of the '%s' cluster started", clusterName) + if !clusterFound { + logger.Errorf("Unknown cluster: %s", clusterName) + return nil + } + + if err := cl.Delete(); err != nil { + logger.Errorf("Can't delete cluster '%s': %s", clusterName, err) + return nil + } + close(c.stopChs[clusterName]) + + c.clustersMu.Lock() + delete(c.clusters, clusterName) + delete(c.stopChs, clusterName) + c.clustersMu.Unlock() + + logger.Infof("Cluster '%s' has been deleted", clusterName) + case spec.EventSync: + logger.Infof("Syncing of the '%s' cluster started", clusterName) + + // no race condition because a cluster is always processed by single worker + if !clusterFound { + cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + stopCh = make(chan struct{}) + + c.clustersMu.Lock() + c.clusters[clusterName] = cl + c.stopChs[clusterName] = stopCh + c.clustersMu.Unlock() + } + + cl.SyncCluster(stopCh) + + logger.Infof("Cluster '%s' has been synced", clusterName) + } + + return nil +} + +func (c *Controller) processClusterEventsQueue(idx int) { + for { + c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)) + } +} + +func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec.EventType) { + var ( + uid types.UID + clusterName spec.NamespacedName + ) + + if old != nil { + uid = old.Metadata.GetUID() + clusterName = util.NameFromMeta(old.Metadata) + } else { + uid = new.Metadata.GetUID() + clusterName = util.NameFromMeta(new.Metadata) + } + workerId := c.clusterWorkerId(clusterName) + clusterEvent := spec.ClusterEvent{ + EventType: eventType, + UID: uid, + OldSpec: old, + NewSpec: new, + WorkerID: workerId, + } + //TODO: if we delete cluster, discard all the previous events for the cluster + + c.clusterEventQueues[workerId].Add(clusterEvent) + c.logger.WithField("worker", workerId).Infof("%s of the '%s' cluster has been queued for", eventType, clusterName) +} + func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, error) { return c.RestClient.Get(). Prefix("watch"). @@ -77,34 +201,12 @@ func (c *Controller) postgresqlAdd(obj interface{}) { return } - clusterName := util.NameFromMeta(pg.Metadata) - - _, ok = c.clusters[clusterName] - if ok { - c.logger.Infof("Cluster '%s' already exists", clusterName) - return - } - - c.logger.Infof("Creation of a new Postgresql cluster '%s' started", clusterName) - cl := cluster.New(c.makeClusterConfig(), *pg, c.logger.Logger) - - c.clusters[clusterName] = cl - stopCh := make(chan struct{}) - c.stopChMap[clusterName] = stopCh - - cl.SetStatus(spec.ClusterStatusCreating) - if err := cl.Create(stopCh); err != nil { - c.logger.Errorf("Can't create cluster: %s", err) - cl.SetStatus(spec.ClusterStatusAddFailed) - return - } - cl.SetStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running? - - c.logger.Infof("Postgresql cluster '%s' has been created", clusterName) + // We will not get multiple Add events for the same cluster + c.queueClusterEvent(nil, pg, spec.EventAdd) } func (c *Controller) postgresqlUpdate(prev, cur interface{}) { - pgPrev, ok := prev.(*spec.Postgresql) + pgOld, ok := prev.(*spec.Postgresql) if !ok { c.logger.Errorf("Can't cast to postgresql spec") } @@ -112,51 +214,22 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { if !ok { c.logger.Errorf("Can't cast to postgresql spec") } - - clusterName := util.NameFromMeta(pgNew.Metadata) - - //TODO: Do not update cluster which is currently creating - if pgPrev.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion { - c.logger.Infof("Skipping update with no resource version change") + if pgOld.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion { return } - pgCluster := c.clusters[clusterName] // current - if reflect.DeepEqual(pgPrev.Spec, pgNew.Spec) { - c.logger.Infof("Skipping update with no spec change") + if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { return } - pgCluster.SetStatus(spec.ClusterStatusUpdating) - if err := pgCluster.Update(pgNew); err != nil { - pgCluster.SetStatus(spec.ClusterStatusUpdateFailed) - c.logger.Errorf("Can't update cluster: %s", err) - } else { - c.logger.Infof("Cluster has been updated") - } - pgCluster.SetStatus(spec.ClusterStatusRunning) + c.queueClusterEvent(pgOld, pgNew, spec.EventUpdate) } func (c *Controller) postgresqlDelete(obj interface{}) { - pgCur, ok := obj.(*spec.Postgresql) + pg, ok := obj.(*spec.Postgresql) if !ok { c.logger.Errorf("Can't cast to postgresql spec") return } - clusterName := util.NameFromMeta(pgCur.Metadata) - pgCluster, ok := c.clusters[clusterName] - if !ok { - c.logger.Errorf("Unknown cluster: %s", clusterName) - return - } - c.logger.Infof("Starting deletion of the '%s' cluster", util.NameFromMeta(pgCur.Metadata)) - if err := pgCluster.Delete(); err != nil { - c.logger.Errorf("Can't delete cluster '%s': %s", clusterName, err) - return - } - - close(c.stopChMap[clusterName]) - delete(c.clusters, clusterName) - - c.logger.Infof("Cluster '%s' has been successfully deleted", clusterName) + c.queueClusterEvent(pg, nil, spec.EventDelete) } diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 4bb5b776a..123e230cd 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -2,24 +2,31 @@ package controller import ( "fmt" + "hash/crc32" "k8s.io/client-go/pkg/api/v1" extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1" "github.bus.zalan.do/acid/postgres-operator/pkg/cluster" "github.bus.zalan.do/acid/postgres-operator/pkg/spec" + "github.bus.zalan.do/acid/postgres-operator/pkg/util/config" "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants" "github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil" ) func (c *Controller) makeClusterConfig() cluster.Config { + infrastructureRoles := make(map[string]spec.PgUser) + for k, v := range c.InfrastructureRoles { + infrastructureRoles[k] = v + } + return cluster.Config{ KubeClient: c.KubeClient, RestClient: c.RestClient, EtcdClient: c.EtcdClient, TeamsAPIClient: c.TeamsAPIClient, - OpConfig: c.opConfig, - InfrastructureRoles: c.InfrastructureRoles, + OpConfig: config.Copy(c.opConfig), + InfrastructureRoles: infrastructureRoles, } } @@ -55,6 +62,10 @@ func thirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource { } } +func (c *Controller) clusterWorkerId(clusterName spec.NamespacedName) uint32 { + return crc32.ChecksumIEEE([]byte(clusterName.String())) % c.opConfig.Workers +} + func (c *Controller) createTPR() error { TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor) tpr := thirdPartyResource(TPRName) diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 23fc0ca64..3fbf7623b 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -5,22 +5,31 @@ import ( "k8s.io/client-go/pkg/types" ) -type PodEventType string +type EventType string type NamespacedName types.NamespacedName const ( - PodEventAdd PodEventType = "ADD" - PodEventUpdate PodEventType = "UPDATE" - PodEventDelete PodEventType = "DELETE" + EventAdd EventType = "ADD" + EventUpdate EventType = "UPDATE" + EventDelete EventType = "DELETE" + EventSync EventType = "SYNC" ) +type ClusterEvent struct { + UID types.UID + EventType EventType + OldSpec *Postgresql + NewSpec *Postgresql + WorkerID uint32 +} + type PodEvent struct { ClusterName NamespacedName PodName NamespacedName PrevPod *v1.Pod CurPod *v1.Pod - EventType PodEventType + EventType EventType } type PgUser struct { @@ -31,9 +40,17 @@ type PgUser struct { } func (p NamespacedName) String() string { + if p.Namespace == "" && p.Name == "" { + return "" + } + return types.NamespacedName(p).String() } +func (p NamespacedName) MarshalJSON() ([]byte, error) { + return []byte("\"" + p.String() + "\""), nil +} + func (n *NamespacedName) Decode(value string) error { name := types.NewNamespacedNameFromString(value) if value != "" && name == (types.NamespacedName{}) { diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index d260fe71e..852c23d0d 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -53,6 +53,7 @@ type Config struct { KubeIAMRole string `name:"kube_iam_role"` DebugLogging bool `name:"debug_logging" default:"false"` DNSNameFormat string `name:"dns_name_format" default:"%s.%s.%s"` + Workers uint32 `name:"workers" default:"4"` } func (c Config) MustMarshal() string { @@ -83,3 +84,14 @@ func NewFromMap(m map[string]string) *Config { return &cfg } + +func Copy(c *Config) Config { + cfg := *c + + cfg.ClusterLabels = make(map[string]string, len(c.ClusterLabels)) + for k, v := range c.ClusterLabels { + cfg.ClusterLabels[k] = v + } + + return cfg +}