diff --git a/pkg/controller/supervisor.go b/pkg/controller/controller.go similarity index 76% rename from pkg/controller/supervisor.go rename to pkg/controller/controller.go index cd8e4455b..9bfd2a20f 100644 --- a/pkg/controller/supervisor.go +++ b/pkg/controller/controller.go @@ -4,15 +4,16 @@ import ( "fmt" "log" "sync" - "time" - etcdclient "github.com/coreos/etcd/client" "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/fields" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + + "github.bus.zalan.do/acid/postgres-operator/pkg/spec" + "github.bus.zalan.do/acid/postgres-operator/pkg/etcd" ) const ( @@ -34,38 +35,39 @@ type podWatcher struct { subscribe bool } -type SpiloSupervisor struct { - podEvents chan podEvent - podWatchers chan podWatcher - SpiloClient *rest.RESTClient - Clientset *kubernetes.Clientset +type SpiloController struct { + podEvents chan podEvent + podWatchers chan podWatcher + SpiloClient *rest.RESTClient + Clientset *kubernetes.Clientset + etcdApiClient *etcd.EtcdClient spiloInformer cache.SharedIndexInformer podInformer cache.SharedIndexInformer - etcdApiClient etcdclient.KeysAPI } func podsListWatch(client *kubernetes.Clientset) *cache.ListWatch { - return cache.NewListWatchFromClient(client.Core().RESTClient(), "pods", api.NamespaceAll, fields.Everything()) + return cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", api.NamespaceAll, fields.Everything()) } -func newSupervisor(spiloClient *rest.RESTClient, clientset *kubernetes.Clientset) *SpiloSupervisor { - spiloSupervisor := &SpiloSupervisor{ - SpiloClient: spiloClient, - Clientset: clientset, +func newController(spiloClient *rest.RESTClient, clientset *kubernetes.Clientset, etcdClient *etcd.EtcdClient) *SpiloController { + spiloController := &SpiloController{ + SpiloClient: spiloClient, + Clientset: clientset, + etcdApiClient: etcdClient, } spiloInformer := cache.NewSharedIndexInformer( cache.NewListWatchFromClient(spiloClient, "spilos", api.NamespaceAll, fields.Everything()), - &Spilo{}, + &spec.Spilo{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) spiloInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: spiloSupervisor.spiloAdd, - UpdateFunc: spiloSupervisor.spiloUpdate, - DeleteFunc: spiloSupervisor.spiloDelete, + AddFunc: spiloController.spiloAdd, + UpdateFunc: spiloController.spiloUpdate, + DeleteFunc: spiloController.spiloDelete, }) podInformer := cache.NewSharedIndexInformer( @@ -76,32 +78,20 @@ func newSupervisor(spiloClient *rest.RESTClient, clientset *kubernetes.Clientset ) podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: spiloSupervisor.podAdd, - UpdateFunc: spiloSupervisor.podUpdate, - DeleteFunc: spiloSupervisor.podDelete, + AddFunc: spiloController.podAdd, + UpdateFunc: spiloController.podUpdate, + DeleteFunc: spiloController.podDelete, }) - spiloSupervisor.spiloInformer = spiloInformer - spiloSupervisor.podInformer = podInformer + spiloController.spiloInformer = spiloInformer + spiloController.podInformer = podInformer - cfg := etcdclient.Config{ - Endpoints: []string{etcdHostOutside}, - Transport: etcdclient.DefaultTransport, - HeaderTimeoutPerRequest: time.Second, - } + spiloController.podEvents = make(chan podEvent) - c, err := etcdclient.New(cfg) - if err != nil { - log.Fatal(err) - } - - spiloSupervisor.etcdApiClient = etcdclient.NewKeysAPI(c) - spiloSupervisor.podEvents = make(chan podEvent) - - return spiloSupervisor + return spiloController } -func (d *SpiloSupervisor) podAdd(obj interface{}) { +func (d *SpiloController) podAdd(obj interface{}) { pod := obj.(*v1.Pod) d.podEvents <- podEvent{ namespace: pod.Namespace, @@ -110,7 +100,7 @@ func (d *SpiloSupervisor) podAdd(obj interface{}) { } } -func (d *SpiloSupervisor) podDelete(obj interface{}) { +func (d *SpiloController) podDelete(obj interface{}) { pod := obj.(*v1.Pod) d.podEvents <- podEvent{ namespace: pod.Namespace, @@ -119,7 +109,7 @@ func (d *SpiloSupervisor) podDelete(obj interface{}) { } } -func (d *SpiloSupervisor) podUpdate(old, cur interface{}) { +func (d *SpiloController) podUpdate(old, cur interface{}) { oldPod := old.(*v1.Pod) d.podEvents <- podEvent{ namespace: oldPod.Namespace, @@ -128,7 +118,7 @@ func (d *SpiloSupervisor) podUpdate(old, cur interface{}) { } } -func (z *SpiloSupervisor) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { +func (z *SpiloController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() wg.Add(1) @@ -143,8 +133,8 @@ func (z *SpiloSupervisor) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { <-stopCh } -func (z *SpiloSupervisor) spiloAdd(obj interface{}) { - spilo := obj.(*Spilo) +func (z *SpiloController) spiloAdd(obj interface{}) { + spilo := obj.(*spec.Spilo) clusterName := (*spilo).Metadata.Name ns := (*spilo).Metadata.Namespace @@ -156,9 +146,9 @@ func (z *SpiloSupervisor) spiloAdd(obj interface{}) { z.CreateStatefulSet(spilo) } -func (z *SpiloSupervisor) spiloUpdate(old, cur interface{}) { - oldSpilo := old.(*Spilo) - curSpilo := cur.(*Spilo) +func (z *SpiloController) spiloUpdate(old, cur interface{}) { + oldSpilo := old.(*spec.Spilo) + curSpilo := cur.(*spec.Spilo) if oldSpilo.Spec.NumberOfInstances != curSpilo.Spec.NumberOfInstances { z.UpdateStatefulSet(curSpilo) @@ -175,8 +165,8 @@ func (z *SpiloSupervisor) spiloUpdate(old, cur interface{}) { log.Printf("Update spilo old: %+v\ncurrent: %+v", *oldSpilo, *curSpilo) } -func (z *SpiloSupervisor) spiloDelete(obj interface{}) { - spilo := obj.(*Spilo) +func (z *SpiloController) spiloDelete(obj interface{}) { + spilo := obj.(*spec.Spilo) err := z.DeleteStatefulSet(spilo.Metadata.Namespace, spilo.Metadata.Name) if err != nil { @@ -184,7 +174,7 @@ func (z *SpiloSupervisor) spiloDelete(obj interface{}) { } } -func (z *SpiloSupervisor) DeleteStatefulSet(ns, clusterName string) error { +func (z *SpiloController) DeleteStatefulSet(ns, clusterName string) error { orphanDependents := false deleteOptions := v1.DeleteOptions{ OrphanDependents: &orphanDependents, @@ -231,12 +221,12 @@ func (z *SpiloSupervisor) DeleteStatefulSet(ns, clusterName string) error { log.Printf("Service %s.%s has been deleted\n", service.Namespace, service.Name) } - z.DeleteEtcdKey(clusterName) + z.etcdApiClient.DeleteEtcdKey(clusterName) return nil } -func (z *SpiloSupervisor) UpdateStatefulSet(spilo *Spilo) { +func (z *SpiloController) UpdateStatefulSet(spilo *spec.Spilo) { ns := (*spilo).Metadata.Namespace statefulSet := z.createSetFromSpilo(spilo) @@ -247,7 +237,7 @@ func (z *SpiloSupervisor) UpdateStatefulSet(spilo *Spilo) { } } -func (z *SpiloSupervisor) UpdateStatefulSetImage(spilo *Spilo) { +func (z *SpiloController) UpdateStatefulSetImage(spilo *spec.Spilo) { ns := (*spilo).Metadata.Namespace z.UpdateStatefulSet(spilo) @@ -313,7 +303,7 @@ func (z *SpiloSupervisor) UpdateStatefulSetImage(spilo *Spilo) { } } -func (z *SpiloSupervisor) podWatcher(stopCh <-chan struct{}) { +func (z *SpiloController) podWatcher(stopCh <-chan struct{}) { //TODO: mind the namespace of the pod watchers := make(map[string] podWatcher) diff --git a/pkg/controller/etcd.go b/pkg/controller/etcd.go deleted file mode 100644 index 452f72218..000000000 --- a/pkg/controller/etcd.go +++ /dev/null @@ -1,27 +0,0 @@ -package controller - -import ( - "fmt" - "github.com/coreos/etcd/client" - "golang.org/x/net/context" - "log" -) - -func (z *SpiloSupervisor) DeleteEtcdKey(clusterName string) error { - options := client.DeleteOptions{ - Recursive: true, - } - - keyName := fmt.Sprintf(etcdKeyTemplate, clusterName) - - resp, err := z.etcdApiClient.Delete(context.Background(), keyName, &options) - if resp != nil { - log.Printf("Response: %+v", *resp) - } else { - log.Fatal("No response from etcd") - } - - log.Printf("Deleting key %s from ETCD", clusterName) - - return err -} diff --git a/pkg/controller/objects.go b/pkg/controller/objects.go index cbaf6f640..186078c48 100644 --- a/pkg/controller/objects.go +++ b/pkg/controller/objects.go @@ -1,14 +1,17 @@ package controller import ( + "log" + "k8s.io/client-go/pkg/api/resource" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" "k8s.io/client-go/pkg/util/intstr" - "log" + + "github.bus.zalan.do/acid/postgres-operator/pkg/spec" ) -func (z *SpiloSupervisor) CreateStatefulSet(spilo *Spilo) { +func (z *SpiloController) CreateStatefulSet(spilo *spec.Spilo) { ns := (*spilo).Metadata.Namespace statefulSet := z.createSetFromSpilo(spilo) @@ -21,7 +24,7 @@ func (z *SpiloSupervisor) CreateStatefulSet(spilo *Spilo) { } } -func (z *SpiloSupervisor) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet { +func (z *SpiloController) createSetFromSpilo(spilo *spec.Spilo) v1beta1.StatefulSet { clusterName := (*spilo).Metadata.Name envVars := []v1.EnvVar{ @@ -166,7 +169,7 @@ func (z *SpiloSupervisor) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet { } } -func (z *SpiloSupervisor) CreateSecrets(ns, name string) { +func (z *SpiloController) CreateSecrets(ns, name string) { secret := v1.Secret{ ObjectMeta: v1.ObjectMeta{ Name: name, @@ -191,7 +194,7 @@ func (z *SpiloSupervisor) CreateSecrets(ns, name string) { } } -func (z *SpiloSupervisor) CreateService(ns, name string) { +func (z *SpiloController) CreateService(ns, name string) { service := v1.Service{ ObjectMeta: v1.ObjectMeta{ Name: name, @@ -214,7 +217,7 @@ func (z *SpiloSupervisor) CreateService(ns, name string) { } } -func (z *SpiloSupervisor) CreateEndPoint(ns, name string) { +func (z *SpiloController) CreateEndPoint(ns, name string) { endPoint := v1.Endpoints{ ObjectMeta: v1.ObjectMeta{ Name: name, diff --git a/pkg/controller/operator.go b/pkg/controller/operator.go index e7572b8e1..5ecb75cdb 100644 --- a/pkg/controller/operator.go +++ b/pkg/controller/operator.go @@ -17,47 +17,18 @@ import ( "k8s.io/client-go/pkg/runtime/serializer" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + + "github.bus.zalan.do/acid/postgres-operator/pkg/spec" ) var ( - etcdHostOutside string - VENDOR = "acid.zalan.do" VERSION = "0.0.1.dev" resyncPeriod = 5 * time.Minute - - etcdKeyTemplate = "/service/%s" ) type Options struct { - KubeConfig string -} - -type Pgconf struct { - Parameter string `json:"param"` - Value string `json:"value"` -} - -type SpiloSpec struct { - EtcdHost string `json:"etcd_host"` - VolumeSize int `json:"volume_size"` - NumberOfInstances int32 `json:"number_of_instances"` - DockerImage string `json:"docker_image"` - PostgresConfiguration []Pgconf `json:"postgres_configuration"` - ResourceCPU string `json:"resource_cpu"` - ResourceMemory string `json:"resource_memory"` -} - -type Spilo struct { - unversioned.TypeMeta `json:",inline"` - Metadata api.ObjectMeta `json:"metadata"` - Spec SpiloSpec `json:"spec"` -} - -type SpiloList struct { - unversioned.TypeMeta `json:",inline"` - Metadata unversioned.ListMeta `json:"metadata"` - Items []Spilo `json:"items"` + KubeConfig string } func KubernetesConfig(options Options) *rest.Config { @@ -70,8 +41,6 @@ func KubernetesConfig(options Options) *rest.Config { config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig() - etcdHostOutside = config.Host - if err != nil { log.Fatalf("Couldn't get Kubernetes default config: %s", err) } @@ -91,8 +60,8 @@ func newKubernetesSpiloClient(c *rest.Config) (*rest.RESTClient, error) { func(scheme *runtime.Scheme) error { scheme.AddKnownTypes( *c.GroupVersion, - &Spilo{}, - &SpiloList{}, + &spec.Spilo{}, + &spec.SpiloList{}, &api.ListOptions{}, &api.DeleteOptions{}, ) diff --git a/pkg/controller/spilo.go b/pkg/controller/spilo.go index 3b226956c..ec40f4acc 100644 --- a/pkg/controller/spilo.go +++ b/pkg/controller/spilo.go @@ -1,17 +1,16 @@ package controller import ( - "encoding/json" "log" "sync" + "net/url" + "fmt" + "strings" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/pkg/api/meta" - "k8s.io/client-go/pkg/api/unversioned" "k8s.io/client-go/rest" - "net/url" - "fmt" - "strings" + + "github.bus.zalan.do/acid/postgres-operator/pkg/etcd" ) type SpiloOperator struct { @@ -19,7 +18,8 @@ type SpiloOperator struct { ClientSet *kubernetes.Clientset Client *rest.RESTClient - Supervisor *SpiloSupervisor + Controller *SpiloController + EtcdClient *etcd.EtcdClient } func New(options Options) *SpiloOperator { @@ -36,18 +36,20 @@ func New(options Options) *SpiloOperator { } ports := etcdService.Spec.Ports[0] nodeurl, _ := url.Parse(config.Host) - etcdHostOutside = fmt.Sprintf("http://%s:%d", strings.Split(nodeurl.Host, ":")[0], ports.NodePort) + etcdHostOutside := fmt.Sprintf("http://%s:%d", strings.Split(nodeurl.Host, ":")[0], ports.NodePort) spiloClient, err := newKubernetesSpiloClient(config) if err != nil { log.Fatalf("Couldn't create Spilo client: %s", err) } + etcdClient := etcd.NewEctdClient(etcdHostOutside) + operator := &SpiloOperator{ Options: options, ClientSet: clientSet, Client: spiloClient, - Supervisor: newSupervisor(spiloClient, clientSet), + Controller: newController(spiloClient, clientSet, etcdClient), } return operator @@ -56,54 +58,7 @@ func New(options Options) *SpiloOperator { func (o *SpiloOperator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { log.Printf("Spilo operator %v\n", VERSION) - go o.Supervisor.Run(stopCh, wg) + go o.Controller.Run(stopCh, wg) log.Println("Started working in background") } - -// The code below is used only to work around a known problem with third-party -// resources and ugorji. If/when these issues are resolved, the code below -// should no longer be required. -// - -func (s *Spilo) GetObjectKind() unversioned.ObjectKind { - return &s.TypeMeta -} - -func (s *Spilo) GetObjectMeta() meta.Object { - return &s.Metadata -} -func (sl *SpiloList) GetObjectKind() unversioned.ObjectKind { - return &sl.TypeMeta -} - -func (sl *SpiloList) GetListMeta() unversioned.List { - return &sl.Metadata -} - -type SpiloListCopy SpiloList -type SpiloCopy Spilo - -func (e *Spilo) UnmarshalJSON(data []byte) error { - tmp := SpiloCopy{} - err := json.Unmarshal(data, &tmp) - if err != nil { - return err - } - tmp2 := Spilo(tmp) - *e = tmp2 - - return nil -} - -func (el *SpiloList) UnmarshalJSON(data []byte) error { - tmp := SpiloListCopy{} - err := json.Unmarshal(data, &tmp) - if err != nil { - return err - } - tmp2 := SpiloList(tmp) - *el = tmp2 - - return nil -} diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go new file mode 100644 index 000000000..e51016d91 --- /dev/null +++ b/pkg/etcd/etcd.go @@ -0,0 +1,54 @@ +package etcd + +import ( + "fmt" + "github.com/coreos/etcd/client" + "golang.org/x/net/context" + "log" + "time" +) + +const etcdKeyTemplate = "/service/%s" + +type EtcdClient struct { + apiClient client.KeysAPI +} + +func NewEctdClient(host string) *EtcdClient { + etcdClient := EtcdClient{} + + cfg := client.Config{ + Endpoints: []string{host}, + Transport: client.DefaultTransport, + HeaderTimeoutPerRequest: time.Second, + } + + c, err := client.New(cfg) + if err != nil { + log.Fatal(err) + } + + etcdClient.apiClient = client.NewKeysAPI(c) + + return &etcdClient +} + + +func (c *EtcdClient) DeleteEtcdKey(clusterName string) error { + options := client.DeleteOptions{ + Recursive: true, + } + + keyName := fmt.Sprintf(etcdKeyTemplate, clusterName) + + resp, err := c.apiClient.Delete(context.Background(), keyName, &options) + if resp != nil { + log.Printf("Response: %+v", *resp) + } else { + log.Fatal("No response from etcd") + } + + log.Printf("Deleting key %s from ETCD", clusterName) + + return err +} diff --git a/pkg/spec/spilo.go b/pkg/spec/spilo.go new file mode 100644 index 000000000..3d372f301 --- /dev/null +++ b/pkg/spec/spilo.go @@ -0,0 +1,78 @@ +package spec + +import( + "encoding/json" + + "k8s.io/client-go/pkg/api/meta" + "k8s.io/client-go/pkg/api/unversioned" + "k8s.io/client-go/pkg/api" +) + +type Pgconf struct { + Parameter string `json:"param"` + Value string `json:"value"` +} + +type SpiloSpec struct { + EtcdHost string `json:"etcd_host"` + VolumeSize int `json:"volume_size"` + NumberOfInstances int32 `json:"number_of_instances"` + DockerImage string `json:"docker_image"` + PostgresConfiguration []Pgconf `json:"postgres_configuration"` + ResourceCPU string `json:"resource_cpu"` + ResourceMemory string `json:"resource_memory"` +} + +type Spilo struct { + unversioned.TypeMeta `json:",inline"` + Metadata api.ObjectMeta `json:"metadata"` + Spec SpiloSpec `json:"spec"` +} + +type SpiloList struct { + unversioned.TypeMeta `json:",inline"` + Metadata unversioned.ListMeta `json:"metadata"` + Items []Spilo `json:"items"` +} + +func (s *Spilo) GetObjectKind() unversioned.ObjectKind { + return &s.TypeMeta +} + +func (s *Spilo) GetObjectMeta() meta.Object { + return &s.Metadata +} +func (sl *SpiloList) GetObjectKind() unversioned.ObjectKind { + return &sl.TypeMeta +} + +func (sl *SpiloList) GetListMeta() unversioned.List { + return &sl.Metadata +} + +type SpiloListCopy SpiloList +type SpiloCopy Spilo + +func (e *Spilo) UnmarshalJSON(data []byte) error { + tmp := SpiloCopy{} + err := json.Unmarshal(data, &tmp) + if err != nil { + return err + } + tmp2 := Spilo(tmp) + *e = tmp2 + + return nil +} + +func (el *SpiloList) UnmarshalJSON(data []byte) error { + tmp := SpiloListCopy{} + err := json.Unmarshal(data, &tmp) + if err != nil { + return err + } + tmp2 := SpiloList(tmp) + *el = tmp2 + + return nil +}