some refactoring

This commit is contained in:
Murat Kabilov 2017-01-23 17:10:50 +01:00
parent 256ff37c19
commit d5a7683a38
7 changed files with 200 additions and 178 deletions

View File

@ -4,15 +4,16 @@ import (
"fmt" "fmt"
"log" "log"
"sync" "sync"
"time"
etcdclient "github.com/coreos/etcd/client"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/fields" "k8s.io/client-go/pkg/fields"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
"github.bus.zalan.do/acid/postgres-operator/pkg/etcd"
) )
const ( const (
@ -34,38 +35,39 @@ type podWatcher struct {
subscribe bool subscribe bool
} }
type SpiloSupervisor struct { type SpiloController struct {
podEvents chan podEvent podEvents chan podEvent
podWatchers chan podWatcher podWatchers chan podWatcher
SpiloClient *rest.RESTClient SpiloClient *rest.RESTClient
Clientset *kubernetes.Clientset Clientset *kubernetes.Clientset
etcdApiClient *etcd.EtcdClient
spiloInformer cache.SharedIndexInformer spiloInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer podInformer cache.SharedIndexInformer
etcdApiClient etcdclient.KeysAPI
} }
func podsListWatch(client *kubernetes.Clientset) *cache.ListWatch { func podsListWatch(client *kubernetes.Clientset) *cache.ListWatch {
return cache.NewListWatchFromClient(client.Core().RESTClient(), "pods", api.NamespaceAll, fields.Everything()) return cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", api.NamespaceAll, fields.Everything())
} }
func newSupervisor(spiloClient *rest.RESTClient, clientset *kubernetes.Clientset) *SpiloSupervisor { func newController(spiloClient *rest.RESTClient, clientset *kubernetes.Clientset, etcdClient *etcd.EtcdClient) *SpiloController {
spiloSupervisor := &SpiloSupervisor{ spiloController := &SpiloController{
SpiloClient: spiloClient, SpiloClient: spiloClient,
Clientset: clientset, Clientset: clientset,
etcdApiClient: etcdClient,
} }
spiloInformer := cache.NewSharedIndexInformer( spiloInformer := cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(spiloClient, "spilos", api.NamespaceAll, fields.Everything()), cache.NewListWatchFromClient(spiloClient, "spilos", api.NamespaceAll, fields.Everything()),
&Spilo{}, &spec.Spilo{},
resyncPeriod, resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
) )
spiloInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ spiloInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: spiloSupervisor.spiloAdd, AddFunc: spiloController.spiloAdd,
UpdateFunc: spiloSupervisor.spiloUpdate, UpdateFunc: spiloController.spiloUpdate,
DeleteFunc: spiloSupervisor.spiloDelete, DeleteFunc: spiloController.spiloDelete,
}) })
podInformer := cache.NewSharedIndexInformer( podInformer := cache.NewSharedIndexInformer(
@ -76,32 +78,20 @@ func newSupervisor(spiloClient *rest.RESTClient, clientset *kubernetes.Clientset
) )
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: spiloSupervisor.podAdd, AddFunc: spiloController.podAdd,
UpdateFunc: spiloSupervisor.podUpdate, UpdateFunc: spiloController.podUpdate,
DeleteFunc: spiloSupervisor.podDelete, DeleteFunc: spiloController.podDelete,
}) })
spiloSupervisor.spiloInformer = spiloInformer spiloController.spiloInformer = spiloInformer
spiloSupervisor.podInformer = podInformer spiloController.podInformer = podInformer
cfg := etcdclient.Config{ spiloController.podEvents = make(chan podEvent)
Endpoints: []string{etcdHostOutside},
Transport: etcdclient.DefaultTransport,
HeaderTimeoutPerRequest: time.Second,
}
c, err := etcdclient.New(cfg) return spiloController
if err != nil {
log.Fatal(err)
}
spiloSupervisor.etcdApiClient = etcdclient.NewKeysAPI(c)
spiloSupervisor.podEvents = make(chan podEvent)
return spiloSupervisor
} }
func (d *SpiloSupervisor) podAdd(obj interface{}) { func (d *SpiloController) podAdd(obj interface{}) {
pod := obj.(*v1.Pod) pod := obj.(*v1.Pod)
d.podEvents <- podEvent{ d.podEvents <- podEvent{
namespace: pod.Namespace, namespace: pod.Namespace,
@ -110,7 +100,7 @@ func (d *SpiloSupervisor) podAdd(obj interface{}) {
} }
} }
func (d *SpiloSupervisor) podDelete(obj interface{}) { func (d *SpiloController) podDelete(obj interface{}) {
pod := obj.(*v1.Pod) pod := obj.(*v1.Pod)
d.podEvents <- podEvent{ d.podEvents <- podEvent{
namespace: pod.Namespace, namespace: pod.Namespace,
@ -119,7 +109,7 @@ func (d *SpiloSupervisor) podDelete(obj interface{}) {
} }
} }
func (d *SpiloSupervisor) podUpdate(old, cur interface{}) { func (d *SpiloController) podUpdate(old, cur interface{}) {
oldPod := old.(*v1.Pod) oldPod := old.(*v1.Pod)
d.podEvents <- podEvent{ d.podEvents <- podEvent{
namespace: oldPod.Namespace, namespace: oldPod.Namespace,
@ -128,7 +118,7 @@ func (d *SpiloSupervisor) podUpdate(old, cur interface{}) {
} }
} }
func (z *SpiloSupervisor) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (z *SpiloController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
wg.Add(1) wg.Add(1)
@ -143,8 +133,8 @@ func (z *SpiloSupervisor) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
<-stopCh <-stopCh
} }
func (z *SpiloSupervisor) spiloAdd(obj interface{}) { func (z *SpiloController) spiloAdd(obj interface{}) {
spilo := obj.(*Spilo) spilo := obj.(*spec.Spilo)
clusterName := (*spilo).Metadata.Name clusterName := (*spilo).Metadata.Name
ns := (*spilo).Metadata.Namespace ns := (*spilo).Metadata.Namespace
@ -156,9 +146,9 @@ func (z *SpiloSupervisor) spiloAdd(obj interface{}) {
z.CreateStatefulSet(spilo) z.CreateStatefulSet(spilo)
} }
func (z *SpiloSupervisor) spiloUpdate(old, cur interface{}) { func (z *SpiloController) spiloUpdate(old, cur interface{}) {
oldSpilo := old.(*Spilo) oldSpilo := old.(*spec.Spilo)
curSpilo := cur.(*Spilo) curSpilo := cur.(*spec.Spilo)
if oldSpilo.Spec.NumberOfInstances != curSpilo.Spec.NumberOfInstances { if oldSpilo.Spec.NumberOfInstances != curSpilo.Spec.NumberOfInstances {
z.UpdateStatefulSet(curSpilo) z.UpdateStatefulSet(curSpilo)
@ -175,8 +165,8 @@ func (z *SpiloSupervisor) spiloUpdate(old, cur interface{}) {
log.Printf("Update spilo old: %+v\ncurrent: %+v", *oldSpilo, *curSpilo) log.Printf("Update spilo old: %+v\ncurrent: %+v", *oldSpilo, *curSpilo)
} }
func (z *SpiloSupervisor) spiloDelete(obj interface{}) { func (z *SpiloController) spiloDelete(obj interface{}) {
spilo := obj.(*Spilo) spilo := obj.(*spec.Spilo)
err := z.DeleteStatefulSet(spilo.Metadata.Namespace, spilo.Metadata.Name) err := z.DeleteStatefulSet(spilo.Metadata.Namespace, spilo.Metadata.Name)
if err != nil { if err != nil {
@ -184,7 +174,7 @@ func (z *SpiloSupervisor) spiloDelete(obj interface{}) {
} }
} }
func (z *SpiloSupervisor) DeleteStatefulSet(ns, clusterName string) error { func (z *SpiloController) DeleteStatefulSet(ns, clusterName string) error {
orphanDependents := false orphanDependents := false
deleteOptions := v1.DeleteOptions{ deleteOptions := v1.DeleteOptions{
OrphanDependents: &orphanDependents, OrphanDependents: &orphanDependents,
@ -231,12 +221,12 @@ func (z *SpiloSupervisor) DeleteStatefulSet(ns, clusterName string) error {
log.Printf("Service %s.%s has been deleted\n", service.Namespace, service.Name) log.Printf("Service %s.%s has been deleted\n", service.Namespace, service.Name)
} }
z.DeleteEtcdKey(clusterName) z.etcdApiClient.DeleteEtcdKey(clusterName)
return nil return nil
} }
func (z *SpiloSupervisor) UpdateStatefulSet(spilo *Spilo) { func (z *SpiloController) UpdateStatefulSet(spilo *spec.Spilo) {
ns := (*spilo).Metadata.Namespace ns := (*spilo).Metadata.Namespace
statefulSet := z.createSetFromSpilo(spilo) statefulSet := z.createSetFromSpilo(spilo)
@ -247,7 +237,7 @@ func (z *SpiloSupervisor) UpdateStatefulSet(spilo *Spilo) {
} }
} }
func (z *SpiloSupervisor) UpdateStatefulSetImage(spilo *Spilo) { func (z *SpiloController) UpdateStatefulSetImage(spilo *spec.Spilo) {
ns := (*spilo).Metadata.Namespace ns := (*spilo).Metadata.Namespace
z.UpdateStatefulSet(spilo) z.UpdateStatefulSet(spilo)
@ -313,7 +303,7 @@ func (z *SpiloSupervisor) UpdateStatefulSetImage(spilo *Spilo) {
} }
} }
func (z *SpiloSupervisor) podWatcher(stopCh <-chan struct{}) { func (z *SpiloController) podWatcher(stopCh <-chan struct{}) {
//TODO: mind the namespace of the pod //TODO: mind the namespace of the pod
watchers := make(map[string] podWatcher) watchers := make(map[string] podWatcher)

View File

@ -1,27 +0,0 @@
package controller
import (
"fmt"
"github.com/coreos/etcd/client"
"golang.org/x/net/context"
"log"
)
func (z *SpiloSupervisor) DeleteEtcdKey(clusterName string) error {
options := client.DeleteOptions{
Recursive: true,
}
keyName := fmt.Sprintf(etcdKeyTemplate, clusterName)
resp, err := z.etcdApiClient.Delete(context.Background(), keyName, &options)
if resp != nil {
log.Printf("Response: %+v", *resp)
} else {
log.Fatal("No response from etcd")
}
log.Printf("Deleting key %s from ETCD", clusterName)
return err
}

View File

@ -1,14 +1,17 @@
package controller package controller
import ( import (
"log"
"k8s.io/client-go/pkg/api/resource" "k8s.io/client-go/pkg/api/resource"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/apis/apps/v1beta1" "k8s.io/client-go/pkg/apis/apps/v1beta1"
"k8s.io/client-go/pkg/util/intstr" "k8s.io/client-go/pkg/util/intstr"
"log"
"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
) )
func (z *SpiloSupervisor) CreateStatefulSet(spilo *Spilo) { func (z *SpiloController) CreateStatefulSet(spilo *spec.Spilo) {
ns := (*spilo).Metadata.Namespace ns := (*spilo).Metadata.Namespace
statefulSet := z.createSetFromSpilo(spilo) statefulSet := z.createSetFromSpilo(spilo)
@ -21,7 +24,7 @@ func (z *SpiloSupervisor) CreateStatefulSet(spilo *Spilo) {
} }
} }
func (z *SpiloSupervisor) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet { func (z *SpiloController) createSetFromSpilo(spilo *spec.Spilo) v1beta1.StatefulSet {
clusterName := (*spilo).Metadata.Name clusterName := (*spilo).Metadata.Name
envVars := []v1.EnvVar{ envVars := []v1.EnvVar{
@ -166,7 +169,7 @@ func (z *SpiloSupervisor) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet {
} }
} }
func (z *SpiloSupervisor) CreateSecrets(ns, name string) { func (z *SpiloController) CreateSecrets(ns, name string) {
secret := v1.Secret{ secret := v1.Secret{
ObjectMeta: v1.ObjectMeta{ ObjectMeta: v1.ObjectMeta{
Name: name, Name: name,
@ -191,7 +194,7 @@ func (z *SpiloSupervisor) CreateSecrets(ns, name string) {
} }
} }
func (z *SpiloSupervisor) CreateService(ns, name string) { func (z *SpiloController) CreateService(ns, name string) {
service := v1.Service{ service := v1.Service{
ObjectMeta: v1.ObjectMeta{ ObjectMeta: v1.ObjectMeta{
Name: name, Name: name,
@ -214,7 +217,7 @@ func (z *SpiloSupervisor) CreateService(ns, name string) {
} }
} }
func (z *SpiloSupervisor) CreateEndPoint(ns, name string) { func (z *SpiloController) CreateEndPoint(ns, name string) {
endPoint := v1.Endpoints{ endPoint := v1.Endpoints{
ObjectMeta: v1.ObjectMeta{ ObjectMeta: v1.ObjectMeta{
Name: name, Name: name,

View File

@ -17,47 +17,18 @@ import (
"k8s.io/client-go/pkg/runtime/serializer" "k8s.io/client-go/pkg/runtime/serializer"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
) )
var ( var (
etcdHostOutside string
VENDOR = "acid.zalan.do" VENDOR = "acid.zalan.do"
VERSION = "0.0.1.dev" VERSION = "0.0.1.dev"
resyncPeriod = 5 * time.Minute resyncPeriod = 5 * time.Minute
etcdKeyTemplate = "/service/%s"
) )
type Options struct { type Options struct {
KubeConfig string KubeConfig string
}
type Pgconf struct {
Parameter string `json:"param"`
Value string `json:"value"`
}
type SpiloSpec struct {
EtcdHost string `json:"etcd_host"`
VolumeSize int `json:"volume_size"`
NumberOfInstances int32 `json:"number_of_instances"`
DockerImage string `json:"docker_image"`
PostgresConfiguration []Pgconf `json:"postgres_configuration"`
ResourceCPU string `json:"resource_cpu"`
ResourceMemory string `json:"resource_memory"`
}
type Spilo struct {
unversioned.TypeMeta `json:",inline"`
Metadata api.ObjectMeta `json:"metadata"`
Spec SpiloSpec `json:"spec"`
}
type SpiloList struct {
unversioned.TypeMeta `json:",inline"`
Metadata unversioned.ListMeta `json:"metadata"`
Items []Spilo `json:"items"`
} }
func KubernetesConfig(options Options) *rest.Config { func KubernetesConfig(options Options) *rest.Config {
@ -70,8 +41,6 @@ func KubernetesConfig(options Options) *rest.Config {
config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig() config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
etcdHostOutside = config.Host
if err != nil { if err != nil {
log.Fatalf("Couldn't get Kubernetes default config: %s", err) log.Fatalf("Couldn't get Kubernetes default config: %s", err)
} }
@ -91,8 +60,8 @@ func newKubernetesSpiloClient(c *rest.Config) (*rest.RESTClient, error) {
func(scheme *runtime.Scheme) error { func(scheme *runtime.Scheme) error {
scheme.AddKnownTypes( scheme.AddKnownTypes(
*c.GroupVersion, *c.GroupVersion,
&Spilo{}, &spec.Spilo{},
&SpiloList{}, &spec.SpiloList{},
&api.ListOptions{}, &api.ListOptions{},
&api.DeleteOptions{}, &api.DeleteOptions{},
) )

View File

@ -1,17 +1,16 @@
package controller package controller
import ( import (
"encoding/json"
"log" "log"
"sync" "sync"
"net/url"
"fmt"
"strings"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/meta"
"k8s.io/client-go/pkg/api/unversioned"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"net/url"
"fmt" "github.bus.zalan.do/acid/postgres-operator/pkg/etcd"
"strings"
) )
type SpiloOperator struct { type SpiloOperator struct {
@ -19,7 +18,8 @@ type SpiloOperator struct {
ClientSet *kubernetes.Clientset ClientSet *kubernetes.Clientset
Client *rest.RESTClient Client *rest.RESTClient
Supervisor *SpiloSupervisor Controller *SpiloController
EtcdClient *etcd.EtcdClient
} }
func New(options Options) *SpiloOperator { func New(options Options) *SpiloOperator {
@ -36,18 +36,20 @@ func New(options Options) *SpiloOperator {
} }
ports := etcdService.Spec.Ports[0] ports := etcdService.Spec.Ports[0]
nodeurl, _ := url.Parse(config.Host) nodeurl, _ := url.Parse(config.Host)
etcdHostOutside = fmt.Sprintf("http://%s:%d", strings.Split(nodeurl.Host, ":")[0], ports.NodePort) etcdHostOutside := fmt.Sprintf("http://%s:%d", strings.Split(nodeurl.Host, ":")[0], ports.NodePort)
spiloClient, err := newKubernetesSpiloClient(config) spiloClient, err := newKubernetesSpiloClient(config)
if err != nil { if err != nil {
log.Fatalf("Couldn't create Spilo client: %s", err) log.Fatalf("Couldn't create Spilo client: %s", err)
} }
etcdClient := etcd.NewEctdClient(etcdHostOutside)
operator := &SpiloOperator{ operator := &SpiloOperator{
Options: options, Options: options,
ClientSet: clientSet, ClientSet: clientSet,
Client: spiloClient, Client: spiloClient,
Supervisor: newSupervisor(spiloClient, clientSet), Controller: newController(spiloClient, clientSet, etcdClient),
} }
return operator return operator
@ -56,54 +58,7 @@ func New(options Options) *SpiloOperator {
func (o *SpiloOperator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (o *SpiloOperator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
log.Printf("Spilo operator %v\n", VERSION) log.Printf("Spilo operator %v\n", VERSION)
go o.Supervisor.Run(stopCh, wg) go o.Controller.Run(stopCh, wg)
log.Println("Started working in background") log.Println("Started working in background")
} }
// The code below is used only to work around a known problem with third-party
// resources and ugorji. If/when these issues are resolved, the code below
// should no longer be required.
//
func (s *Spilo) GetObjectKind() unversioned.ObjectKind {
return &s.TypeMeta
}
func (s *Spilo) GetObjectMeta() meta.Object {
return &s.Metadata
}
func (sl *SpiloList) GetObjectKind() unversioned.ObjectKind {
return &sl.TypeMeta
}
func (sl *SpiloList) GetListMeta() unversioned.List {
return &sl.Metadata
}
type SpiloListCopy SpiloList
type SpiloCopy Spilo
func (e *Spilo) UnmarshalJSON(data []byte) error {
tmp := SpiloCopy{}
err := json.Unmarshal(data, &tmp)
if err != nil {
return err
}
tmp2 := Spilo(tmp)
*e = tmp2
return nil
}
func (el *SpiloList) UnmarshalJSON(data []byte) error {
tmp := SpiloListCopy{}
err := json.Unmarshal(data, &tmp)
if err != nil {
return err
}
tmp2 := SpiloList(tmp)
*el = tmp2
return nil
}

54
pkg/etcd/etcd.go Normal file
View File

@ -0,0 +1,54 @@
package etcd
import (
"fmt"
"github.com/coreos/etcd/client"
"golang.org/x/net/context"
"log"
"time"
)
const etcdKeyTemplate = "/service/%s"
type EtcdClient struct {
apiClient client.KeysAPI
}
func NewEctdClient(host string) *EtcdClient {
etcdClient := EtcdClient{}
cfg := client.Config{
Endpoints: []string{host},
Transport: client.DefaultTransport,
HeaderTimeoutPerRequest: time.Second,
}
c, err := client.New(cfg)
if err != nil {
log.Fatal(err)
}
etcdClient.apiClient = client.NewKeysAPI(c)
return &etcdClient
}
func (c *EtcdClient) DeleteEtcdKey(clusterName string) error {
options := client.DeleteOptions{
Recursive: true,
}
keyName := fmt.Sprintf(etcdKeyTemplate, clusterName)
resp, err := c.apiClient.Delete(context.Background(), keyName, &options)
if resp != nil {
log.Printf("Response: %+v", *resp)
} else {
log.Fatal("No response from etcd")
}
log.Printf("Deleting key %s from ETCD", clusterName)
return err
}

78
pkg/spec/spilo.go Normal file
View File

@ -0,0 +1,78 @@
package spec
import(
"encoding/json"
"k8s.io/client-go/pkg/api/meta"
"k8s.io/client-go/pkg/api/unversioned"
"k8s.io/client-go/pkg/api"
)
type Pgconf struct {
Parameter string `json:"param"`
Value string `json:"value"`
}
type SpiloSpec struct {
EtcdHost string `json:"etcd_host"`
VolumeSize int `json:"volume_size"`
NumberOfInstances int32 `json:"number_of_instances"`
DockerImage string `json:"docker_image"`
PostgresConfiguration []Pgconf `json:"postgres_configuration"`
ResourceCPU string `json:"resource_cpu"`
ResourceMemory string `json:"resource_memory"`
}
type Spilo struct {
unversioned.TypeMeta `json:",inline"`
Metadata api.ObjectMeta `json:"metadata"`
Spec SpiloSpec `json:"spec"`
}
type SpiloList struct {
unversioned.TypeMeta `json:",inline"`
Metadata unversioned.ListMeta `json:"metadata"`
Items []Spilo `json:"items"`
}
func (s *Spilo) GetObjectKind() unversioned.ObjectKind {
return &s.TypeMeta
}
func (s *Spilo) GetObjectMeta() meta.Object {
return &s.Metadata
}
func (sl *SpiloList) GetObjectKind() unversioned.ObjectKind {
return &sl.TypeMeta
}
func (sl *SpiloList) GetListMeta() unversioned.List {
return &sl.Metadata
}
type SpiloListCopy SpiloList
type SpiloCopy Spilo
func (e *Spilo) UnmarshalJSON(data []byte) error {
tmp := SpiloCopy{}
err := json.Unmarshal(data, &tmp)
if err != nil {
return err
}
tmp2 := Spilo(tmp)
*e = tmp2
return nil
}
func (el *SpiloList) UnmarshalJSON(data []byte) error {
tmp := SpiloListCopy{}
err := json.Unmarshal(data, &tmp)
if err != nil {
return err
}
tmp2 := SpiloList(tmp)
*el = tmp2
return nil
}