diff --git a/README.md b/README.md
index f2e23157a..9d55acec3 100644
--- a/README.md
+++ b/README.md
@@ -36,14 +36,14 @@
 ### Check if ThirdPartyResource has been registered
 
     $ kubectl get thirdpartyresources
-
-    NAME                      DESCRIPTION                             VERSION(S)
-    spilo.acid.zalan.do       A specification of Spilo StatefulSets   v1
-
+
+    NAME                       DESCRIPTION                   VERSION(S)
+    postgresql.acid.zalan.do   Managed PostgreSQL clusters   v1
+
 ### Create a new spilo cluster
 
-    $ kubectl create -f manifests/testspilo.yaml
+    $ kubectl create -f manifests/testpostgresql.yaml
 
 ### Watch Pods being created
diff --git a/cmd/main.go b/cmd/main.go
index 3e11a2ad4..e95ad3563 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -9,35 +9,63 @@ import (
 	"syscall"
 
 	"github.bus.zalan.do/acid/postgres-operator/pkg/controller"
-	"github.com/spf13/pflag"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
 )
 
-var options controller.Options
-var version string
+var (
+	KubeConfigFile string
+	Namespace      string
+	OutOfCluster   bool
+	version        string
+)
 
 func init() {
-	pflag.StringVar(&options.KubeConfig, "kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
-	pflag.BoolVar(&options.OutOfCluster, "outofcluster", false, "Whether the operator runs in- our outside of the Kubernetes cluster.")
+	flag.StringVar(&KubeConfigFile, "kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
+	flag.BoolVar(&OutOfCluster, "outofcluster", false, "Whether the operator runs inside or outside of the Kubernetes cluster.")
+	flag.Parse()
+
+	Namespace = os.Getenv("MY_POD_NAMESPACE")
+	if len(Namespace) == 0 {
+		Namespace = "default"
+	}
+}
+
+func ControllerConfig() *controller.Config {
+	restConfig, err := k8sutil.RestConfig(KubeConfigFile, OutOfCluster)
+	if err != nil {
+		log.Fatalf("Can't get REST config: %s", err)
+	}
+
+	client, err := k8sutil.KubernetesClient(restConfig)
+	if err != nil {
+		log.Fatalf("Can't create client: %s", err)
+	}
+
+	restClient, err := k8sutil.KubernetesRestClient(restConfig)
+	if err != nil {
+		log.Fatalf("Can't create REST client: %s", err)
+	}
+
+	return &controller.Config{
+		Namespace:  Namespace,
+		KubeClient: client,
+		RestClient: restClient,
+	}
+}
 
 func main() {
-	// Set logging output to standard console out
 	log.SetOutput(os.Stdout)
 	log.Printf("Spilo operator %s\n", version)
 
-	pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
-	pflag.Parse()
-
 	sigs := make(chan os.Signal, 1)
 	stop := make(chan struct{})
 	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM) // Push signals into channel
 
 	wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on
 
-	c := controller.New(options)
+	cfg := ControllerConfig()
+
+	c := controller.New(cfg)
 	c.Run(stop, wg)
 
-	sig := <-sigs // Wait for signals (this hangs until a signal arrives)
+	sig := <-sigs
 	log.Printf("Shutting down... %+v", sig)
 
 	close(stop) // Tell goroutines to stop themselves
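The new `init` reads the operator's namespace from the `MY_POD_NAMESPACE` environment variable. When running in-cluster, that variable is typically injected through the Kubernetes downward API; a minimal sketch of the relevant Deployment fragment (illustrative only, not part of this change):

```yaml
# Hypothetical Deployment fragment: expose the Pod's own namespace
# to the operator process as MY_POD_NAMESPACE.
env:
- name: MY_POD_NAMESPACE
  valueFrom:
    fieldRef:
      fieldPath: metadata.namespace
```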
%+v", sig) close(stop) // Tell goroutines to stop themselves diff --git a/glide.lock b/glide.lock index c39d79bac..3eb6261e7 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 19831a9408f172d009617c781ea65141606a216aa8e238797e102c511f4afb1f -updated: 2017-01-27T13:24:40.647376238+01:00 +hash: 1ffee26a74225c382e3b5503ef187c46d5132732c3da5bb25909ad40409d197d +updated: 2017-02-03T12:58:01.868568623+01:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -8,12 +8,6 @@ imports: - internal - name: github.com/blang/semver version: 31b736133b98f26d5e078ec9eb591666edfd091f -- name: github.com/coreos/etcd - version: 8ba2897a21e4fc51b298ca553d251318425f93ae - subpackages: - - client - - pkg/pathutil - - pkg/types - name: github.com/coreos/go-oidc version: 5644a2f50e2d2d5ba0b474bc5bc55fea1925936d subpackages: @@ -83,6 +77,8 @@ imports: version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 - name: github.com/PuerkitoBio/urlesc version: 5bd2802263f21d8788851d5305584c82a5c75d7e +- name: github.com/Sirupsen/logrus + version: d26492970760ca5d33129d2d799e34be5c4782eb - name: github.com/spf13/pflag version: 5ccb023bc27df288a957c5e994cd44fd19619465 - name: github.com/ugorji/go diff --git a/glide.yaml b/glide.yaml index f694702b5..da56825d0 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,28 +1,20 @@ package: github.bus.zalan.do/acid/postgres-operator import: -- package: github.com/coreos/etcd - version: ^3.1.0-rc.1 - subpackages: - - client -- package: github.com/spf13/pflag -- package: golang.org/x/net - subpackages: - - context +- package: github.com/Sirupsen/logrus + version: ^0.11.0 - package: k8s.io/client-go version: ^2.0.0-alpha.1 subpackages: - kubernetes - pkg/api + - pkg/api/errors - pkg/api/meta - - pkg/api/resource - pkg/api/unversioned - pkg/api/v1 - - pkg/apis/apps/v1beta1 - pkg/apis/extensions/v1beta1 - pkg/fields - pkg/runtime - pkg/runtime/serializer - - pkg/util/intstr - rest - tools/cache - tools/clientcmd diff --git a/manifests/testpostgresql.yaml b/manifests/testpostgresql.yaml new file mode 100644 index 000000000..cb8a17774 --- /dev/null +++ b/manifests/testpostgresql.yaml @@ -0,0 +1,41 @@ +apiVersion: "acid.zalan.do/v1" +kind: "Postgresql" + +metadata: + name: testcluster + +spec: + volume: + size: 100Gi + storageClass: gp2 + numberOfInstances: 3 + postgresql: + version: "9.6" + parameters: + shared_buffers: "500MB" + max_connections: "10" + log_statement: "all" + users: + jdoe: + - superuser + - createdb + rmiles: + rroe: + resources: + cpu: 100m + memory: 500Mi + patroni: + initdb: + encoding: "UTF8" + locale: "en_US.UTF-8" + data-checksums: "true" + pg_hba: + - hostssl all all 0.0.0.0/0 md5 + - host all all 0.0.0.0/0 md5 + ttl: 30 + loop_wait: &loop_wait 10 + retry_timeout: 10 + maximum_lag_on_failover: 33554432 + maintenanceWindows: + - 01:00-06:00 #UTC + - Sat:00:00-Sat:04:00 diff --git a/manifests/testpostgresql_prime.yaml b/manifests/testpostgresql_prime.yaml new file mode 100644 index 000000000..242833211 --- /dev/null +++ b/manifests/testpostgresql_prime.yaml @@ -0,0 +1,41 @@ +apiVersion: "acid.zalan.do/v1" +kind: "Postgresql" + +metadata: + name: testcluster + +spec: + volume: + size: 100Gi + storageClass: gp2 + numberOfInstances: 2 + postgresql: + version: "9.6" + parameters: + shared_buffers: "500MB" + max_connections: "10" + log_statement: "all" + users: + jdoe: + - superuser + - createdb + rmiles: + rroe: + resources: + cpu: 100m + memory: 500Mi + patroni: + initdb: + encoding: "UTF8" + locale: "en_US.UTF-8" + data-checksums: 
"true" + pg_hba: + - hostssl all all 0.0.0.0/0 md5 + - host all all 0.0.0.0/0 md5 + ttl: 30 + loop_wait: &loop_wait 10 + retry_timeout: 10 + maximum_lag_on_failover: 33554432 + maintenanceWindows: + - 01:00-06:00 #UTC + - Sat:00:00-Sat:04:00 diff --git a/manifests/testspilo.yaml b/manifests/testspilo.yaml deleted file mode 100644 index d0d04c794..000000000 --- a/manifests/testspilo.yaml +++ /dev/null @@ -1,18 +0,0 @@ -apiVersion: "acid.zalan.do/v1" -kind: "Spilo" - -metadata: - name: testcluster - -spec: - docker_image: registry.opensource.zalan.do/acid/spilo-9.6:1.2-p9 - etcd_host: etcd-client.default.svc.cluster.local:2379 - volume_size: 100 # GB - resource_cpu: 100m - resource_memory: 500Mi - number_of_instances: 3 - postgres_configuration: - - param: "max_connections" - value: "10" - - param: "shared_buffers" - value: "500MB" diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go new file mode 100644 index 000000000..2903df700 --- /dev/null +++ b/pkg/cluster/cluster.go @@ -0,0 +1,156 @@ +package cluster + +// Postgres ThirdPartyResource object i.e. Spilo + +import ( + "fmt" + + "github.com/Sirupsen/logrus" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/rest" + + "github.bus.zalan.do/acid/postgres-operator/pkg/spec" + "github.bus.zalan.do/acid/postgres-operator/pkg/util" + "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants" +) + +var patroniUsers = []string{"superuser", "replication", "admin"} + +//TODO: remove struct duplication +type Config struct { + Namespace string + KubeClient *kubernetes.Clientset //TODO: move clients to the better place? + RestClient *rest.RESTClient +} + +type Cluster struct { + logger *logrus.Entry + config Config + etcdHost string + dockerImage string + cluster *spec.Postgresql + pgUsers []pgUser +} + +type pgUser struct { + username string + secretKey string + password []byte + flags []string +} + +func New(cfg Config, spec *spec.Postgresql) *Cluster { + lg := logrus.WithField("pkg", "cluster").WithField("cluster-name", spec.Metadata.Name) + + //TODO: check if image exist + dockerImage := fmt.Sprintf("registry.opensource.zalan.do/acid/spilo-%s", (*spec.Spec).PostgresqlParam.Version) + + cluster := &Cluster{ + config: cfg, + cluster: spec, + logger: lg, + etcdHost: constants.EtcdHost, + dockerImage: dockerImage, + } + cluster.init() + + return cluster +} + +func secretUserKey(userName string) string { + return fmt.Sprintf("%s-password", userName) +} + +func (c *Cluster) init() { + for _, userName := range patroniUsers { + user := pgUser{ + username: userName, + secretKey: secretUserKey(userName), + password: util.RandomPasswordBytes(constants.PasswordLength), + } + c.pgUsers = append(c.pgUsers, user) + } + + for userName, userFlags := range (*c.cluster.Spec).Users { + user := pgUser{ + username: userName, + secretKey: secretUserKey(userName), + password: util.RandomPasswordBytes(constants.PasswordLength), + flags: userFlags, + } + c.pgUsers = append(c.pgUsers, user) + } +} + +func (c *Cluster) Create() error { + c.createEndPoint() + c.createService() + c.applySecrets() + c.createStatefulSet() + + //TODO: wait for "spilo-role" label to appear on each pod + + return nil +} + +func (c *Cluster) Delete() error { + clusterName := (*c.cluster).Metadata.Name + nameSpace := c.config.Namespace + orphanDependents := false + deleteOptions := &v1.DeleteOptions{ + OrphanDependents: &orphanDependents, + } + + listOptions := v1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", clusterName), + } + + 
diff --git a/pkg/controller/objects.go b/pkg/cluster/objects.go
similarity index 56%
rename from pkg/controller/objects.go
rename to pkg/cluster/objects.go
index 186078c48..0e4d2afc5 100644
--- a/pkg/controller/objects.go
+++ b/pkg/cluster/objects.go
@@ -1,31 +1,16 @@
-package controller
+package cluster
 
 import (
-	"log"
-
 	"k8s.io/client-go/pkg/api/resource"
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
 	"k8s.io/client-go/pkg/util/intstr"
 
-	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
 )
 
-func (z *SpiloController) CreateStatefulSet(spilo *spec.Spilo) {
-	ns := (*spilo).Metadata.Namespace
-
-	statefulSet := z.createSetFromSpilo(spilo)
-
-	_, err := z.Clientset.StatefulSets(ns).Create(&statefulSet)
-	if err != nil {
-		log.Printf("Petset error: %+v", err)
-	} else {
-		log.Printf("Petset created: %+v", statefulSet)
-	}
-}
-
-func (z *SpiloController) createSetFromSpilo(spilo *spec.Spilo) v1beta1.StatefulSet {
-	clusterName := (*spilo).Metadata.Name
+func (c *Cluster) createStatefulSet() {
+	clusterName := (*c.cluster).Metadata.Name
 
 	envVars := []v1.EnvVar{
 		{
@@ -38,7 +23,7 @@ func (z *SpiloController) createSetFromSpilo(spilo *spec.Spilo) v1beta1.Stateful
 		},
 		{
 			Name:  "ETCD_HOST",
-			Value: spilo.Spec.EtcdHost,
+			Value: c.etcdHost,
 		},
 		{
 			Name: "POD_IP",
@@ -65,7 +50,7 @@ func (z *SpiloController) createSetFromSpilo(spilo *spec.Spilo) v1beta1.Stateful
 				LocalObjectReference: v1.LocalObjectReference{
 					Name: clusterName,
 				},
-				Key: "superuser-password",
+				Key: secretUserKey("superuser"),
 			},
 		},
 	},
@@ -76,7 +61,7 @@ func (z *SpiloController) createSetFromSpilo(spilo *spec.Spilo) v1beta1.Stateful
 				LocalObjectReference: v1.LocalObjectReference{
 					Name: clusterName,
 				},
-				Key: "admin-password",
+				Key: secretUserKey("admin"),
 			},
 		},
 	},
@@ -87,7 +72,7 @@ func (z *SpiloController) createSetFromSpilo(spilo *spec.Spilo) v1beta1.Stateful
 				LocalObjectReference: v1.LocalObjectReference{
 					Name: clusterName,
 				},
-				Key: "replication-password",
+				Key: secretUserKey("replication"),
 			},
 		},
 	},
@@ -95,17 +80,17 @@ func (z *SpiloController) createSetFromSpilo(spilo *spec.Spilo) v1beta1.Stateful
 
 	resourceList := v1.ResourceList{}
 
-	if (*spilo).Spec.ResourceCPU != "" {
-		resourceList[v1.ResourceCPU] = resource.MustParse((*spilo).Spec.ResourceCPU)
+	if cpu := (*c.cluster).Spec.Resources.Cpu; cpu != "" {
+		resourceList[v1.ResourceCPU] = resource.MustParse(cpu)
 	}
 
-	if (*spilo).Spec.ResourceMemory != "" {
-		resourceList[v1.ResourceMemory] = resource.MustParse((*spilo).Spec.ResourceMemory)
+	if memory := (*c.cluster).Spec.Resources.Memory; memory != "" {
+		resourceList[v1.ResourceMemory] = resource.MustParse(memory)
 	}
 
 	container := v1.Container{
 		Name:            clusterName,
-		Image:           spilo.Spec.DockerImage,
+		Image:           c.dockerImage,
 		ImagePullPolicy: v1.PullAlways,
 		Resources: v1.ResourceRequirements{
 			Requests: resourceList,
@@ -123,7 +108,7 @@ func (z *SpiloController) createSetFromSpilo(spilo *spec.Spilo) v1beta1.Stateful
 		VolumeMounts: []v1.VolumeMount{
 			{
 				Name:      "pgdata",
-				MountPath: "/home/postgres/pgdata",
+				MountPath: "/home/postgres/pgdata", //TODO: fetch from the manifest
 			},
 		},
 		Env: envVars,
@@ -153,7 +138,7 @@ func (z *SpiloController) createSetFromSpilo(spilo *spec.Spilo) v1beta1.Stateful
 		Spec: podSpec,
 	}
 
-	return v1beta1.StatefulSet{
+	statefulSet := &v1beta1.StatefulSet{
 		ObjectMeta: v1.ObjectMeta{
 			Name: clusterName,
 			Labels: map[string]string{
@@ -162,45 +147,67 @@ func (z *SpiloController) createSetFromSpilo(spilo *spec.Spilo) v1beta1.Stateful
 			},
 		},
 		Spec: v1beta1.StatefulSetSpec{
-			Replicas:    &spilo.Spec.NumberOfInstances,
+			Replicas:    &c.cluster.Spec.NumberOfInstances,
 			ServiceName: clusterName,
 			Template:    template,
 		},
 	}
+
+	_, err := c.config.KubeClient.StatefulSets(c.config.Namespace).Create(statefulSet)
+	if err != nil {
+		c.logger.Errorf("Error while creating StatefulSet: %+v", err)
+	}
 }
 
-func (z *SpiloController) CreateSecrets(ns, name string) {
+func (c *Cluster) applySecrets() {
+	clusterName := (*c.cluster).Metadata.Name
+	secrets := make(map[string][]byte, len(c.pgUsers))
+	for _, user := range c.pgUsers {
+		secrets[user.secretKey] = user.password
+	}
+
 	secret := v1.Secret{
 		ObjectMeta: v1.ObjectMeta{
-			Name: name,
+			Name: clusterName,
 			Labels: map[string]string{
 				"application":   "spilo",
-				"spilo-cluster": name,
+				"spilo-cluster": clusterName,
 			},
 		},
 		Type: v1.SecretTypeOpaque,
-		Data: map[string][]byte{
-			"superuser-password":   []byte("emFsYW5kbw=="),
-			"replication-password": []byte("cmVwLXBhc3M="),
-			"admin-password":       []byte("YWRtaW4="),
-		},
+		Data: secrets,
 	}
 
-	_, err := z.Clientset.Secrets(ns).Create(&secret)
-	if err != nil {
-		log.Printf("Secret error: %+v", err)
+	_, err := c.config.KubeClient.Secrets(c.config.Namespace).Get(clusterName)
+
+	//TODO: possible race condition (as well as while creating the other objects)
+	if !k8sutil.ResourceNotFound(err) {
+		_, err = c.config.KubeClient.Secrets(c.config.Namespace).Update(&secret)
 	} else {
-		log.Printf("Secret created: %+v", secret)
+		_, err = c.config.KubeClient.Secrets(c.config.Namespace).Create(&secret)
 	}
+
+	if err != nil {
+		c.logger.Errorf("Error while creating or updating secret: %+v", err)
+	} else {
+		c.logger.Infof("Secret created: %+v", secret)
+	}
+
+	//TODO: remove secrets of the deleted users
 }
 
-func (z *SpiloController) CreateService(ns, name string) {
+func (c *Cluster) createService() {
+	clusterName := (*c.cluster).Metadata.Name
+
+	_, err := c.config.KubeClient.Services(c.config.Namespace).Get(clusterName)
+	if !k8sutil.ResourceNotFound(err) {
+		c.logger.Infof("Service '%s' already exists", clusterName)
+		return
+	}
+
 	service := v1.Service{
 		ObjectMeta: v1.ObjectMeta{
-			Name: name,
+			Name: clusterName,
 			Labels: map[string]string{
 				"application":   "spilo",
-				"spilo-cluster": name,
+				"spilo-cluster": clusterName,
 			},
 		},
 		Spec: v1.ServiceSpec{
@@ -209,29 +216,37 @@ func (z *SpiloController) CreateService(ns, name string) {
 		},
 	}
 
-	_, err := z.Clientset.Services(ns).Create(&service)
+	_, err = c.config.KubeClient.Services(c.config.Namespace).Create(&service)
 	if err != nil {
-		log.Printf("Service error: %+v", err)
+		c.logger.Errorf("Error while creating service: %+v", err)
 	} else {
-		log.Printf("Service created: %+v", service)
+		c.logger.Infof("Service created: %+v", service)
 	}
 }
 
-func (z *SpiloController) CreateEndPoint(ns, name string) {
+func (c *Cluster) createEndPoint() {
+	clusterName := (*c.cluster).Metadata.Name
+
+	_, err := c.config.KubeClient.Endpoints(c.config.Namespace).Get(clusterName)
+	if !k8sutil.ResourceNotFound(err) {
+		c.logger.Infof("Endpoint '%s' already exists", clusterName)
+		return
+	}
+
 	endPoint := v1.Endpoints{
 		ObjectMeta: v1.ObjectMeta{
-			Name: name,
+			Name: clusterName,
 			Labels: map[string]string{
 				"application":   "spilo",
-				"spilo-cluster": name,
+				"spilo-cluster": clusterName,
 			},
 		},
 	}
 
-	_, err := z.Clientset.Endpoints(ns).Create(&endPoint)
+	_, err = c.config.KubeClient.Endpoints(c.config.Namespace).Create(&endPoint)
 	if err != nil {
-		log.Printf("Endpoint error: %+v", err)
+		c.logger.Errorf("Error while creating endpoint: %+v", err)
 	} else {
-		log.Printf("Endpoint created: %+v", endPoint)
+		c.logger.Infof("Endpoint created: %+v", endPoint)
 	}
 }
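The TODO in `applySecrets` flags the Get-then-Create/Update race: another writer can create the Secret between the Get and the Create. One way to shrink the window, using the helpers this change itself introduces (a sketch, not part of the diff):

```go
// Try Create first; fall back to Update only when the API server reports
// the Secret already exists, instead of probing with Get beforehand.
_, err := c.config.KubeClient.Secrets(c.config.Namespace).Create(&secret)
if k8sutil.IsKubernetesResourceAlreadyExistError(err) {
	_, err = c.config.KubeClient.Secrets(c.config.Namespace).Update(&secret)
}
if err != nil {
	c.logger.Errorf("Error while creating or updating secret: %+v", err)
}
```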
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 9bfd2a20f..7be511d2b 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -2,329 +2,162 @@ package controller
 
 import (
 	"fmt"
-	"log"
 	"sync"
 
+	"github.com/Sirupsen/logrus"
 	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/pkg/api"
 	"k8s.io/client-go/pkg/api/v1"
+	v1beta1extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
 	"k8s.io/client-go/pkg/fields"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 
-	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
-	"github.bus.zalan.do/acid/postgres-operator/pkg/etcd"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/cluster"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
 )
 
-const (
-	ACTION_DELETE = "delete"
-	ACTION_UPDATE = "update"
-	ACTION_ADD    = "add"
-)
-
-type podEvent struct {
-	namespace  string
-	name       string
-	actionType string
+type Config struct {
+	Namespace  string
+	KubeClient *kubernetes.Clientset
+	RestClient *rest.RESTClient
 }
 
-type podWatcher struct {
-	podNamespace  string
-	podName       string
-	eventsChannel chan podEvent
-	subscribe     bool
+type Controller struct {
+	Config
+
+	logger             *logrus.Entry
+	events             chan *Event
+	clusters           map[string]*cluster.Cluster
+	stopChMap          map[string]chan struct{}
+	waitCluster        sync.WaitGroup
+	postgresqlInformer cache.SharedIndexInformer
 }
 
-type SpiloController struct {
-	podEvents     chan podEvent
-	podWatchers   chan podWatcher
-	SpiloClient   *rest.RESTClient
-	Clientset     *kubernetes.Clientset
-	etcdApiClient *etcd.EtcdClient
-
-	spiloInformer cache.SharedIndexInformer
-	podInformer   cache.SharedIndexInformer
+type Event struct {
+	Type   string
+	Object *spec.Postgresql
 }
 
-func podsListWatch(client *kubernetes.Clientset) *cache.ListWatch {
-	return cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", api.NamespaceAll, fields.Everything())
-}
-
-func newController(spiloClient *rest.RESTClient, clientset *kubernetes.Clientset, etcdClient *etcd.EtcdClient) *SpiloController {
-	spiloController := &SpiloController{
-		SpiloClient:   spiloClient,
-		Clientset:     clientset,
-		etcdApiClient: etcdClient,
-	}
-
-	spiloInformer := cache.NewSharedIndexInformer(
-		cache.NewListWatchFromClient(spiloClient, "spilos", api.NamespaceAll, fields.Everything()),
-		&spec.Spilo{},
-		resyncPeriod,
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-	)
-
-	spiloInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    spiloController.spiloAdd,
-		UpdateFunc: spiloController.spiloUpdate,
-		DeleteFunc: spiloController.spiloDelete,
-	})
-
-	podInformer := cache.NewSharedIndexInformer(
-		podsListWatch(clientset),
-		&v1.Pod{},
-		resyncPeriod,
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-	)
-
-	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    spiloController.podAdd,
-		UpdateFunc: spiloController.podUpdate,
-		DeleteFunc: spiloController.podDelete,
-	})
-
-	spiloController.spiloInformer = spiloInformer
-	spiloController.podInformer = podInformer
-
-	spiloController.podEvents = make(chan podEvent)
-
-	return spiloController
-}
-
-func (d *SpiloController) podAdd(obj interface{}) {
-	pod := obj.(*v1.Pod)
-	d.podEvents <- podEvent{
-		namespace:  pod.Namespace,
-		name:       pod.Name,
-		actionType: ACTION_ADD,
+func New(cfg *Config) *Controller {
+	return &Controller{
+		Config:    *cfg,
+		logger:    logrus.WithField("pkg", "controller"),
+		clusters:  make(map[string]*cluster.Cluster),
+		stopChMap: map[string]chan struct{}{},
 	}
 }
 
-func (d *SpiloController) podDelete(obj interface{}) {
-	pod := obj.(*v1.Pod)
-	d.podEvents <- podEvent{
-		namespace:  pod.Namespace,
-		name:       pod.Name,
-		actionType: ACTION_DELETE,
-	}
-}
-
-func (d *SpiloController) podUpdate(old, cur interface{}) {
-	oldPod := old.(*v1.Pod)
-	d.podEvents <- podEvent{
-		namespace:  oldPod.Namespace,
-		name:       oldPod.Name,
-		actionType: ACTION_UPDATE,
-	}
-}
-
-func (z *SpiloController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
+func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
 	defer wg.Done()
 	wg.Add(1)
 
-	if err := EnsureSpiloThirdPartyResource(z.Clientset); err != nil {
-		log.Fatalf("Couldn't create ThirdPartyResource: %s", err)
-	}
+	c.initController()
 
-	go z.spiloInformer.Run(stopCh)
-	go z.podInformer.Run(stopCh)
-	go z.podWatcher(stopCh)
+	go c.watchTpr(stopCh)
+	go c.watchTprEvents(stopCh)
+
+	c.logger.Info("Started working in background")
+}
+
+func (c *Controller) watchTpr(stopCh <-chan struct{}) {
+	go c.postgresqlInformer.Run(stopCh)
 
 	<-stopCh
 }
 
-func (z *SpiloController) spiloAdd(obj interface{}) {
-	spilo := obj.(*spec.Spilo)
+func (c *Controller) watchTprEvents(stopCh <-chan struct{}) {
+	//fmt.Println("Watching tpr events")
 
-	clusterName := (*spilo).Metadata.Name
-	ns := (*spilo).Metadata.Namespace
-
-	//TODO: check if object already exists before creating
-	z.CreateEndPoint(ns, clusterName)
-	z.CreateService(ns, clusterName)
-	z.CreateSecrets(ns, clusterName)
-	z.CreateStatefulSet(spilo)
+	<-stopCh
 }
 
-func (z *SpiloController) spiloUpdate(old, cur interface{}) {
-	oldSpilo := old.(*spec.Spilo)
-	curSpilo := cur.(*spec.Spilo)
-
-	if oldSpilo.Spec.NumberOfInstances != curSpilo.Spec.NumberOfInstances {
-		z.UpdateStatefulSet(curSpilo)
+func (c *Controller) createTPR() error {
+	tpr := &v1beta1extensions.ThirdPartyResource{
+		ObjectMeta: v1.ObjectMeta{
+			Name: fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor),
+		},
+		Versions: []v1beta1extensions.APIVersion{
+			{Name: constants.TPRApiVersion},
+		},
+		Description: constants.TPRDescription,
 	}
 
-	if oldSpilo.Spec.DockerImage != curSpilo.Spec.DockerImage {
-		log.Printf("Updating DockerImage: %s.%s",
-			curSpilo.Metadata.Namespace,
-			curSpilo.Metadata.Name)
+	_, err := c.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr)
 
-		z.UpdateStatefulSetImage(curSpilo)
-	}
-
-	log.Printf("Update spilo old: %+v\ncurrent: %+v", *oldSpilo, *curSpilo)
-}
-
-func (z *SpiloController) spiloDelete(obj interface{}) {
-	spilo := obj.(*spec.Spilo)
-
-	err := z.DeleteStatefulSet(spilo.Metadata.Namespace, spilo.Metadata.Name)
 	if err != nil {
-		log.Printf("Error while deleting stateful set: %+v", err)
-	}
-}
-
-func (z *SpiloController) DeleteStatefulSet(ns, clusterName string) error {
-	orphanDependents := false
-	deleteOptions := v1.DeleteOptions{
-		OrphanDependents: &orphanDependents,
-	}
-
-	listOptions := v1.ListOptions{
-		LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", clusterName),
-	}
-
-	podList, err := z.Clientset.Pods(ns).List(listOptions)
-	if err != nil {
-		log.Printf("Error: %+v", err)
-	}
-
-	err = z.Clientset.StatefulSets(ns).Delete(clusterName, &deleteOptions)
-	if err != nil {
-		return err
-	}
-	log.Printf("StatefulSet %s.%s has been deleted\n", ns, clusterName)
-
-	for _, pod := range podList.Items {
-		err = z.Clientset.Pods(pod.Namespace).Delete(pod.Name, &deleteOptions)
-		if err != nil {
-			log.Printf("Error while deleting Pod %s: %+v", pod.Name, err)
+		if !k8sutil.IsKubernetesResourceAlreadyExistError(err) {
 			return err
-		}
-
-		log.Printf("Pod %s.%s has been deleted\n", pod.Namespace, pod.Name)
-	}
-
-	serviceList, err := z.Clientset.Services(ns).List(listOptions)
-	if err != nil {
-		return err
-	}
-
-	for _, service := range serviceList.Items {
-		err = z.Clientset.Services(service.Namespace).Delete(service.Name, &deleteOptions)
-		if err != nil {
-			log.Printf("Error while deleting Service %s: %+v", service.Name, err)
-
-			return err
-		}
-
-		log.Printf("Service %s.%s has been deleted\n", service.Namespace, service.Name)
-	}
-
-	z.etcdApiClient.DeleteEtcdKey(clusterName)
-
-	return nil
-}
-
-func (z *SpiloController) UpdateStatefulSet(spilo *spec.Spilo) {
-	ns := (*spilo).Metadata.Namespace
-
-	statefulSet := z.createSetFromSpilo(spilo)
-	_, err := z.Clientset.StatefulSets(ns).Update(&statefulSet)
-
-	if err != nil {
-		log.Printf("Error while updating StatefulSet: %s", err)
-	}
-}
-
-func (z *SpiloController) UpdateStatefulSetImage(spilo *spec.Spilo) {
-	ns := (*spilo).Metadata.Namespace
-
-	z.UpdateStatefulSet(spilo)
-
-	listOptions := v1.ListOptions{
-		LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", (*spilo).Metadata.Name),
-	}
-
-	pods, err := z.Clientset.Pods(ns).List(listOptions)
-	if err != nil {
-		log.Printf("Error while getting pods: %s", err)
-	}
-
-	orphanDependents := true
-	deleteOptions := v1.DeleteOptions{
-		OrphanDependents: &orphanDependents,
-	}
-
-	var masterPodName string
-	for _, pod := range pods.Items {
-		log.Printf("Pod processing: %s", pod.Name)
-
-		role, ok := pod.Labels["spilo-role"]
-		if ok == false {
-			log.Println("No spilo-role label")
-			continue
-		}
-		if role == "master" {
-			masterPodName = pod.Name
-			log.Printf("Skipping master: %s", masterPodName)
-			continue
-		}
-
-		err := z.Clientset.Pods(ns).Delete(pod.Name, &deleteOptions)
-		if err != nil {
-			log.Printf("Error while deleting Pod %s.%s: %s", pod.Namespace, pod.Name, err)
 		} else {
-			log.Printf("Pod deleted: %s.%s", pod.Namespace, pod.Name)
+			c.logger.Info("ThirdPartyResource already registered")
 		}
-
-		w1 := podWatcher{
-			podNamespace:  pod.Namespace,
-			podName:       pod.Name,
-			eventsChannel: make(chan podEvent, 1),
-			subscribe:     true,
-		}
-
-		log.Printf("Watching pod %s.%s being recreated", pod.Namespace, pod.Name)
-		z.podWatchers <- w1
-		for e := range w1.eventsChannel {
-			if e.actionType == ACTION_ADD { break }
-		}
-
-		log.Printf("Pod %s.%s has been recreated", pod.Namespace, pod.Name)
 	}
 
-	//TODO: do manual failover
-	err = z.Clientset.Pods(ns).Delete(masterPodName, &deleteOptions)
+	restClient := c.RestClient
+
+	return k8sutil.WaitTPRReady(restClient, constants.TPRReadyWaitInterval, constants.TPRReadyWaitTimeout, c.Namespace)
+}
+
+func (c *Controller) makeClusterConfig() cluster.Config {
+	return cluster.Config{
+		Namespace:  c.Namespace,
+		KubeClient: c.KubeClient,
+		RestClient: c.RestClient,
+	}
+}
+
+func (c *Controller) initController() {
+	err := c.createTPR()
 	if err != nil {
-		log.Printf("Error while deleting Pod %s.%s: %s", ns, masterPodName, err)
-	} else {
-		log.Printf("Pod deleted: %s.%s", ns, masterPodName)
+		c.logger.Fatalf("Can't register ThirdPartyResource: %s", err)
 	}
+
+	c.postgresqlInformer = cache.NewSharedIndexInformer(
+		cache.NewListWatchFromClient(c.RestClient, constants.ResourceName, v1.NamespaceAll, fields.Everything()),
+		&spec.Postgresql{},
+		constants.ResyncPeriod,
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+
+	c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    c.clusterAdd,
+		UpdateFunc: c.clusterUpdate,
+		DeleteFunc: c.clusterDelete,
+	})
 }
 
-func (z *SpiloController) podWatcher(stopCh <-chan struct{}) {
-	//TODO: mind the namespace of the pod
+func (c *Controller) clusterAdd(obj interface{}) {
+	pg := obj.(*spec.Postgresql)
 
-	watchers := make(map[string]podWatcher)
-	for {
-		select {
-		case watcher := <-z.podWatchers:
-			if watcher.subscribe {
-				watchers[watcher.podName] = watcher
-			} else {
-				close(watcher.eventsChannel)
-				delete(watchers, watcher.podName)
-			}
-		case event := <-z.podEvents:
-			log.Printf("Pod watcher event: %s.%s - %s", event.namespace, event.name, event.actionType)
-			log.Printf("Current watchers: %+v", watchers)
-			podWatcher, ok := watchers[event.name]
-			if ok == false {
-				continue
-			}
-
-			podWatcher.eventsChannel <- event
-		}
+	if pg.Spec == nil {
+		return
 	}
+
+	cluster := cluster.New(c.makeClusterConfig(), pg)
+	cluster.Create()
+
+	c.logger.Infof("Add: %+v", cluster)
+}
+
+func (c *Controller) clusterUpdate(prev, cur interface{}) {
+	pgPrev := prev.(*spec.Postgresql)
+	pgCur := cur.(*spec.Postgresql)
+
+	if pgPrev.Spec == nil || pgCur.Spec == nil {
+		return
+	}
+
+	c.logger.Infof("Update: %+v -> %+v", *pgPrev.Spec, *pgCur.Spec)
+}
+
+func (c *Controller) clusterDelete(obj interface{}) {
+	pg := obj.(*spec.Postgresql)
+	if pg.Spec == nil {
+		return
+	}
+
+	cluster := cluster.New(c.makeClusterConfig(), pg)
+	cluster.Delete()
+
+	c.logger.Infof("Delete: %+v", *pg.Spec)
+}
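One thing worth noting in `Run` above: it pairs `defer wg.Done()` with `wg.Add(1)` in the same call, so the WaitGroup is released as soon as `Run` returns, not when the watch goroutines exit. The conventional pattern, sketched here as an assumption rather than a fix shipped in this diff:

```go
// Add before spawning; Done when the goroutine actually finishes,
// so a later wg.Wait() blocks until the informer stops.
wg.Add(1)
go func() {
	defer wg.Done()
	c.postgresqlInformer.Run(stopCh)
}()
```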
"0.0.1.dev" - resyncPeriod = 5 * time.Minute -) - -type Options struct { - KubeConfig string - OutOfCluster bool -} - -func KubernetesConfig(options Options) (config *rest.Config) { - var err error - if options.OutOfCluster { - /* out-of-cluster process */ - rules := clientcmd.NewDefaultClientConfigLoadingRules() - overrides := &clientcmd.ConfigOverrides{} - rules.ExplicitPath = options.KubeConfig - config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig() - } else { - /* in-cluster pod */ - config, err = rest.InClusterConfig() - } - if err != nil { - log.Fatalf("Couldn't get Kubernetes default config: %s", err) - } - return -} - -func newKubernetesSpiloClient(c *rest.Config) (*rest.RESTClient, error) { - c.APIPath = "/apis" - c.GroupVersion = &unversioned.GroupVersion{ - Group: VENDOR, - Version: "v1", - } - c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} - - schemeBuilder := runtime.NewSchemeBuilder( - func(scheme *runtime.Scheme) error { - scheme.AddKnownTypes( - *c.GroupVersion, - &spec.Spilo{}, - &spec.SpiloList{}, - &api.ListOptions{}, - &api.DeleteOptions{}, - ) - return nil - }) - schemeBuilder.AddToScheme(api.Scheme) - - return rest.RESTClientFor(c) -} - -//TODO: Move to separate package -func IsKubernetesResourceNotFoundError(err error) bool { - se, ok := err.(*apierrors.StatusError) - if !ok { - return false - } - if se.Status().Code == http.StatusNotFound && se.Status().Reason == unversioned.StatusReasonNotFound { - return true - } - return false -} - -func EnsureSpiloThirdPartyResource(client *kubernetes.Clientset) error { - // The resource doesn't exist, so we create it. - tpr := v1beta1.ThirdPartyResource{ - ObjectMeta: v1.ObjectMeta{ - Name: fmt.Sprintf("spilo.%s", VENDOR), - }, - Description: "A specification of Spilo StatefulSets", - Versions: []v1beta1.APIVersion{ - {Name: "v1"}, - }, - } - - _, err := client.ExtensionsV1beta1().ThirdPartyResources().Create(&tpr) - - if IsKubernetesResourceNotFoundError(err) { - return err - } - - return nil -} diff --git a/pkg/controller/spilo.go b/pkg/controller/spilo.go deleted file mode 100644 index 8523c33b3..000000000 --- a/pkg/controller/spilo.go +++ /dev/null @@ -1,73 +0,0 @@ -package controller - -import ( - "log" - "sync" - "net/url" - "fmt" - "strings" - - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - - "github.bus.zalan.do/acid/postgres-operator/pkg/etcd" -) - -type SpiloOperator struct { - Options - - ClientSet *kubernetes.Clientset - Client *rest.RESTClient - Controller *SpiloController - EtcdClient *etcd.EtcdClient -} - - -func getEtcdServiceName(cls *kubernetes.Clientset, config *rest.Config, outOfCluster bool) (etcdServiceName string) { - etcdService, _ := cls.Services("default").Get("etcd-client") - if outOfCluster { - ports := etcdService.Spec.Ports[0] - if ports.NodePort == 0 { - log.Fatal("Etcd port is not exposed\nHint: add NodePort to your Etcd service") - } - nodeurl, _ := url.Parse(config.Host) - etcdServiceName = fmt.Sprintf("http://%s:%d", strings.Split(nodeurl.Host, ":")[0], ports.NodePort) - } else { - if len(etcdService.Spec.Ports) != 1 { - log.Fatal("Can't find Etcd service named 'etcd-client'") - } - etcdServiceName = fmt.Sprintf("%s.%s.svc.cluster.local", etcdService.Name, etcdService.Namespace) - } - return -} - -func New(options Options) *SpiloOperator { - config := KubernetesConfig(options) - - spiloClient, err := newKubernetesSpiloClient(config) - if err != nil { - log.Fatalf("Couldn't create Spilo client: %s", 
-	}
-
-	clientSet, err := kubernetes.NewForConfig(config)
-	if err != nil {
-		log.Fatalf("Couldn't create Kubernetes client: %s", err)
-	}
-
-	etcdClient := etcd.NewEctdClient(getEtcdServiceName(clientSet, config, options.OutOfCluster))
-
-	operator := &SpiloOperator{
-		Options:    options,
-		ClientSet:  clientSet,
-		Client:     spiloClient,
-		Controller: newController(spiloClient, clientSet, etcdClient),
-	}
-
-	return operator
-}
-
-func (o *SpiloOperator) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
-	go o.Controller.Run(stopCh, wg)
-
-	log.Println("Started working in background")
-}
diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go
deleted file mode 100644
index e51016d91..000000000
--- a/pkg/etcd/etcd.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package etcd
-
-import (
-	"fmt"
-	"github.com/coreos/etcd/client"
-	"golang.org/x/net/context"
-	"log"
-	"time"
-)
-
-const etcdKeyTemplate = "/service/%s"
-
-type EtcdClient struct {
-	apiClient client.KeysAPI
-}
-
-func NewEctdClient(host string) *EtcdClient {
-	etcdClient := EtcdClient{}
-
-	cfg := client.Config{
-		Endpoints:               []string{host},
-		Transport:               client.DefaultTransport,
-		HeaderTimeoutPerRequest: time.Second,
-	}
-
-	c, err := client.New(cfg)
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	etcdClient.apiClient = client.NewKeysAPI(c)
-
-	return &etcdClient
-}
-
-
-func (c *EtcdClient) DeleteEtcdKey(clusterName string) error {
-	options := client.DeleteOptions{
-		Recursive: true,
-	}
-
-	keyName := fmt.Sprintf(etcdKeyTemplate, clusterName)
-
-	resp, err := c.apiClient.Delete(context.Background(), keyName, &options)
-	if resp != nil {
-		log.Printf("Response: %+v", *resp)
-	} else {
-		log.Fatal("No response from etcd")
-	}
-
-	log.Printf("Deleting key %s from ETCD", clusterName)
-
-	return err
-}
diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go
new file mode 100644
index 000000000..765548d59
--- /dev/null
+++ b/pkg/spec/postgresql.go
@@ -0,0 +1,139 @@
+package spec
+
+import (
+	"encoding/json"
+
+	"k8s.io/client-go/pkg/api"
+	"k8s.io/client-go/pkg/api/meta"
+	"k8s.io/client-go/pkg/api/unversioned"
+)
+
+type MaintenanceWindow struct {
+	StartTime string
+	EndTime   string
+	//StartTime    time.Time    // Start time
+	//StartWeekday time.Weekday // Start weekday
+	//
+	//EndTime    time.Time    // End time
+	//EndWeekday time.Weekday // End weekday
+}
+
+type Volume struct {
+	Size         string `json:"size"`
+	StorageClass string `json:"storageClass"`
+}
+
+type PostgresqlParam struct {
+	Version    string            `json:"version"`
+	Parameters map[string]string `json:"parameters"`
+}
+
+type Resources struct {
+	Cpu    string `json:"cpu"`
+	Memory string `json:"memory"`
+}
+
+type Patroni struct {
+	InitDB               map[string]string `json:"initdb"`
+	PgHba                []string          `json:"pg_hba"`
+	TTL                  uint32            `json:"ttl"`
+	LoopWait             uint32            `json:"loop_wait"`
+	RetryTimeout         uint32            `json:"retry_timeout"`
+	MaximumLagOnFailover float32           `json:"maximum_lag_on_failover"` // float32 because https://github.com/kubernetes/kubernetes/issues/30213
+}
+
+type UserFlags []string
+
+type PostgresSpec struct {
+	Resources       `json:"resources,omitempty"`
+	Patroni         `json:"patroni,omitempty"`
+	PostgresqlParam `json:"postgresql"`
+	Volume          `json:"volume,omitempty"`
+
+	NumberOfInstances  int32                `json:"numberOfInstances"`
+	Users              map[string]UserFlags `json:"users"`
+	MaintenanceWindows []string             `json:"maintenanceWindows,omitempty"`
+
+	EtcdHost    string
+	DockerImage string
+}
+
+type PostgresStatus struct {
+	// Phase is the cluster running phase
+	Phase  string `json:"phase"`
+	Reason string `json:"reason"`
+
+	// ControlPaused indicates that the operator has paused control of the cluster.
+	ControlPaused bool `json:"controlPaused"`
+
+	// Size is the current size of the cluster
+	Size int `json:"size"`
+	// CurrentVersion is the current cluster version
+	CurrentVersion string `json:"currentVersion"`
+	// TargetVersion is the version the cluster is upgrading to.
+	// If the cluster is not upgrading, TargetVersion is empty.
+	TargetVersion string `json:"targetVersion"`
+}
+
+// PostgreSQL Third Party (resource) Object
+type Postgresql struct {
+	unversioned.TypeMeta `json:",inline"`
+	Metadata             api.ObjectMeta `json:"metadata"`
+
+	Spec   *PostgresSpec   `json:"spec"`
+	Status *PostgresStatus `json:"status"`
+}
+
+type PostgresqlList struct {
+	unversioned.TypeMeta `json:",inline"`
+	Metadata             unversioned.ListMeta `json:"metadata"`
+
+	Items []Postgresql `json:"items"`
+}
+
+func (p *Postgresql) GetObjectKind() unversioned.ObjectKind {
+	return &p.TypeMeta
+}
+
+func (p *Postgresql) GetObjectMeta() meta.Object {
+	return &p.Metadata
+}
+
+func (pl *PostgresqlList) GetObjectKind() unversioned.ObjectKind {
+	return &pl.TypeMeta
+}
+
+func (pl *PostgresqlList) GetListMeta() unversioned.List {
+	return &pl.Metadata
+}
+
+// The code below is used only to work around a known problem with third-party
+// resources and ugorji. If/when these issues are resolved, the code below
+// should no longer be required.
+//
+type PostgresqlListCopy PostgresqlList
+type PostgresqlCopy Postgresql
+
+func (p *Postgresql) UnmarshalJSON(data []byte) error {
+	tmp := PostgresqlCopy{}
+	err := json.Unmarshal(data, &tmp)
+	if err != nil {
+		return err
+	}
+	tmp2 := Postgresql(tmp)
+	*p = tmp2
+
+	return nil
+}
+
+func (pl *PostgresqlList) UnmarshalJSON(data []byte) error {
+	tmp := PostgresqlListCopy{}
+	err := json.Unmarshal(data, &tmp)
+	if err != nil {
+		return err
+	}
+	tmp2 := PostgresqlList(tmp)
+	*pl = tmp2
+
+	return nil
+}
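The `Copy` aliases exist so that `UnmarshalJSON` can delegate to the standard decoder without recursing into itself: the alias type has the same fields but none of the methods. A self-contained sketch of the round trip (hypothetical usage, not part of the change):

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
)

func main() {
	// Decode a manifest's JSON form into the new Postgresql TPR type.
	data := []byte(`{"kind": "Postgresql", "apiVersion": "acid.zalan.do/v1",
	  "metadata": {"name": "testcluster"},
	  "spec": {"numberOfInstances": 3, "postgresql": {"version": "9.6"}}}`)

	var pg spec.Postgresql
	if err := json.Unmarshal(data, &pg); err != nil {
		log.Fatalf("can't decode manifest: %s", err)
	}
	fmt.Println(pg.Spec.NumberOfInstances) // 3
}
```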
diff --git a/pkg/spec/spilo.go b/pkg/spec/spilo.go
deleted file mode 100644
index 3d372f301..000000000
--- a/pkg/spec/spilo.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package spec
-
-import(
-	"encoding/json"
-
-	"k8s.io/client-go/pkg/api/meta"
-	"k8s.io/client-go/pkg/api/unversioned"
-	"k8s.io/client-go/pkg/api"
-)
-
-type Pgconf struct {
-	Parameter string `json:"param"`
-	Value     string `json:"value"`
-}
-
-type SpiloSpec struct {
-	EtcdHost              string   `json:"etcd_host"`
-	VolumeSize            int      `json:"volume_size"`
-	NumberOfInstances     int32    `json:"number_of_instances"`
-	DockerImage           string   `json:"docker_image"`
-	PostgresConfiguration []Pgconf `json:"postgres_configuration"`
-	ResourceCPU           string   `json:"resource_cpu"`
-	ResourceMemory        string   `json:"resource_memory"`
-}
-
-type Spilo struct {
-	unversioned.TypeMeta `json:",inline"`
-	Metadata             api.ObjectMeta `json:"metadata"`
-	Spec                 SpiloSpec      `json:"spec"`
-}
-
-type SpiloList struct {
-	unversioned.TypeMeta `json:",inline"`
-	Metadata             unversioned.ListMeta `json:"metadata"`
-	Items                []Spilo              `json:"items"`
-}
-
-func (s *Spilo) GetObjectKind() unversioned.ObjectKind {
-	return &s.TypeMeta
-}
-
-func (s *Spilo) GetObjectMeta() meta.Object {
-	return &s.Metadata
-}
-func (sl *SpiloList) GetObjectKind() unversioned.ObjectKind {
-	return &sl.TypeMeta
-}
-
-func (sl *SpiloList) GetListMeta() unversioned.List {
-	return &sl.Metadata
-}
-
-type SpiloListCopy SpiloList
-type SpiloCopy Spilo
-
-func (e *Spilo) UnmarshalJSON(data []byte) error {
-	tmp := SpiloCopy{}
-	err := json.Unmarshal(data, &tmp)
-	if err != nil {
-		return err
-	}
-	tmp2 := Spilo(tmp)
-	*e = tmp2
-
-	return nil
-}
-
-func (el *SpiloList) UnmarshalJSON(data []byte) error {
-	tmp := SpiloListCopy{}
-	err := json.Unmarshal(data, &tmp)
-	if err != nil {
-		return err
-	}
-	tmp2 := SpiloList(tmp)
-	*el = tmp2
-
-	return nil
-}
diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go
new file mode 100644
index 000000000..c577a5799
--- /dev/null
+++ b/pkg/util/constants/constants.go
@@ -0,0 +1,19 @@
+package constants
+
+import "time"
+
+const (
+	TPRName              = "postgresql"
+	TPRVendor            = "acid.zalan.do"
+	TPRDescription       = "Managed PostgreSQL clusters"
+	TPRReadyWaitInterval = 3 * time.Second
+	TPRReadyWaitTimeout  = 30 * time.Second
+	TPRApiVersion        = "v1"
+
+	ResourceName = TPRName + "s"
+	ResyncPeriod = 5 * time.Minute
+
+	EtcdHost = "etcd-client.default.svc.cluster.local:2379" //TODO: move to the operator spec
+
+	PasswordLength = 64
+)
diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go
new file mode 100644
index 000000000..b118df506
--- /dev/null
+++ b/pkg/util/k8sutil/k8sutil.go
@@ -0,0 +1,66 @@
+package k8sutil
+
+import (
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/pkg/api"
+	apierrors "k8s.io/client-go/pkg/api/errors"
+	"k8s.io/client-go/pkg/api/unversioned"
+	"k8s.io/client-go/pkg/runtime"
+	"k8s.io/client-go/pkg/runtime/serializer"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+
+	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
+)
+
+func RestConfig(kubeConfig string, outOfCluster bool) (config *rest.Config, err error) {
+	if outOfCluster {
+		/* out-of-cluster process */
+		rules := clientcmd.NewDefaultClientConfigLoadingRules()
+		overrides := &clientcmd.ConfigOverrides{}
+		rules.ExplicitPath = kubeConfig
+		config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
+	} else {
+		/* in-cluster pod */
+		config, err = rest.InClusterConfig()
+	}
+
+	return
+}
+
+func KubernetesClient(config *rest.Config) (client *kubernetes.Clientset, err error) {
+	return kubernetes.NewForConfig(config)
+}
+
+func IsKubernetesResourceAlreadyExistError(err error) bool {
+	return apierrors.IsAlreadyExists(err)
+}
+
+func ResourceNotFound(err error) bool {
+	return apierrors.IsNotFound(err)
+}
+
+func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) {
+	c.APIPath = "/apis"
+	c.GroupVersion = &unversioned.GroupVersion{
+		Group:   constants.TPRVendor,
+		Version: constants.TPRApiVersion,
+	}
+	c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
+
+	schemeBuilder := runtime.NewSchemeBuilder(
+		func(scheme *runtime.Scheme) error {
+			scheme.AddKnownTypes(
+				*c.GroupVersion,
+				&spec.Postgresql{},
+				&spec.PostgresqlList{},
+				&api.ListOptions{},
+				&api.DeleteOptions{},
+			)
+			return nil
+		})
+	schemeBuilder.AddToScheme(api.Scheme)
+
+	return rest.RESTClientFor(c)
+}
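`KubernetesRestClient` drops the error returned by `schemeBuilder.AddToScheme`. A small hardening sketch (assumption, not in this diff):

```go
// Surface type-registration failures early instead of discarding them.
if err := schemeBuilder.AddToScheme(api.Scheme); err != nil {
	return nil, err
}
```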
diff --git a/pkg/util/k8sutil/tpr_util.go b/pkg/util/k8sutil/tpr_util.go
new file mode 100644
index 000000000..fa40360d2
--- /dev/null
+++ b/pkg/util/k8sutil/tpr_util.go
@@ -0,0 +1,28 @@
+package k8sutil
+
+import (
+	"fmt"
+	"time"
+
+	"k8s.io/client-go/rest"
+
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util/retryutil"
+)
+
+func listClustersURI(ns string) string {
+	return fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s", constants.TPRVendor, constants.TPRApiVersion, ns, constants.ResourceName)
+}
+
+func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error {
+	return retryutil.Retry(interval, int(timeout/interval), func() (bool, error) {
+		_, err := restclient.Get().RequestURI(listClustersURI(ns)).DoRaw()
+		if err != nil {
+			if ResourceNotFound(err) { // not set up yet; wait more
+				return false, nil
+			}
+			return false, err
+		}
+		return true, nil
+	})
+}
diff --git a/pkg/util/retryutil/retry_util.go b/pkg/util/retryutil/retry_util.go
new file mode 100644
index 000000000..6e81f6599
--- /dev/null
+++ b/pkg/util/retryutil/retry_util.go
@@ -0,0 +1,43 @@
+package retryutil
+
+import (
+	"fmt"
+	"time"
+)
+
+type RetryError struct {
+	n int
+}
+
+func (e *RetryError) Error() string {
+	return fmt.Sprintf("still failing after %d retries", e.n)
+}
+
+type ConditionFunc func() (bool, error)
+
+// Retry calls f every interval, at most maxRetries times.
+// The interval is not affected by how long f takes: if interval is 3s and
+// f takes 1s, the next f is called 2s later. However, if f takes longer
+// than interval, the next call is delayed until f returns.
+func Retry(interval time.Duration, maxRetries int, f ConditionFunc) error {
+	if maxRetries <= 0 {
+		return fmt.Errorf("maxRetries (%d) should be > 0", maxRetries)
+	}
+	tick := time.NewTicker(interval)
+	defer tick.Stop()
+
+	for i := 0; ; i++ {
+		ok, err := f()
+		if err != nil {
+			return err
+		}
+		if ok {
+			return nil
+		}
+		if i+1 == maxRetries {
+			break
+		}
+		<-tick.C
+	}
+	return &RetryError{maxRetries}
+}
diff --git a/pkg/util/util.go b/pkg/util/util.go
new file mode 100644
index 000000000..c2232341d
--- /dev/null
+++ b/pkg/util/util.go
@@ -0,0 +1,21 @@
+package util
+
+import (
+	"math/rand"
+	"time"
+)
+
+var passwordChars = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*=")
+
+func init() {
+	rand.Seed(time.Now().Unix())
+}
+
+// RandomPasswordBytes returns n random bytes drawn from passwordChars.
+func RandomPasswordBytes(n int) []byte {
+	b := make([]byte, n)
+	for i := range b {
+		b[i] = passwordChars[rand.Intn(len(passwordChars))]
+	}
+
+	return b
+}
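`RandomPasswordBytes` draws from a time-seeded `math/rand`, which is predictable; that is fine for test clusters, but production credentials would want `crypto/rand`. A sketch of an equivalent generator (assumption, not part of this change):

```go
package util

import (
	"crypto/rand"
	"math/big"
)

// randomPasswordBytesSecure is a hypothetical crypto/rand-backed variant
// of RandomPasswordBytes; unlike the original it can fail, so it returns
// an error instead of silently degrading.
func randomPasswordBytesSecure(n int) ([]byte, error) {
	b := make([]byte, n)
	max := big.NewInt(int64(len(passwordChars)))
	for i := range b {
		idx, err := rand.Int(rand.Reader, max) // uniform in [0, len(passwordChars))
		if err != nil {
			return nil, err
		}
		b[i] = passwordChars[idx.Int64()]
	}
	return b, nil
}
```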