From c299f2f1437f5810e054c993c6120456d35bcc90 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 23 Jan 2017 10:11:31 +0100 Subject: [PATCH] proper names small fixes --- .gitignore | 2 +- operator/etcd.go | 2 +- operator/objects.go | 12 +- operator/spilo.go | 24 +++- operator/zookeeper.go | 318 ------------------------------------------ testcluster.yaml | 10 +- 6 files changed, 30 insertions(+), 338 deletions(-) delete mode 100644 operator/zookeeper.go diff --git a/.gitignore b/.gitignore index ba4754ea0..d48619340 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,4 @@ _testmain.go *.exe *.test *.prof -/vendor/ +/vendor/ \ No newline at end of file diff --git a/operator/etcd.go b/operator/etcd.go index dfa1e9b49..4ae0b2d6f 100644 --- a/operator/etcd.go +++ b/operator/etcd.go @@ -7,7 +7,7 @@ import ( "log" ) -func (z *SpiloZooKeeper) DeleteEtcdKey(clusterName string) error { +func (z *SpiloSupervisor) DeleteEtcdKey(clusterName string) error { options := client.DeleteOptions{ Recursive: true, } diff --git a/operator/objects.go b/operator/objects.go index 0255ba844..2a1370848 100644 --- a/operator/objects.go +++ b/operator/objects.go @@ -8,7 +8,7 @@ import ( "log" ) -func (z *SpiloZooKeeper) CreateStatefulSet(spilo *Spilo) { +func (z *SpiloSupervisor) CreateStatefulSet(spilo *Spilo) { ns := (*spilo).Metadata.Namespace statefulSet := z.createSetFromSpilo(spilo) @@ -21,7 +21,7 @@ func (z *SpiloZooKeeper) CreateStatefulSet(spilo *Spilo) { } } -func (z *SpiloZooKeeper) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet { +func (z *SpiloSupervisor) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet { clusterName := (*spilo).Metadata.Name envVars := []v1.EnvVar{ @@ -126,7 +126,7 @@ func (z *SpiloZooKeeper) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet { Env: envVars, } - terminateGracePeriodSeconds := int64(0) + terminateGracePeriodSeconds := int64(30) podSpec := v1.PodSpec{ TerminationGracePeriodSeconds: &terminateGracePeriodSeconds, @@ -166,7 +166,7 @@ func (z *SpiloZooKeeper) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet { } } -func (z *SpiloZooKeeper) CreateSecrets(ns, name string) { +func (z *SpiloSupervisor) CreateSecrets(ns, name string) { secret := v1.Secret{ ObjectMeta: v1.ObjectMeta{ Name: name, @@ -191,7 +191,7 @@ func (z *SpiloZooKeeper) CreateSecrets(ns, name string) { } } -func (z *SpiloZooKeeper) CreateService(ns, name string) { +func (z *SpiloSupervisor) CreateService(ns, name string) { service := v1.Service{ ObjectMeta: v1.ObjectMeta{ Name: name, @@ -214,7 +214,7 @@ func (z *SpiloZooKeeper) CreateService(ns, name string) { } } -func (z *SpiloZooKeeper) CreateEndPoint(ns, name string) { +func (z *SpiloSupervisor) CreateEndPoint(ns, name string) { endPoint := v1.Endpoints{ ObjectMeta: v1.ObjectMeta{ Name: name, diff --git a/operator/spilo.go b/operator/spilo.go index 97d04d42d..ed48fecdd 100644 --- a/operator/spilo.go +++ b/operator/spilo.go @@ -9,15 +9,17 @@ import ( "k8s.io/client-go/pkg/api/meta" "k8s.io/client-go/pkg/api/unversioned" "k8s.io/client-go/rest" + "net/url" + "fmt" + "strings" ) type SpiloOperator struct { Options - ClientSet *kubernetes.Clientset - SpiloClient *rest.RESTClient - - SpiloZooKeeper *SpiloZooKeeper + ClientSet *kubernetes.Clientset + Client *rest.RESTClient + Supervisor *SpiloSupervisor } func New(options Options) *SpiloOperator { @@ -28,6 +30,14 @@ func New(options Options) *SpiloOperator { log.Fatalf("Couldn't create Kubernetes client: %s", err) } + etcdService, _ := clientSet.Services("default").Get("etcd-client") + if len(etcdService.Spec.Ports) != 1 { + log.Fatalln("Can't find Etcd cluster") + } + ports := etcdService.Spec.Ports[0] + nodeurl, _ := url.Parse(config.Host) + etcdHostOutside = fmt.Sprintf("http://%s:%d", strings.Split(nodeurl.Host, ":")[0], ports.NodePort) + spiloClient, err := newKubernetesSpiloClient(config) if err != nil { log.Fatalf("Couldn't create Spilo client: %s", err) @@ -36,8 +46,8 @@ func New(options Options) *SpiloOperator { operator := &SpiloOperator{ Options: options, ClientSet: clientSet, - SpiloClient: spiloClient, - SpiloZooKeeper: newZookeeper(spiloClient, clientSet), + Client: spiloClient, + Supervisor: newSupervisor(spiloClient, clientSet), } return operator @@ -46,7 +56,7 @@ func New(options Options) *SpiloOperator { func (o *SpiloOperator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { log.Printf("Spilo operator %v\n", VERSION) - go o.SpiloZooKeeper.Run(stopCh, wg) + go o.Supervisor.Run(stopCh, wg) log.Println("Started working in background") } diff --git a/operator/zookeeper.go b/operator/zookeeper.go deleted file mode 100644 index ce98db924..000000000 --- a/operator/zookeeper.go +++ /dev/null @@ -1,318 +0,0 @@ -package operator - -import ( - "fmt" - "log" - "sync" - "time" - - etcdclient "github.com/coreos/etcd/client" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/pkg/api" - "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/pkg/fields" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" -) - -const ( - ACTION_DELETE = "delete" - ACTION_UPDATE = "update" - ACTION_ADD = "add" -) - -type podEvent struct { - namespace string - name string - actionType string -} - -type podWatcher struct { - podNamespace string - podName string - eventsChannel chan podEvent - subscribe bool -} - -type SpiloZooKeeper struct { - podEvents chan podEvent - podWatchers chan podWatcher - SpiloClient *rest.RESTClient - Clientset *kubernetes.Clientset - - spiloInformer cache.SharedIndexInformer - podInformer cache.SharedIndexInformer - etcdApiClient etcdclient.KeysAPI -} - -func podsListWatch(client *kubernetes.Clientset) *cache.ListWatch { - return cache.NewListWatchFromClient(client.Core().RESTClient(), "pods", api.NamespaceAll, fields.Everything()) -} - -func newZookeeper(spiloClient *rest.RESTClient, clientset *kubernetes.Clientset) *SpiloZooKeeper { - spiloZooKeeper := &SpiloZooKeeper{ - SpiloClient: spiloClient, - Clientset: clientset, - } - - spiloInformer := cache.NewSharedIndexInformer( - cache.NewListWatchFromClient(spiloClient, "spilos", api.NamespaceAll, fields.Everything()), - &Spilo{}, - resyncPeriod, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - - spiloInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: spiloZooKeeper.spiloAdd, - UpdateFunc: spiloZooKeeper.spiloUpdate, - DeleteFunc: spiloZooKeeper.spiloDelete, - }) - - podInformer := cache.NewSharedIndexInformer( - podsListWatch(clientset), - &v1.Pod{}, - resyncPeriod, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - - podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: spiloZooKeeper.podAdd, - UpdateFunc: spiloZooKeeper.podUpdate, - DeleteFunc: spiloZooKeeper.podDelete, - }) - - spiloZooKeeper.spiloInformer = spiloInformer - spiloZooKeeper.podInformer = podInformer - - cfg := etcdclient.Config{ - Endpoints: []string{etcdHostOutside}, - Transport: etcdclient.DefaultTransport, - HeaderTimeoutPerRequest: time.Second, - } - - c, err := etcdclient.New(cfg) - if err != nil { - log.Fatal(err) - } - - spiloZooKeeper.etcdApiClient = etcdclient.NewKeysAPI(c) - spiloZooKeeper.podEvents = make(chan podEvent) - - return spiloZooKeeper -} - -func (d *SpiloZooKeeper) podAdd(obj interface{}) { - pod := obj.(*v1.Pod) - d.podEvents <- podEvent{ - namespace: pod.Namespace, - name: pod.Name, - actionType: ACTION_ADD, - } -} - -func (d *SpiloZooKeeper) podDelete(obj interface{}) { - pod := obj.(*v1.Pod) - d.podEvents <- podEvent{ - namespace: pod.Namespace, - name: pod.Name, - actionType: ACTION_DELETE, - } -} - -func (d *SpiloZooKeeper) podUpdate(old, cur interface{}) { - oldPod := old.(*v1.Pod) - d.podEvents <- podEvent{ - namespace: oldPod.Namespace, - name: oldPod.Name, - actionType: ACTION_UPDATE, - } -} - -func (z *SpiloZooKeeper) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { - defer wg.Done() - wg.Add(1) - - if err := EnsureSpiloThirdPartyResource(z.Clientset); err != nil { - log.Fatalf("Couldn't create ThirdPartyResource: %s", err) - } - - go z.spiloInformer.Run(stopCh) - go z.podInformer.Run(stopCh) - go z.podWatcher(stopCh) - - <-stopCh -} - -func (z *SpiloZooKeeper) spiloAdd(obj interface{}) { - spilo := obj.(*Spilo) - - clusterName := (*spilo).Metadata.Name - ns := (*spilo).Metadata.Namespace - - z.CreateEndPoint(ns, clusterName) - z.CreateService(ns, clusterName) - z.CreateSecrets(ns, clusterName) - z.CreateStatefulSet(spilo) -} - -func (z *SpiloZooKeeper) spiloUpdate(old, cur interface{}) { - oldSpilo := old.(*Spilo) - curSpilo := cur.(*Spilo) - - if oldSpilo.Spec.NumberOfInstances != curSpilo.Spec.NumberOfInstances { - z.UpdateStatefulSet(curSpilo) - } - - if oldSpilo.Spec.DockerImage != curSpilo.Spec.DockerImage { - z.UpdateStatefulSetImage(curSpilo) - } - - log.Printf("Update spilo old: %+v cur: %+v", *oldSpilo, *curSpilo) -} - -func (z *SpiloZooKeeper) spiloDelete(obj interface{}) { - spilo := obj.(*Spilo) - - err := z.DeleteStatefulSet(spilo.Metadata.Namespace, spilo.Metadata.Name) - if err != nil { - log.Printf("Error while deleting stateful set: %+v", err) - } -} - -func (z *SpiloZooKeeper) DeleteStatefulSet(ns, clusterName string) error { - orphanDependents := false - deleteOptions := v1.DeleteOptions{ - OrphanDependents: &orphanDependents, - } - - listOptions := v1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", clusterName), - } - - podList, err := z.Clientset.Pods(ns).List(listOptions) - if err != nil { - log.Printf("Error: %+v", err) - } - - err = z.Clientset.StatefulSets(ns).Delete(clusterName, &deleteOptions) - if err != nil { - return err - } - log.Printf("StatefulSet %s.%s has been deleted\n", ns, clusterName) - - for _, pod := range podList.Items { - err = z.Clientset.Pods(pod.Namespace).Delete(pod.Name, &deleteOptions) - if err != nil { - log.Printf("Error while deleting Pod %s: %+v", pod.Name, err) - return err - } - - log.Printf("Pod %s.%s has been deleted\n", pod.Namespace, pod.Name) - } - - serviceList, err := z.Clientset.Services(ns).List(listOptions) - if err != nil { - return err - } - - for _, service := range serviceList.Items { - err = z.Clientset.Services(service.Namespace).Delete(service.Name, &deleteOptions) - if err != nil { - log.Printf("Error while deleting Service %s: %+v", service.Name, err) - - return err - } - - log.Printf("Service %s.%s has been deleted\n", service.Namespace, service.Name) - } - - z.DeleteEtcdKey(clusterName) - - return nil -} - -func (z *SpiloZooKeeper) UpdateStatefulSet(spilo *Spilo) { - ns := (*spilo).Metadata.Namespace - - statefulSet := z.createSetFromSpilo(spilo) - _, err := z.Clientset.StatefulSets(ns).Update(&statefulSet) - - if err != nil { - log.Printf("Error while updating StatefulSet: %s", err) - } -} - -func (z *SpiloZooKeeper) UpdateStatefulSetImage(spilo *Spilo) { - ns := (*spilo).Metadata.Namespace - - z.UpdateStatefulSet(spilo) - - listOptions := v1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", (*spilo).Metadata.Name), - } - - pods, err := z.Clientset.Pods(ns).List(listOptions) - if err != nil { - log.Printf("Error while getting pods: %s", err) - } - - orphanDependents := true - deleteOptions := v1.DeleteOptions{ - OrphanDependents: &orphanDependents, - } - - var masterPodName string - for _, pod := range pods.Items { - log.Printf("Pod processing: %s", pod.Name) - - role, ok := pod.Labels["spilo-role"] - if ok == false { - log.Println("No spilo-role label") - continue - } - if role == "master" { - masterPodName = pod.Name - log.Printf("Skipping master: %s", masterPodName) - continue - } - - err := z.Clientset.Pods(ns).Delete(pod.Name, &deleteOptions) - if err != nil { - log.Printf("Error while deleting Pod %s.%s: %s", pod.Namespace, pod.Name, err) - } else { - log.Printf("Pod deleted: %s.%s", pod.Namespace, pod.Name) - } - - //TODO: wait until Pod recreated - } - - //TODO: do manual failover - err = z.Clientset.Pods(ns).Delete(masterPodName, &deleteOptions) - if err != nil { - log.Printf("Error while deleting Pod %s.%s: %s", ns, masterPodName, err) - } else { - log.Printf("Pod deleted: %s.%s", ns, masterPodName) - } -} - -func (z *SpiloZooKeeper) podWatcher(stopCh <-chan struct{}) { - watchers := make(map[string]chan podEvent) - for { - select { - case watcher := <-z.podWatchers: - if watcher.subscribe { - watchers[watcher.podName] = watcher.eventsChannel - } else { - close(watcher.eventsChannel) - delete(watchers, watcher.podName) - } - case podEvent := <-z.podEvents: - podChannel, ok := watchers[podEvent.name] - if ok == false { - continue - } - - podChannel <- podEvent - } - } -} diff --git a/testcluster.yaml b/testcluster.yaml index d50366bdd..5e3c95b0f 100644 --- a/testcluster.yaml +++ b/testcluster.yaml @@ -1,16 +1,16 @@ -apiVersion: "acid.zalan.do/v1" -kind: Spilo +apiVersion: "zalan.do/v1" +kind: "Spilo" metadata: name: testcluster spec: + docker_image: registry.opensource.zalan.do/acid/spilo-9.6:1.2-p5 etcd_host: etcd-client.default.svc.cluster.local:2379 volume_size: 100 # GB - resource_cpu: 111m - resource_memory: 222Mi + resource_cpu: 100m + resource_memory: 500Mi number_of_instances: 3 - docker_image: registry.opensource.zalan.do/acid/spilo-9.6:1.2-p5 # put the spilo image here postgres_configuration: - param: "max_connections" value: "10"