diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..ba4754ea0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,25 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof +/vendor/ diff --git a/README.md b/README.md index c723e6a22..f3c6007a0 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,29 @@ -# postgres-operator -PostgreSQL operator in Kubernetes: concepts and code. +# postgres operator prototype (WIP) + +### Create minikube + + $ minikube start + +### Deploy etcd + + $ kubectl create -f github.com/coreos/etcd/hack/kubernetes-deploy/etcd.yaml + +### Run operator + + $ go run main.go + +### Check if ThirdPartyResource has been registered + + $ kubectl get thirdpartyresources + + NAME DESCRIPTION VERSION(S) + spilo.acid.zalan.do A specification of Spilo StatefulSets v1 + + +### Create a new spilo cluster + + $ kubectl create -f testcluster.yaml + +### Watch Pods being created + + $ kubectl get pods -w \ No newline at end of file diff --git a/glide.lock b/glide.lock new file mode 100644 index 000000000..7083f22ea --- /dev/null +++ b/glide.lock @@ -0,0 +1,264 @@ +hash: c45b505f8cf3c9b967a7244e0a675ae11c158d0168496bed18a43270e2cebfe9 +updated: 2017-01-09T11:28:31.075588565+01:00 +imports: +- name: cloud.google.com/go + version: 3b1ae45394a234c385be014e9a488f2bb6eef821 + subpackages: + - compute/metadata + - internal +- name: github.com/blang/semver + version: 31b736133b98f26d5e078ec9eb591666edfd091f +- name: github.com/coreos/etcd + version: 3d5ba43211beec7fbb1472634a9c3b464581658a + subpackages: + - client + - pkg/pathutil + - pkg/types +- name: github.com/coreos/go-oidc + version: 5644a2f50e2d2d5ba0b474bc5bc55fea1925936d + subpackages: + - http + - jose + - key + - oauth2 + - oidc +- name: github.com/coreos/pkg + version: fa29b1d70f0beaddd4c7021607cc3c3be8ce94b8 + subpackages: + - capnslog + - health + - httputil + - timeutil +- name: github.com/davecgh/go-spew + version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d + subpackages: + - spew +- name: github.com/docker/distribution + version: cd27f179f2c10c5d300e6d09025b538c475b0d51 + subpackages: + - digest + - reference +- name: github.com/emicklei/go-restful + version: 89ef8af493ab468a45a42bb0d89a06fccdd2fb22 + subpackages: + - log + - swagger +- name: github.com/ghodss/yaml + version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee +- name: github.com/go-openapi/jsonpointer + version: 46af16f9f7b149af66e5d1bd010e3574dc06de98 +- name: github.com/go-openapi/jsonreference + version: 13c6e3589ad90f49bd3e3bbe2c2cb3d7a4142272 +- name: github.com/go-openapi/spec + version: 6aced65f8501fe1217321abf0749d354824ba2ff +- name: github.com/go-openapi/swag + version: 1d0bd113de87027671077d3c71eb3ac5d7dbba72 +- name: github.com/gogo/protobuf + version: 8d70fb3182befc465c4a1eac8ad4d38ff49778e2 + subpackages: + - proto + - sortkeys +- name: github.com/golang/glog + version: 44145f04b68cf362d9c4df2182967c2275eaefed +- name: github.com/golang/protobuf + version: 4bd1920723d7b7c925de087aa32e2187708897f7 + subpackages: + - jsonpb + - proto +- name: github.com/google/gofuzz + version: bbcb9da2d746f8bdbd6a936686a0a6067ada0ec5 +- name: github.com/howeyc/gopass + version: 3ca23474a7c7203e0a0a070fd33508f6efdb9b3d +- name: github.com/imdario/mergo + version: 6633656539c1639d9d78127b7d47c622b5d7b6dc +- name: github.com/jonboulle/clockwork + version: 2eee05ed794112d45db504eb05aa693efd2b8b09 +- name: github.com/mailru/easyjson + version: d5b7844b561a7bc640052f1b935f7b800330d7e0 + subpackages: + - buffer + - jlexer + - jwriter +- name: github.com/pborman/uuid + version: ca53cad383cad2479bbba7f7a1a05797ec1386e4 +- name: github.com/PuerkitoBio/purell + version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 +- name: github.com/PuerkitoBio/urlesc + version: 5bd2802263f21d8788851d5305584c82a5c75d7e +- name: github.com/spf13/pflag + version: 5ccb023bc27df288a957c5e994cd44fd19619465 +- name: github.com/ugorji/go + version: f1f1a805ed361a0e078bb537e4ea78cd37dcf065 + subpackages: + - codec +- name: golang.org/x/crypto + version: 1351f936d976c60a0a48d728281922cf63eafb8d + subpackages: + - bcrypt + - blowfish + - ssh/terminal +- name: golang.org/x/net + version: 6acef71eb69611914f7a30939ea9f6e194c78172 + subpackages: + - context + - context/ctxhttp + - http2 + - http2/hpack + - idna +- name: golang.org/x/oauth2 + version: 3c3a985cb79f52a3190fbc056984415ca6763d01 + subpackages: + - google + - internal + - jws + - jwt +- name: golang.org/x/sys + version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 + subpackages: + - unix +- name: golang.org/x/text + version: 2910a502d2bf9e43193af9d68ca516529614eed3 + subpackages: + - cases + - internal/tag + - language + - runes + - secure/bidirule + - secure/precis + - transform + - unicode/bidi + - unicode/norm + - width +- name: google.golang.org/appengine + version: 4f7eeb5305a4ba1966344836ba4af9996b7b4e05 + subpackages: + - internal + - internal/app_identity + - internal/base + - internal/datastore + - internal/log + - internal/modules + - internal/remote_api + - internal/urlfetch + - urlfetch +- name: gopkg.in/inf.v0 + version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 +- name: gopkg.in/yaml.v2 + version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 +- name: k8s.io/client-go + version: d81cb85237595f720d83eda492bae8f6162fc5c0 + subpackages: + - discovery + - kubernetes + - kubernetes/typed/apps/v1beta1 + - kubernetes/typed/authentication/v1beta1 + - kubernetes/typed/authorization/v1beta1 + - kubernetes/typed/autoscaling/v1 + - kubernetes/typed/batch/v1 + - kubernetes/typed/batch/v2alpha1 + - kubernetes/typed/certificates/v1alpha1 + - kubernetes/typed/core/v1 + - kubernetes/typed/extensions/v1beta1 + - kubernetes/typed/policy/v1beta1 + - kubernetes/typed/rbac/v1alpha1 + - kubernetes/typed/storage/v1beta1 + - pkg/api + - pkg/api/errors + - pkg/api/install + - pkg/api/meta + - pkg/api/meta/metatypes + - pkg/api/resource + - pkg/api/unversioned + - pkg/api/v1 + - pkg/api/validation/path + - pkg/apimachinery + - pkg/apimachinery/announced + - pkg/apimachinery/registered + - pkg/apis/apps + - pkg/apis/apps/install + - pkg/apis/apps/v1beta1 + - pkg/apis/authentication + - pkg/apis/authentication/install + - pkg/apis/authentication/v1beta1 + - pkg/apis/authorization + - pkg/apis/authorization/install + - pkg/apis/authorization/v1beta1 + - pkg/apis/autoscaling + - pkg/apis/autoscaling/install + - pkg/apis/autoscaling/v1 + - pkg/apis/batch + - pkg/apis/batch/install + - pkg/apis/batch/v1 + - pkg/apis/batch/v2alpha1 + - pkg/apis/certificates + - pkg/apis/certificates/install + - pkg/apis/certificates/v1alpha1 + - pkg/apis/extensions + - pkg/apis/extensions/install + - pkg/apis/extensions/v1beta1 + - pkg/apis/policy + - pkg/apis/policy/install + - pkg/apis/policy/v1beta1 + - pkg/apis/rbac + - pkg/apis/rbac/install + - pkg/apis/rbac/v1alpha1 + - pkg/apis/storage + - pkg/apis/storage/install + - pkg/apis/storage/v1beta1 + - pkg/auth/user + - pkg/conversion + - pkg/conversion/queryparams + - pkg/fields + - pkg/genericapiserver/openapi/common + - pkg/labels + - pkg/runtime + - pkg/runtime/serializer + - pkg/runtime/serializer/json + - pkg/runtime/serializer/protobuf + - pkg/runtime/serializer/recognizer + - pkg/runtime/serializer/streaming + - pkg/runtime/serializer/versioning + - pkg/selection + - pkg/third_party/forked/golang/reflect + - pkg/third_party/forked/golang/template + - pkg/types + - pkg/util + - pkg/util/cert + - pkg/util/clock + - pkg/util/diff + - pkg/util/errors + - pkg/util/flowcontrol + - pkg/util/framer + - pkg/util/homedir + - pkg/util/integer + - pkg/util/intstr + - pkg/util/json + - pkg/util/jsonpath + - pkg/util/labels + - pkg/util/net + - pkg/util/parsers + - pkg/util/rand + - pkg/util/ratelimit + - pkg/util/runtime + - pkg/util/sets + - pkg/util/uuid + - pkg/util/validation + - pkg/util/validation/field + - pkg/util/wait + - pkg/util/yaml + - pkg/version + - pkg/watch + - pkg/watch/versioned + - plugin/pkg/client/auth + - plugin/pkg/client/auth/gcp + - plugin/pkg/client/auth/oidc + - rest + - tools/auth + - tools/cache + - tools/clientcmd + - tools/clientcmd/api + - tools/clientcmd/api/latest + - tools/clientcmd/api/v1 + - tools/metrics + - transport +testImports: [] diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 000000000..3ee84e278 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,28 @@ +package: github.bus.zalan.do/mkabilov/postgres-operator +import: +- package: github.com/coreos/etcd + version: ^3.1.0-rc.1 + subpackages: + - client +- package: github.com/spf13/pflag +- package: golang.org/x/net + subpackages: + - context +- package: k8s.io/client-go + version: ^2.0.0-alpha.1 + subpackages: + - kubernetes + - pkg/api + - pkg/api/meta + - pkg/api/resource + - pkg/api/unversioned + - pkg/api/v1 + - pkg/apis/apps/v1beta1 + - pkg/apis/extensions/v1beta1 + - pkg/fields + - pkg/runtime + - pkg/runtime/serializer + - pkg/util/intstr + - rest + - tools/cache + - tools/clientcmd diff --git a/main.go b/main.go new file mode 100644 index 000000000..f08c04785 --- /dev/null +++ b/main.go @@ -0,0 +1,42 @@ +package main + +import ( + "flag" + "log" + "os" + "os/signal" + "sync" + "syscall" + + "github.bus.zalan.do/acid/postgres-operator/operator" + "github.com/spf13/pflag" +) + +var options operator.Options + +func init() { + pflag.StringVar(&options.KubeConfig, "kubeconfig", "", "Path to kubeconfig file with authorization and master location information.") +} + +func main() { + // Set logging output to standard console out + log.SetOutput(os.Stdout) + + pflag.CommandLine.AddGoFlagSet(flag.CommandLine) + pflag.Parse() + + sigs := make(chan os.Signal, 1) + stop := make(chan struct{}) + signal.Notify(sigs, os.Interrupt, syscall.SIGTERM) // Push signals into channel + + wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on + + spiloOperator := operator.New(options) + spiloOperator.Run(stop, wg) + + sig := <-sigs // Wait for signals (this hangs until a signal arrives) + log.Printf("Shutting down... %+v", sig) + + close(stop) // Tell goroutines to stop themselves + wg.Wait() // Wait for all to be stopped +} diff --git a/operator/etcd.go b/operator/etcd.go new file mode 100644 index 000000000..e1743f2bd --- /dev/null +++ b/operator/etcd.go @@ -0,0 +1,27 @@ +package operator + +import ( + "fmt" + "github.com/coreos/etcd/client" + "golang.org/x/net/context" + "log" +) + +func (z *PgZooKeeper) DeleteEtcdKey(clusterName string) error { + options := client.DeleteOptions{ + Recursive: true, + } + + keyName := fmt.Sprintf(etcdKeyTemplate, clusterName) + + resp, err := z.etcdApiClient.Delete(context.Background(), keyName, &options) + if resp != nil { + log.Printf("Response: %+v", *resp) + } else { + log.Fatal("No response from etcd") + } + + log.Printf("Deleting key %s from ETCD", clusterName) + + return err +} diff --git a/operator/objects.go b/operator/objects.go new file mode 100644 index 000000000..94adfafaa --- /dev/null +++ b/operator/objects.go @@ -0,0 +1,234 @@ +package operator + +import ( + "k8s.io/client-go/pkg/api/resource" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/apis/apps/v1beta1" + "k8s.io/client-go/pkg/util/intstr" + "log" +) + +func (z *PgZooKeeper) CreateStatefulSet(spilo *Spilo) { + ns := (*spilo).Metadata.Namespace + + statefulSet := z.createSetFromSpilo(spilo) + + _, err := z.Clientset.StatefulSets(ns).Create(&statefulSet) + if err != nil { + log.Printf("Petset error: %+v", err) + } else { + log.Printf("Petset created: %+v", statefulSet) + } +} + +func (z *PgZooKeeper) createSetFromSpilo(spilo *Spilo) v1beta1.StatefulSet { + clusterName := (*spilo).Metadata.Name + + envVars := []v1.EnvVar{ + { + Name: "SCOPE", + Value: clusterName, + }, + { + Name: "PGROOT", + Value: "/home/postgres/pgdata/pgroot", + }, + { + Name: "ETCD_HOST", + Value: spilo.Spec.EtcdHost, + }, + { + Name: "POD_IP", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.podIP", + }, + }, + }, + { + Name: "POD_NAMESPACE", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "metadata.namespace", + }, + }, + }, + { + Name: "PGPASSWORD_SUPERUSER", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: clusterName, + }, + Key: "superuser-password", + }, + }, + }, + { + Name: "PGPASSWORD_ADMIN", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: clusterName, + }, + Key: "admin-password", + }, + }, + }, + { + Name: "PGPASSWORD_STANDBY", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: clusterName, + }, + Key: "replication-password", + }, + }, + }, + } + + resourceList := v1.ResourceList{} + + if (*spilo).Spec.ResourceCPU != "" { + resourceList[v1.ResourceCPU] = resource.MustParse((*spilo).Spec.ResourceCPU) + } + + if (*spilo).Spec.ResourceMemory != "" { + resourceList[v1.ResourceMemory] = resource.MustParse((*spilo).Spec.ResourceMemory) + } + + container := v1.Container{ + Name: clusterName, + Image: spilo.Spec.DockerImage, + ImagePullPolicy: v1.PullAlways, + Resources: v1.ResourceRequirements{ + Requests: resourceList, + }, + Ports: []v1.ContainerPort{ + { + ContainerPort: 8008, + Protocol: v1.ProtocolTCP, + }, + { + ContainerPort: 5432, + Protocol: v1.ProtocolTCP, + }, + }, + VolumeMounts: []v1.VolumeMount{ + { + Name: "pgdata", + MountPath: "/home/postgres/pgdata", + }, + }, + Env: envVars, + } + + terminateGracePeriodSeconds := int64(0) + + podSpec := v1.PodSpec{ + TerminationGracePeriodSeconds: &terminateGracePeriodSeconds, + Volumes: []v1.Volume{ + { + Name: "pgdata", + VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}, + }, + }, + Containers: []v1.Container{container}, + } + + template := v1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + "application": "spilo", + "spilo-cluster": clusterName, + }, + Annotations: map[string]string{"pod.alpha.kubernetes.io/initialized": "true"}, + }, + Spec: podSpec, + } + + return v1beta1.StatefulSet{ + ObjectMeta: v1.ObjectMeta{ + Name: clusterName, + Labels: map[string]string{ + "application": "spilo", + "spilo-cluster": clusterName, + }, + }, + Spec: v1beta1.StatefulSetSpec{ + Replicas: &spilo.Spec.NumberOfInstances, + ServiceName: clusterName, + Template: template, + }, + } +} + +func (z *PgZooKeeper) CreateSecrets(ns, name string) { + secret := v1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "application": "spilo", + "spilo-cluster": name, + }, + }, + Type: v1.SecretTypeOpaque, + Data: map[string][]byte{ + "superuser-password": []byte("emFsYW5kbw=="), + "replication-password": []byte("cmVwLXBhc3M="), + "admin-password": []byte("YWRtaW4="), + }, + } + + _, err := z.Clientset.Secrets(ns).Create(&secret) + if err != nil { + log.Printf("Secret error: %+v", err) + } else { + log.Printf("Secret created: %+v", secret) + } +} + +func (z *PgZooKeeper) CreateService(ns, name string) { + service := v1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "application": "spilo", + "spilo-cluster": name, + }, + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeClusterIP, + Ports: []v1.ServicePort{{Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, + }, + } + + _, err := z.Clientset.Services(ns).Create(&service) + if err != nil { + log.Printf("Service error: %+v", err) + } else { + log.Printf("Service created: %+v", service) + } +} + +func (z *PgZooKeeper) CreateEndPoint(ns, name string) { + endPoint := v1.Endpoints{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "application": "spilo", + "spilo-cluster": name, + }, + }, + } + + _, err := z.Clientset.Endpoints(ns).Create(&endPoint) + if err != nil { + log.Printf("Endpoint error: %+v", err) + } else { + log.Printf("Endpoint created: %+v", endPoint) + } +} diff --git a/operator/operator.go b/operator/operator.go new file mode 100644 index 000000000..5678380f1 --- /dev/null +++ b/operator/operator.go @@ -0,0 +1,123 @@ +package operator + +import ( + "fmt" + "log" + "time" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/pkg/api" + "k8s.io/client-go/pkg/api/unversioned" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/apis/extensions/v1beta1" + "k8s.io/client-go/pkg/runtime" + "k8s.io/client-go/pkg/runtime/serializer" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +var ( + etcdHostOutside string + + VENDOR = "acid.zalan.do" + VERSION = "0.0.1.dev" + resyncPeriod = 5 * time.Minute + + etcdKeyTemplate = "/service/%s" +) + +type Options struct { + KubeConfig string +} + +type Pgconf struct { + Parameter string `json:"param"` + Value string `json:"value"` +} + +type SpiloSpec struct { + EtcdHost string `json:"etcd_host"` + VolumeSize int `json:"volume_size"` + NumberOfInstances int32 `json:"number_of_instances"` + DockerImage string `json:"docker_image"` + PostgresConfiguration []Pgconf `json:"postgres_configuration"` + ResourceCPU string `json:"resource_cpu"` + ResourceMemory string `json:"resource_memory"` +} + +type Spilo struct { + unversioned.TypeMeta `json:",inline"` + Metadata api.ObjectMeta `json:"metadata"` + Spec SpiloSpec `json:"spec"` +} + +type SpiloList struct { + unversioned.TypeMeta `json:",inline"` + Metadata unversioned.ListMeta `json:"metadata"` + Items []Spilo `json:"items"` +} + +func KubernetesConfig(options Options) *rest.Config { + rules := clientcmd.NewDefaultClientConfigLoadingRules() + overrides := &clientcmd.ConfigOverrides{} + + if options.KubeConfig != "" { + rules.ExplicitPath = options.KubeConfig + } + + config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig() + + etcdHostOutside = config.Host + + if err != nil { + log.Fatalf("Couldn't get Kubernetes default config: %s", err) + } + + return config +} + +func newKubernetesSpiloClient(c *rest.Config) (*rest.RESTClient, error) { + c.APIPath = "/apis" + c.GroupVersion = &unversioned.GroupVersion{ + Group: VENDOR, + Version: "v1", + } + c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} + + schemeBuilder := runtime.NewSchemeBuilder( + func(scheme *runtime.Scheme) error { + scheme.AddKnownTypes( + *c.GroupVersion, + &Spilo{}, + &SpiloList{}, + &api.ListOptions{}, + &api.DeleteOptions{}, + ) + return nil + }) + schemeBuilder.AddToScheme(api.Scheme) + + return rest.RESTClientFor(c) +} + +func EnsureSpiloThirdPartyResource(client *kubernetes.Clientset) error { + _, err := client.ExtensionsV1beta1().ThirdPartyResources().Get(fmt.Sprintf("spilo.%s", VENDOR)) + if err == nil { + return err + } + + // The resource doesn't exist, so we create it. + tpr := v1beta1.ThirdPartyResource{ + ObjectMeta: v1.ObjectMeta{ + Name: fmt.Sprintf("spilo.%s", VENDOR), + }, + Description: "A specification of Spilo StatefulSets", + Versions: []v1beta1.APIVersion{ + {Name: "v1"}, + }, + } + + _, err = client.ExtensionsV1beta1().ThirdPartyResources().Create(&tpr) + + return err +} diff --git a/operator/spilo.go b/operator/spilo.go new file mode 100644 index 000000000..5d7e7d00f --- /dev/null +++ b/operator/spilo.go @@ -0,0 +1,99 @@ +package operator + +import ( + "encoding/json" + "log" + "sync" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/pkg/api/meta" + "k8s.io/client-go/pkg/api/unversioned" + "k8s.io/client-go/rest" +) + +type SpiloOperator struct { + Options + + ClientSet *kubernetes.Clientset + SpiloClient *rest.RESTClient + + SpiloZooKeeper *PgZooKeeper +} + +func New(options Options) *SpiloOperator { + config := KubernetesConfig(options) + + clientSet, err := kubernetes.NewForConfig(config) + if err != nil { + log.Fatalf("Couldn't create Kubernetes client: %s", err) + } + + spiloClient, err := newKubernetesSpiloClient(config) + if err != nil { + log.Fatalf("Couldn't create Spilo client: %s", err) + } + + operator := &SpiloOperator{ + Options: options, + ClientSet: clientSet, + SpiloClient: spiloClient, + SpiloZooKeeper: newZookeeper(spiloClient, clientSet), + } + + return operator +} + +func (o *SpiloOperator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { + log.Printf("Spilo operator %v\n", VERSION) + + go o.SpiloZooKeeper.Run(stopCh, wg) + + log.Println("Started working in background") +} + +// The code below is used only to work around a known problem with third-party +// resources and ugorji. If/when these issues are resolved, the code below +// should no longer be required. +// + +func (s *Spilo) GetObjectKind() unversioned.ObjectKind { + return &s.TypeMeta +} + +func (s *Spilo) GetObjectMeta() meta.Object { + return &s.Metadata +} +func (sl *SpiloList) GetObjectKind() unversioned.ObjectKind { + return &sl.TypeMeta +} + +func (sl *SpiloList) GetListMeta() unversioned.List { + return &sl.Metadata +} + +type SpiloListCopy SpiloList +type SpiloCopy Spilo + +func (e *Spilo) UnmarshalJSON(data []byte) error { + tmp := SpiloCopy{} + err := json.Unmarshal(data, &tmp) + if err != nil { + return err + } + tmp2 := Spilo(tmp) + *e = tmp2 + + return nil +} + +func (el *SpiloList) UnmarshalJSON(data []byte) error { + tmp := SpiloListCopy{} + err := json.Unmarshal(data, &tmp) + if err != nil { + return err + } + tmp2 := SpiloList(tmp) + *el = tmp2 + + return nil +} diff --git a/operator/zookeeper.go b/operator/zookeeper.go new file mode 100644 index 000000000..87fd9c475 --- /dev/null +++ b/operator/zookeeper.go @@ -0,0 +1,318 @@ +package operator + +import ( + "fmt" + "log" + "sync" + "time" + + etcdclient "github.com/coreos/etcd/client" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/pkg/api" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/fields" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" +) + +const ( + ACTION_DELETE = "delete" + ACTION_UPDATE = "update" + ACTION_ADD = "add" +) + +type podEvent struct { + namespace string + name string + actionType string +} + +type podWatcher struct { + podNamespace string + podName string + eventsChannel chan podEvent + subscribe bool +} + +type PgZooKeeper struct { + podEvents chan podEvent + podWatchers chan podWatcher + SpiloClient *rest.RESTClient + Clientset *kubernetes.Clientset + + spiloInformer cache.SharedIndexInformer + podInformer cache.SharedIndexInformer + etcdApiClient etcdclient.KeysAPI +} + +func podsListWatch(client *kubernetes.Clientset) *cache.ListWatch { + return cache.NewListWatchFromClient(client.Core().RESTClient(), "pods", api.NamespaceAll, fields.Everything()) +} + +func newZookeeper(spiloClient *rest.RESTClient, clientset *kubernetes.Clientset) *PgZooKeeper { + pgZooKeeper := &PgZooKeeper{ + SpiloClient: spiloClient, + Clientset: clientset, + } + + spiloInformer := cache.NewSharedIndexInformer( + cache.NewListWatchFromClient(spiloClient, "spilos", api.NamespaceAll, fields.Everything()), + &Spilo{}, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + + spiloInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: pgZooKeeper.spiloAdd, + UpdateFunc: pgZooKeeper.spiloUpdate, + DeleteFunc: pgZooKeeper.spiloDelete, + }) + + podInformer := cache.NewSharedIndexInformer( + podsListWatch(clientset), + &v1.Pod{}, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + + podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: pgZooKeeper.podAdd, + UpdateFunc: pgZooKeeper.podUpdate, + DeleteFunc: pgZooKeeper.podDelete, + }) + + pgZooKeeper.spiloInformer = spiloInformer + pgZooKeeper.podInformer = podInformer + + cfg := etcdclient.Config{ + Endpoints: []string{etcdHostOutside}, + Transport: etcdclient.DefaultTransport, + HeaderTimeoutPerRequest: time.Second, + } + + c, err := etcdclient.New(cfg) + if err != nil { + log.Fatal(err) + } + + pgZooKeeper.etcdApiClient = etcdclient.NewKeysAPI(c) + pgZooKeeper.podEvents = make(chan podEvent) + + return pgZooKeeper +} + +func (d *PgZooKeeper) podAdd(obj interface{}) { + pod := obj.(*v1.Pod) + d.podEvents <- podEvent{ + namespace: pod.Namespace, + name: pod.Name, + actionType: ACTION_ADD, + } +} + +func (d *PgZooKeeper) podDelete(obj interface{}) { + pod := obj.(*v1.Pod) + d.podEvents <- podEvent{ + namespace: pod.Namespace, + name: pod.Name, + actionType: ACTION_DELETE, + } +} + +func (d *PgZooKeeper) podUpdate(old, cur interface{}) { + oldPod := old.(*v1.Pod) + d.podEvents <- podEvent{ + namespace: oldPod.Namespace, + name: oldPod.Name, + actionType: ACTION_UPDATE, + } +} + +func (z *PgZooKeeper) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + wg.Add(1) + + if err := EnsureSpiloThirdPartyResource(z.Clientset); err != nil { + log.Fatalf("Couldn't create ThirdPartyResource: %s", err) + } + + go z.spiloInformer.Run(stopCh) + go z.podInformer.Run(stopCh) + go z.podWatcher(stopCh) + + <-stopCh +} + +func (z *PgZooKeeper) spiloAdd(obj interface{}) { + spilo := obj.(*Spilo) + + clusterName := (*spilo).Metadata.Name + ns := (*spilo).Metadata.Namespace + + z.CreateEndPoint(ns, clusterName) + z.CreateService(ns, clusterName) + z.CreateSecrets(ns, clusterName) + z.CreateStatefulSet(spilo) +} + +func (z *PgZooKeeper) spiloUpdate(old, cur interface{}) { + oldSpilo := old.(*Spilo) + curSpilo := cur.(*Spilo) + + if oldSpilo.Spec.NumberOfInstances != curSpilo.Spec.NumberOfInstances { + z.UpdateStatefulSet(curSpilo) + } + + if oldSpilo.Spec.DockerImage != curSpilo.Spec.DockerImage { + z.UpdateStatefulSetImage(curSpilo) + } + + log.Printf("Update spilo old: %+v cur: %+v", *oldSpilo, *curSpilo) +} + +func (z *PgZooKeeper) spiloDelete(obj interface{}) { + spilo := obj.(*Spilo) + + err := z.DeleteStatefulSet(spilo.Metadata.Namespace, spilo.Metadata.Name) + if err != nil { + log.Printf("Error while deleting stateful set: %+v", err) + } +} + +func (z *PgZooKeeper) DeleteStatefulSet(ns, clusterName string) error { + orphanDependents := false + deleteOptions := v1.DeleteOptions{ + OrphanDependents: &orphanDependents, + } + + listOptions := v1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", clusterName), + } + + podList, err := z.Clientset.Pods(ns).List(listOptions) + if err != nil { + log.Printf("Error: %+v", err) + } + + err = z.Clientset.StatefulSets(ns).Delete(clusterName, &deleteOptions) + if err != nil { + return err + } + log.Printf("StatefulSet %s.%s has been deleted\n", ns, clusterName) + + for _, pod := range podList.Items { + err = z.Clientset.Pods(pod.Namespace).Delete(pod.Name, &deleteOptions) + if err != nil { + log.Printf("Error while deleting Pod %s: %+v", pod.Name, err) + return err + } + + log.Printf("Pod %s.%s has been deleted\n", pod.Namespace, pod.Name) + } + + serviceList, err := z.Clientset.Services(ns).List(listOptions) + if err != nil { + return err + } + + for _, service := range serviceList.Items { + err = z.Clientset.Services(service.Namespace).Delete(service.Name, &deleteOptions) + if err != nil { + log.Printf("Error while deleting Service %s: %+v", service.Name, err) + + return err + } + + log.Printf("Service %s.%s has been deleted\n", service.Namespace, service.Name) + } + + z.DeleteEtcdKey(clusterName) + + return nil +} + +func (z *PgZooKeeper) UpdateStatefulSet(spilo *Spilo) { + ns := (*spilo).Metadata.Namespace + + statefulSet := z.createSetFromSpilo(spilo) + _, err := z.Clientset.StatefulSets(ns).Update(&statefulSet) + + if err != nil { + log.Printf("Error while updating StatefulSet: %s", err) + } +} + +func (z *PgZooKeeper) UpdateStatefulSetImage(spilo *Spilo) { + ns := (*spilo).Metadata.Namespace + + z.UpdateStatefulSet(spilo) + + listOptions := v1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", (*spilo).Metadata.Name), + } + + pods, err := z.Clientset.Pods(ns).List(listOptions) + if err != nil { + log.Printf("Error while getting pods: %s", err) + } + + orphanDependents := true + deleteOptions := v1.DeleteOptions{ + OrphanDependents: &orphanDependents, + } + + var masterPodName string + for _, pod := range pods.Items { + log.Printf("Pod processing: %s", pod.Name) + + role, ok := pod.Labels["spilo-role"] + if ok == false { + log.Println("No spilo-role label") + continue + } + if role == "master" { + masterPodName = pod.Name + log.Printf("Skipping master: %s", masterPodName) + continue + } + + err := z.Clientset.Pods(ns).Delete(pod.Name, &deleteOptions) + if err != nil { + log.Printf("Error while deleting Pod %s.%s: %s", pod.Namespace, pod.Name, err) + } else { + log.Printf("Pod deleted: %s.%s", pod.Namespace, pod.Name) + } + + //TODO: wait until Pod recreated + } + + //TODO: do manual failover + err = z.Clientset.Pods(ns).Delete(masterPodName, &deleteOptions) + if err != nil { + log.Printf("Error while deleting Pod %s.%s: %s", ns, masterPodName, err) + } else { + log.Printf("Pod deleted: %s.%s", ns, masterPodName) + } +} + +func (z *PgZooKeeper) podWatcher(stopCh <-chan struct{}) { + watchers := make(map[string]chan podEvent) + for { + select { + case watcher := <-z.podWatchers: + if watcher.subscribe { + watchers[watcher.podName] = watcher.eventsChannel + } else { + close(watcher.eventsChannel) + delete(watchers, watcher.podName) + } + case podEvent := <-z.podEvents: + podChannel, ok := watchers[podEvent.name] + if ok == false { + continue + } + + podChannel <- podEvent + } + } +} diff --git a/testcluster.yaml b/testcluster.yaml new file mode 100644 index 000000000..cca415e61 --- /dev/null +++ b/testcluster.yaml @@ -0,0 +1,18 @@ +apiVersion: "acid.zalan.do/v1" +kind: Spilo + +metadata: + name: testcluster + +spec: + etcd_host: etcd-client.default.svc.cluster.local:2379 + volume_size: 100 # GB + resource_cpu: 111m + resource_memory: 222Mi + number_of_instances: 3 + docker_image: registry.opensource.zalan.do/acid/spilotest-9.6:1.1-p10 # put the spilo image here + postgres_configuration: + - param: "max_connections" + value: "10" + - param: "shared_buffers" + value: "500MB"