diff --git a/operator/supervisor.go b/operator/supervisor.go new file mode 100644 index 000000000..b7d5da559 --- /dev/null +++ b/operator/supervisor.go @@ -0,0 +1,340 @@ +package operator + +import ( + "fmt" + "log" + "sync" + "time" + + etcdclient "github.com/coreos/etcd/client" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/pkg/api" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/fields" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" +) + +const ( + ACTION_DELETE = "delete" + ACTION_UPDATE = "update" + ACTION_ADD = "add" +) + +type podEvent struct { + namespace string + name string + actionType string +} + +type podWatcher struct { + podNamespace string + podName string + eventsChannel chan podEvent + subscribe bool +} + +type SpiloSupervisor struct { + podEvents chan podEvent + podWatchers chan podWatcher + SpiloClient *rest.RESTClient + Clientset *kubernetes.Clientset + + spiloInformer cache.SharedIndexInformer + podInformer cache.SharedIndexInformer + etcdApiClient etcdclient.KeysAPI +} + +func podsListWatch(client *kubernetes.Clientset) *cache.ListWatch { + return cache.NewListWatchFromClient(client.Core().RESTClient(), "pods", api.NamespaceAll, fields.Everything()) +} + +func newSupervisor(spiloClient *rest.RESTClient, clientset *kubernetes.Clientset) *SpiloSupervisor { + spiloSupervisor := &SpiloSupervisor{ + SpiloClient: spiloClient, + Clientset: clientset, + } + + spiloInformer := cache.NewSharedIndexInformer( + cache.NewListWatchFromClient(spiloClient, "spilos", api.NamespaceAll, fields.Everything()), + &Spilo{}, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + + spiloInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: spiloSupervisor.spiloAdd, + UpdateFunc: spiloSupervisor.spiloUpdate, + DeleteFunc: spiloSupervisor.spiloDelete, + }) + + podInformer := cache.NewSharedIndexInformer( + podsListWatch(clientset), + &v1.Pod{}, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + + podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: spiloSupervisor.podAdd, + UpdateFunc: spiloSupervisor.podUpdate, + DeleteFunc: spiloSupervisor.podDelete, + }) + + spiloSupervisor.spiloInformer = spiloInformer + spiloSupervisor.podInformer = podInformer + + cfg := etcdclient.Config{ + Endpoints: []string{etcdHostOutside}, + Transport: etcdclient.DefaultTransport, + HeaderTimeoutPerRequest: time.Second, + } + + c, err := etcdclient.New(cfg) + if err != nil { + log.Fatal(err) + } + + spiloSupervisor.etcdApiClient = etcdclient.NewKeysAPI(c) + spiloSupervisor.podEvents = make(chan podEvent) + + return spiloSupervisor +} + +func (d *SpiloSupervisor) podAdd(obj interface{}) { + pod := obj.(*v1.Pod) + d.podEvents <- podEvent{ + namespace: pod.Namespace, + name: pod.Name, + actionType: ACTION_ADD, + } +} + +func (d *SpiloSupervisor) podDelete(obj interface{}) { + pod := obj.(*v1.Pod) + d.podEvents <- podEvent{ + namespace: pod.Namespace, + name: pod.Name, + actionType: ACTION_DELETE, + } +} + +func (d *SpiloSupervisor) podUpdate(old, cur interface{}) { + oldPod := old.(*v1.Pod) + d.podEvents <- podEvent{ + namespace: oldPod.Namespace, + name: oldPod.Name, + actionType: ACTION_UPDATE, + } +} + +func (z *SpiloSupervisor) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + + if err := EnsureSpiloThirdPartyResource(z.Clientset); err != nil { + log.Fatalf("Couldn't create ThirdPartyResource: %s", err) + } + + go z.spiloInformer.Run(stopCh) + go z.podInformer.Run(stopCh) + go z.podWatcher(stopCh) + + <-stopCh +} + +func (z *SpiloSupervisor) spiloAdd(obj interface{}) { + spilo := obj.(*Spilo) + + clusterName := (*spilo).Metadata.Name + ns := (*spilo).Metadata.Namespace + + //TODO: check if object already exists before creating + z.CreateEndPoint(ns, clusterName) + z.CreateService(ns, clusterName) + z.CreateSecrets(ns, clusterName) + z.CreateStatefulSet(spilo) +} + +func (z *SpiloSupervisor) spiloUpdate(old, cur interface{}) { + oldSpilo := old.(*Spilo) + curSpilo := cur.(*Spilo) + + //if oldSpilo.Spec.NumberOfInstances != curSpilo.Spec.NumberOfInstances { + // z.UpdateStatefulSet(curSpilo) + //} + + if oldSpilo.Spec.DockerImage != curSpilo.Spec.DockerImage { + log.Printf("Updating DockerImage: %s.%s", + curSpilo.Metadata.Namespace, + curSpilo.Metadata.Name) + + z.UpdateStatefulSetImage(curSpilo) + } + + log.Printf("Update spilo old: %+v cur: %+v", *oldSpilo, *curSpilo) +} + +func (z *SpiloSupervisor) spiloDelete(obj interface{}) { + spilo := obj.(*Spilo) + + err := z.DeleteStatefulSet(spilo.Metadata.Namespace, spilo.Metadata.Name) + if err != nil { + log.Printf("Error while deleting stateful set: %+v", err) + } +} + +func (z *SpiloSupervisor) DeleteStatefulSet(ns, clusterName string) error { + orphanDependents := false + deleteOptions := v1.DeleteOptions{ + OrphanDependents: &orphanDependents, + } + + listOptions := v1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", clusterName), + } + + podList, err := z.Clientset.Pods(ns).List(listOptions) + if err != nil { + log.Printf("Error: %+v", err) + } + + err = z.Clientset.StatefulSets(ns).Delete(clusterName, &deleteOptions) + if err != nil { + return err + } + log.Printf("StatefulSet %s.%s has been deleted\n", ns, clusterName) + + for _, pod := range podList.Items { + err = z.Clientset.Pods(pod.Namespace).Delete(pod.Name, &deleteOptions) + if err != nil { + log.Printf("Error while deleting Pod %s: %+v", pod.Name, err) + return err + } + + log.Printf("Pod %s.%s has been deleted\n", pod.Namespace, pod.Name) + } + + serviceList, err := z.Clientset.Services(ns).List(listOptions) + if err != nil { + return err + } + + for _, service := range serviceList.Items { + err = z.Clientset.Services(service.Namespace).Delete(service.Name, &deleteOptions) + if err != nil { + log.Printf("Error while deleting Service %s: %+v", service.Name, err) + + return err + } + + log.Printf("Service %s.%s has been deleted\n", service.Namespace, service.Name) + } + + z.DeleteEtcdKey(clusterName) + + return nil +} + +func (z *SpiloSupervisor) UpdateStatefulSet(spilo *Spilo) { + ns := (*spilo).Metadata.Namespace + + statefulSet := z.createSetFromSpilo(spilo) + _, err := z.Clientset.StatefulSets(ns).Update(&statefulSet) + + if err != nil { + log.Printf("Error while updating StatefulSet: %s", err) + } +} + +func (z *SpiloSupervisor) UpdateStatefulSetImage(spilo *Spilo) { + ns := (*spilo).Metadata.Namespace + + z.UpdateStatefulSet(spilo) + + listOptions := v1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", (*spilo).Metadata.Name), + } + + pods, err := z.Clientset.Pods(ns).List(listOptions) + if err != nil { + log.Printf("Error while getting pods: %s", err) + } + + orphanDependents := true + deleteOptions := v1.DeleteOptions{ + OrphanDependents: &orphanDependents, + } + + var masterPodName string + for _, pod := range pods.Items { + log.Printf("Pod processing: %s", pod.Name) + + role, ok := pod.Labels["spilo-role"] + if ok == false { + log.Println("No spilo-role label") + continue + } + if role == "master" { + masterPodName = pod.Name + log.Printf("Skipping master: %s", masterPodName) + continue + } + + err := z.Clientset.Pods(ns).Delete(pod.Name, &deleteOptions) + if err != nil { + log.Printf("Error while deleting Pod %s.%s: %s", pod.Namespace, pod.Name, err) + } else { + log.Printf("Pod deleted: %s.%s", pod.Namespace, pod.Name) + } + + w1 := podWatcher{ + podNamespace: pod.Namespace, + podName: pod.Name, + eventsChannel: make(chan podEvent, 1), + subscribe: true, + } + + log.Printf("Watching pod %s.%s being recreated", pod.Namespace, pod.Name) + z.podWatchers <- w1 + for e := range w1.eventsChannel { + if e.actionType == ACTION_ADD { break } + } + + log.Printf("Pod %s.%s has been recreated", pod.Namespace, pod.Name) + } + + //TODO: do manual failover + err = z.Clientset.Pods(ns).Delete(masterPodName, &deleteOptions) + if err != nil { + log.Printf("Error while deleting Pod %s.%s: %s", ns, masterPodName, err) + } else { + log.Printf("Pod deleted: %s.%s", ns, masterPodName) + } +} + +func (z *SpiloSupervisor) podWatcher(stopCh <-chan struct{}) { + //TODO: mind the namespace of the pod + + watchers := make(map[string] podWatcher) + for { + select { + case watcher := <-z.podWatchers: + if watcher.subscribe { + watchers[watcher.podName] = watcher + } else { + close(watcher.eventsChannel) + delete(watchers, watcher.podName) + } + case event := <-z.podEvents: + log.Printf("Pod watcher event: %s.%s - %s", event.namespace, event.name, event.actionType) + log.Printf("Current watchers: %+v", watchers) + podWatcher, ok := watchers[event.name] + if ok == false { + continue + } + + podWatcher.eventsChannel <- event + } + } +}