proper names small fixes

This commit is contained in:
Murat Kabilov 2017-01-23 10:11:31 +01:00
parent 8f41c42359
commit c299f2f143
6 changed files with 30 additions and 338 deletions

2
.gitignore vendored
View File

@ -22,4 +22,4 @@ _testmain.go
*.exe *.exe
*.test *.test
*.prof *.prof
/vendor/ /vendor/

View File

@ -7,7 +7,7 @@ import (
"log" "log"
) )
func (z *SpiloZooKeeper) DeleteEtcdKey(clusterName string) error { func (z *SpiloSupervisor) DeleteEtcdKey(clusterName string) error {
options := client.DeleteOptions{ options := client.DeleteOptions{
Recursive: true, Recursive: true,
} }

View File

@ -8,7 +8,7 @@ import (
"log" "log"
) )
func (z *SpiloZooKeeper) CreateStatefulSet(spilo *Spilo) { func (z *SpiloSupervisor) CreateStatefulSet(spilo *Spilo) {
ns := (*spilo).Metadata.Namespace ns := (*spilo).Metadata.Namespace
statefulSet := z.createSetFromSpilo(spilo) statefulSet := z.createSetFromSpilo(spilo)
@ -21,7 +21,7 @@ func (z *SpiloZooKeeper) CreateStatefulSet(spilo *Spilo) {
} }
} }
func (z *SpiloZooKeeper) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet { func (z *SpiloSupervisor) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet {
clusterName := (*spilo).Metadata.Name clusterName := (*spilo).Metadata.Name
envVars := []v1.EnvVar{ envVars := []v1.EnvVar{
@ -126,7 +126,7 @@ func (z *SpiloZooKeeper) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet {
Env: envVars, Env: envVars,
} }
terminateGracePeriodSeconds := int64(0) terminateGracePeriodSeconds := int64(30)
podSpec := v1.PodSpec{ podSpec := v1.PodSpec{
TerminationGracePeriodSeconds: &terminateGracePeriodSeconds, TerminationGracePeriodSeconds: &terminateGracePeriodSeconds,
@ -166,7 +166,7 @@ func (z *SpiloZooKeeper) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet {
} }
} }
func (z *SpiloZooKeeper) CreateSecrets(ns, name string) { func (z *SpiloSupervisor) CreateSecrets(ns, name string) {
secret := v1.Secret{ secret := v1.Secret{
ObjectMeta: v1.ObjectMeta{ ObjectMeta: v1.ObjectMeta{
Name: name, Name: name,
@ -191,7 +191,7 @@ func (z *SpiloZooKeeper) CreateSecrets(ns, name string) {
} }
} }
func (z *SpiloZooKeeper) CreateService(ns, name string) { func (z *SpiloSupervisor) CreateService(ns, name string) {
service := v1.Service{ service := v1.Service{
ObjectMeta: v1.ObjectMeta{ ObjectMeta: v1.ObjectMeta{
Name: name, Name: name,
@ -214,7 +214,7 @@ func (z *SpiloZooKeeper) CreateService(ns, name string) {
} }
} }
func (z *SpiloZooKeeper) CreateEndPoint(ns, name string) { func (z *SpiloSupervisor) CreateEndPoint(ns, name string) {
endPoint := v1.Endpoints{ endPoint := v1.Endpoints{
ObjectMeta: v1.ObjectMeta{ ObjectMeta: v1.ObjectMeta{
Name: name, Name: name,

View File

@ -9,15 +9,17 @@ import (
"k8s.io/client-go/pkg/api/meta" "k8s.io/client-go/pkg/api/meta"
"k8s.io/client-go/pkg/api/unversioned" "k8s.io/client-go/pkg/api/unversioned"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"net/url"
"fmt"
"strings"
) )
type SpiloOperator struct { type SpiloOperator struct {
Options Options
ClientSet *kubernetes.Clientset ClientSet *kubernetes.Clientset
SpiloClient *rest.RESTClient Client *rest.RESTClient
Supervisor *SpiloSupervisor
SpiloZooKeeper *SpiloZooKeeper
} }
func New(options Options) *SpiloOperator { func New(options Options) *SpiloOperator {
@ -28,6 +30,14 @@ func New(options Options) *SpiloOperator {
log.Fatalf("Couldn't create Kubernetes client: %s", err) log.Fatalf("Couldn't create Kubernetes client: %s", err)
} }
etcdService, _ := clientSet.Services("default").Get("etcd-client")
if len(etcdService.Spec.Ports) != 1 {
log.Fatalln("Can't find Etcd cluster")
}
ports := etcdService.Spec.Ports[0]
nodeurl, _ := url.Parse(config.Host)
etcdHostOutside = fmt.Sprintf("http://%s:%d", strings.Split(nodeurl.Host, ":")[0], ports.NodePort)
spiloClient, err := newKubernetesSpiloClient(config) spiloClient, err := newKubernetesSpiloClient(config)
if err != nil { if err != nil {
log.Fatalf("Couldn't create Spilo client: %s", err) log.Fatalf("Couldn't create Spilo client: %s", err)
@ -36,8 +46,8 @@ func New(options Options) *SpiloOperator {
operator := &SpiloOperator{ operator := &SpiloOperator{
Options: options, Options: options,
ClientSet: clientSet, ClientSet: clientSet,
SpiloClient: spiloClient, Client: spiloClient,
SpiloZooKeeper: newZookeeper(spiloClient, clientSet), Supervisor: newSupervisor(spiloClient, clientSet),
} }
return operator return operator
@ -46,7 +56,7 @@ func New(options Options) *SpiloOperator {
func (o *SpiloOperator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (o *SpiloOperator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
log.Printf("Spilo operator %v\n", VERSION) log.Printf("Spilo operator %v\n", VERSION)
go o.SpiloZooKeeper.Run(stopCh, wg) go o.Supervisor.Run(stopCh, wg)
log.Println("Started working in background") log.Println("Started working in background")
} }

View File

@ -1,318 +0,0 @@
package operator
import (
"fmt"
"log"
"sync"
"time"
etcdclient "github.com/coreos/etcd/client"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
const (
ACTION_DELETE = "delete"
ACTION_UPDATE = "update"
ACTION_ADD = "add"
)
type podEvent struct {
namespace string
name string
actionType string
}
type podWatcher struct {
podNamespace string
podName string
eventsChannel chan podEvent
subscribe bool
}
type SpiloZooKeeper struct {
podEvents chan podEvent
podWatchers chan podWatcher
SpiloClient *rest.RESTClient
Clientset *kubernetes.Clientset
spiloInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
etcdApiClient etcdclient.KeysAPI
}
func podsListWatch(client *kubernetes.Clientset) *cache.ListWatch {
return cache.NewListWatchFromClient(client.Core().RESTClient(), "pods", api.NamespaceAll, fields.Everything())
}
func newZookeeper(spiloClient *rest.RESTClient, clientset *kubernetes.Clientset) *SpiloZooKeeper {
spiloZooKeeper := &SpiloZooKeeper{
SpiloClient: spiloClient,
Clientset: clientset,
}
spiloInformer := cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(spiloClient, "spilos", api.NamespaceAll, fields.Everything()),
&Spilo{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
spiloInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: spiloZooKeeper.spiloAdd,
UpdateFunc: spiloZooKeeper.spiloUpdate,
DeleteFunc: spiloZooKeeper.spiloDelete,
})
podInformer := cache.NewSharedIndexInformer(
podsListWatch(clientset),
&v1.Pod{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: spiloZooKeeper.podAdd,
UpdateFunc: spiloZooKeeper.podUpdate,
DeleteFunc: spiloZooKeeper.podDelete,
})
spiloZooKeeper.spiloInformer = spiloInformer
spiloZooKeeper.podInformer = podInformer
cfg := etcdclient.Config{
Endpoints: []string{etcdHostOutside},
Transport: etcdclient.DefaultTransport,
HeaderTimeoutPerRequest: time.Second,
}
c, err := etcdclient.New(cfg)
if err != nil {
log.Fatal(err)
}
spiloZooKeeper.etcdApiClient = etcdclient.NewKeysAPI(c)
spiloZooKeeper.podEvents = make(chan podEvent)
return spiloZooKeeper
}
func (d *SpiloZooKeeper) podAdd(obj interface{}) {
pod := obj.(*v1.Pod)
d.podEvents <- podEvent{
namespace: pod.Namespace,
name: pod.Name,
actionType: ACTION_ADD,
}
}
func (d *SpiloZooKeeper) podDelete(obj interface{}) {
pod := obj.(*v1.Pod)
d.podEvents <- podEvent{
namespace: pod.Namespace,
name: pod.Name,
actionType: ACTION_DELETE,
}
}
func (d *SpiloZooKeeper) podUpdate(old, cur interface{}) {
oldPod := old.(*v1.Pod)
d.podEvents <- podEvent{
namespace: oldPod.Namespace,
name: oldPod.Name,
actionType: ACTION_UPDATE,
}
}
func (z *SpiloZooKeeper) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
wg.Add(1)
if err := EnsureSpiloThirdPartyResource(z.Clientset); err != nil {
log.Fatalf("Couldn't create ThirdPartyResource: %s", err)
}
go z.spiloInformer.Run(stopCh)
go z.podInformer.Run(stopCh)
go z.podWatcher(stopCh)
<-stopCh
}
func (z *SpiloZooKeeper) spiloAdd(obj interface{}) {
spilo := obj.(*Spilo)
clusterName := (*spilo).Metadata.Name
ns := (*spilo).Metadata.Namespace
z.CreateEndPoint(ns, clusterName)
z.CreateService(ns, clusterName)
z.CreateSecrets(ns, clusterName)
z.CreateStatefulSet(spilo)
}
func (z *SpiloZooKeeper) spiloUpdate(old, cur interface{}) {
oldSpilo := old.(*Spilo)
curSpilo := cur.(*Spilo)
if oldSpilo.Spec.NumberOfInstances != curSpilo.Spec.NumberOfInstances {
z.UpdateStatefulSet(curSpilo)
}
if oldSpilo.Spec.DockerImage != curSpilo.Spec.DockerImage {
z.UpdateStatefulSetImage(curSpilo)
}
log.Printf("Update spilo old: %+v cur: %+v", *oldSpilo, *curSpilo)
}
func (z *SpiloZooKeeper) spiloDelete(obj interface{}) {
spilo := obj.(*Spilo)
err := z.DeleteStatefulSet(spilo.Metadata.Namespace, spilo.Metadata.Name)
if err != nil {
log.Printf("Error while deleting stateful set: %+v", err)
}
}
func (z *SpiloZooKeeper) DeleteStatefulSet(ns, clusterName string) error {
orphanDependents := false
deleteOptions := v1.DeleteOptions{
OrphanDependents: &orphanDependents,
}
listOptions := v1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", clusterName),
}
podList, err := z.Clientset.Pods(ns).List(listOptions)
if err != nil {
log.Printf("Error: %+v", err)
}
err = z.Clientset.StatefulSets(ns).Delete(clusterName, &deleteOptions)
if err != nil {
return err
}
log.Printf("StatefulSet %s.%s has been deleted\n", ns, clusterName)
for _, pod := range podList.Items {
err = z.Clientset.Pods(pod.Namespace).Delete(pod.Name, &deleteOptions)
if err != nil {
log.Printf("Error while deleting Pod %s: %+v", pod.Name, err)
return err
}
log.Printf("Pod %s.%s has been deleted\n", pod.Namespace, pod.Name)
}
serviceList, err := z.Clientset.Services(ns).List(listOptions)
if err != nil {
return err
}
for _, service := range serviceList.Items {
err = z.Clientset.Services(service.Namespace).Delete(service.Name, &deleteOptions)
if err != nil {
log.Printf("Error while deleting Service %s: %+v", service.Name, err)
return err
}
log.Printf("Service %s.%s has been deleted\n", service.Namespace, service.Name)
}
z.DeleteEtcdKey(clusterName)
return nil
}
func (z *SpiloZooKeeper) UpdateStatefulSet(spilo *Spilo) {
ns := (*spilo).Metadata.Namespace
statefulSet := z.createSetFromSpilo(spilo)
_, err := z.Clientset.StatefulSets(ns).Update(&statefulSet)
if err != nil {
log.Printf("Error while updating StatefulSet: %s", err)
}
}
func (z *SpiloZooKeeper) UpdateStatefulSetImage(spilo *Spilo) {
ns := (*spilo).Metadata.Namespace
z.UpdateStatefulSet(spilo)
listOptions := v1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", (*spilo).Metadata.Name),
}
pods, err := z.Clientset.Pods(ns).List(listOptions)
if err != nil {
log.Printf("Error while getting pods: %s", err)
}
orphanDependents := true
deleteOptions := v1.DeleteOptions{
OrphanDependents: &orphanDependents,
}
var masterPodName string
for _, pod := range pods.Items {
log.Printf("Pod processing: %s", pod.Name)
role, ok := pod.Labels["spilo-role"]
if ok == false {
log.Println("No spilo-role label")
continue
}
if role == "master" {
masterPodName = pod.Name
log.Printf("Skipping master: %s", masterPodName)
continue
}
err := z.Clientset.Pods(ns).Delete(pod.Name, &deleteOptions)
if err != nil {
log.Printf("Error while deleting Pod %s.%s: %s", pod.Namespace, pod.Name, err)
} else {
log.Printf("Pod deleted: %s.%s", pod.Namespace, pod.Name)
}
//TODO: wait until Pod recreated
}
//TODO: do manual failover
err = z.Clientset.Pods(ns).Delete(masterPodName, &deleteOptions)
if err != nil {
log.Printf("Error while deleting Pod %s.%s: %s", ns, masterPodName, err)
} else {
log.Printf("Pod deleted: %s.%s", ns, masterPodName)
}
}
func (z *SpiloZooKeeper) podWatcher(stopCh <-chan struct{}) {
watchers := make(map[string]chan podEvent)
for {
select {
case watcher := <-z.podWatchers:
if watcher.subscribe {
watchers[watcher.podName] = watcher.eventsChannel
} else {
close(watcher.eventsChannel)
delete(watchers, watcher.podName)
}
case podEvent := <-z.podEvents:
podChannel, ok := watchers[podEvent.name]
if ok == false {
continue
}
podChannel <- podEvent
}
}
}

View File

@ -1,16 +1,16 @@
apiVersion: "acid.zalan.do/v1" apiVersion: "zalan.do/v1"
kind: Spilo kind: "Spilo"
metadata: metadata:
name: testcluster name: testcluster
spec: spec:
docker_image: registry.opensource.zalan.do/acid/spilo-9.6:1.2-p5
etcd_host: etcd-client.default.svc.cluster.local:2379 etcd_host: etcd-client.default.svc.cluster.local:2379
volume_size: 100 # GB volume_size: 100 # GB
resource_cpu: 111m resource_cpu: 100m
resource_memory: 222Mi resource_memory: 500Mi
number_of_instances: 3 number_of_instances: 3
docker_image: registry.opensource.zalan.do/acid/spilo-9.6:1.2-p5 # put the spilo image here
postgres_configuration: postgres_configuration:
- param: "max_connections" - param: "max_connections"
value: "10" value: "10"