diff --git a/manifests/testpostgresql.yaml b/manifests/testpostgresql.yaml index cb8a17774..bb0008d6c 100644 --- a/manifests/testpostgresql.yaml +++ b/manifests/testpostgresql.yaml @@ -5,25 +5,31 @@ metadata: name: testcluster spec: + teamId: "50051320" volume: size: 100Gi storageClass: gp2 numberOfInstances: 3 + users: #Application/Robot users + jdoe: + - superuser + - createdb + rmiles: + rroe: + allowedSourceRanges: #Load balancer source ranges + - 127.0.0.1/32 + +#Expert section + pamUsersSecret: human-users postgresql: version: "9.6" parameters: shared_buffers: "500MB" max_connections: "10" log_statement: "all" - users: - jdoe: - - superuser - - createdb - rmiles: - rroe: resources: - cpu: 100m - memory: 500Mi + cpu: 10m + memory: 20Mi patroni: initdb: encoding: "UTF8" diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 2bb37c0b0..85ceeca70 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -3,25 +3,45 @@ package cluster // Postgres ThirdPartyResource object i.e. Spilo import ( + "context" + "database/sql" "fmt" + "regexp" + "strings" + "sync" "github.com/Sirupsen/logrus" + etcdclient "github.com/coreos/etcd/client" "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/labels" "k8s.io/client-go/rest" "github.bus.zalan.do/acid/postgres-operator/pkg/spec" "github.bus.zalan.do/acid/postgres-operator/pkg/util" "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants" + "github.bus.zalan.do/acid/postgres-operator/pkg/util/retryutil" ) -var patroniUsers = []string{"superuser", "replication"} +var ( + superUsername = "superuser" + replicationUsername = "replication" + + alphaNumericRegexp = regexp.MustCompile("^[a-zA-Z0-9]*$") +) //TODO: remove struct duplication type Config struct { Namespace string KubeClient *kubernetes.Clientset //TODO: move clients to the better place? RestClient *rest.RESTClient + EtcdClient etcdclient.KeysAPI +} + +type pgUser struct { + name string + password string + flags []string } type Cluster struct { @@ -30,35 +50,29 @@ type Cluster struct { etcdHost string dockerImage string cluster *spec.Postgresql - pgUsers []pgUser -} + pgUsers map[string]pgUser -type pgUser struct { - username []byte - password []byte - flags []string + pgDb *sql.DB + mu sync.Mutex } func New(cfg Config, spec *spec.Postgresql) *Cluster { lg := logrus.WithField("pkg", "cluster").WithField("cluster-name", spec.Metadata.Name) - //TODO: check if image exist - dockerImage := fmt.Sprintf("registry.opensource.zalan.do/acid/spilo-%s", (*spec.Spec).PostgresqlParam.Version) - cluster := &Cluster{ config: cfg, cluster: spec, logger: lg, etcdHost: constants.EtcdHost, - dockerImage: dockerImage, + dockerImage: constants.SpiloImage, } cluster.init() return cluster } -func (c *Cluster) labels() map[string]string { - return map[string]string{ +func (c *Cluster) labelsSet() labels.Set { + return labels.Set{ "application": "spilo", "spilo-cluster": (*c.cluster).Metadata.Name, } @@ -73,23 +87,143 @@ func (c *Cluster) credentialSecretName(userName string) string { constants.TPRVendor) } -func (c *Cluster) init() { - for _, userName := range patroniUsers { - user := pgUser{ - username: []byte(userName), - password: util.RandomPasswordBytes(constants.PasswordLength), +func isValidUsername(username string) bool { + return alphaNumericRegexp.MatchString(username) +} + +func validateUserFlags(userFlags []string) (flags []string, err error) { + uniqueFlags := make(map[string]bool) + + for _, flag := range userFlags { + if !alphaNumericRegexp.MatchString(flag) { + err = fmt.Errorf("User flag '%s' is not alphanumeric", flag) + return + } else { + flag = strings.ToUpper(flag) + if _, ok := uniqueFlags[flag]; !ok { + uniqueFlags[flag] = true + } } - c.pgUsers = append(c.pgUsers, user) } - for userName, userFlags := range (*c.cluster.Spec).Users { - user := pgUser{ - username: []byte(userName), - password: util.RandomPasswordBytes(constants.PasswordLength), - flags: userFlags, - } - c.pgUsers = append(c.pgUsers, user) + if _, ok := uniqueFlags["NOLOGIN"]; !ok { + uniqueFlags["LOGIN"] = true } + + flags = []string{} + for k := range uniqueFlags { + flags = append(flags, k) + } + + return +} + +func (c *Cluster) init() { + users := (*c.cluster.Spec).Users + c.pgUsers = make(map[string]pgUser, len(users)+2) // + [superuser and replication] + + c.pgUsers[superUsername] = pgUser{ + name: superUsername, + password: util.RandomPassword(constants.PasswordLength), + } + + c.pgUsers[replicationUsername] = pgUser{ + name: replicationUsername, + password: util.RandomPassword(constants.PasswordLength), + } + + for userName, userFlags := range users { + if !isValidUsername(userName) { + c.logger.Warningf("Invalid '%s' username", userName) + continue + } + + flags, err := validateUserFlags(userFlags) + if err != nil { + c.logger.Warningf("Invalid flags for user '%s': %s", userName, err) + } + + c.pgUsers[userName] = pgUser{ + name: userName, + password: util.RandomPassword(constants.PasswordLength), + flags: flags, + } + } +} + + +func (c *Cluster) waitPodsDestroy() error { + ls := c.labelsSet() + + listOptions := v1.ListOptions{ + LabelSelector: ls.String(), + } + return retryutil.Retry( + constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval), + func() (bool, error) { + pods, err := c.config.KubeClient.Pods(c.config.Namespace).List(listOptions) + if err != nil { + return false, err + } + + return len(pods.Items) == 0, nil + }) +} + +func (c *Cluster) waitStatefulsetReady() error { + return retryutil.Retry(constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval), + func() (bool, error) { + listOptions := v1.ListOptions{ + LabelSelector: c.labelsSet().String(), + } + ss, err := c.config.KubeClient.StatefulSets(c.config.Namespace).List(listOptions) + if err != nil { + return false, err + } + + if len(ss.Items) != 1 { + return false, fmt.Errorf("StatefulSet is not found") + } + + return *ss.Items[0].Spec.Replicas == ss.Items[0].Status.Replicas, nil + }) +} + +func (c *Cluster) waitPodLabelsReady() error { + ls := c.labelsSet() + + listOptions := v1.ListOptions{ + LabelSelector: ls.String(), + } + masterListOption := v1.ListOptions{ + LabelSelector: labels.Merge(ls, labels.Set{"spilo-role": "master"}).String(), + } + replicaListOption := v1.ListOptions{ + LabelSelector: labels.Merge(ls, labels.Set{"spilo-role": "replica"}).String(), + } + pods, err := c.config.KubeClient.Pods(c.config.Namespace).List(listOptions) + if err != nil { + return err + } + podsNumber := len(pods.Items) + + return retryutil.Retry( + constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval), + func() (bool, error) { + masterPods, err := c.config.KubeClient.Pods(c.config.Namespace).List(masterListOption) + if err != nil { + return false, err + } + replicaPods, err := c.config.KubeClient.Pods(c.config.Namespace).List(replicaListOption) + if err != nil { + return false, err + } + if len(masterPods.Items) > 1 { + return false, fmt.Errorf("Too many masters") + } + + return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil + }) } func (c *Cluster) Create() error { @@ -98,7 +232,36 @@ func (c *Cluster) Create() error { c.applySecrets() c.createStatefulSet() - //TODO: wait for "spilo-role" label to appear on each pod + c.logger.Info("Waiting for cluster being ready") + err := c.waitClusterReady() + if err != nil { + c.logger.Errorf("Failed to create cluster: %s", err) + return err + } + c.logger.Info("Cluster is ready") + + err = c.initDbConn() + if err != nil { + return fmt.Errorf("Failed to init db connection: %s", err) + } + + c.createUsers() + + return nil +} + +func (c *Cluster) waitClusterReady() error { + // TODO: wait for the first Pod only + err := c.waitStatefulsetReady() + if err != nil { + return fmt.Errorf("Statuful set error: %s", err) + } + + // TODO: wait only for master + err = c.waitPodLabelsReady() + if err != nil { + return fmt.Errorf("Pod labels error: %s", err) + } return nil } @@ -112,46 +275,43 @@ func (c *Cluster) Delete() error { } listOptions := v1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s", "spilo-cluster", clusterName), + LabelSelector: c.labelsSet().String(), } kubeClient := c.config.KubeClient podList, err := kubeClient.Pods(nameSpace).List(listOptions) if err != nil { - c.logger.Errorf("Error: %+v", err) + return fmt.Errorf("Can't get list of pods: %s", err) } err = kubeClient.StatefulSets(nameSpace).Delete(clusterName, deleteOptions) if err != nil { - c.logger.Errorf("Error: %+v", err) + return fmt.Errorf("Can't delete statefulset: %s", err) } - c.logger.Infof("StatefulSet %s.%s has been deleted\n", nameSpace, clusterName) + c.logger.Infof("StatefulSet %s.%s has been deleted", nameSpace, clusterName) for _, pod := range podList.Items { err = kubeClient.Pods(nameSpace).Delete(pod.Name, deleteOptions) if err != nil { - c.logger.Errorf("Error while deleting Pod %s: %+v", pod.Name, err) - return err + return fmt.Errorf("Error while deleting pod %s.%s: %s", pod.Name, pod.Namespace, err) } - c.logger.Infof("Pod %s.%s has been deleted\n", pod.Namespace, pod.Name) + c.logger.Infof("Pod %s.%s has been deleted", pod.Name, pod.Namespace) } serviceList, err := kubeClient.Services(nameSpace).List(listOptions) if err != nil { - return err + return fmt.Errorf("Can't get list of the services: %s", err) } for _, service := range serviceList.Items { err = kubeClient.Services(nameSpace).Delete(service.Name, deleteOptions) if err != nil { - c.logger.Errorf("Error while deleting Service %s: %+v", service.Name, err) - - return err + return fmt.Errorf("Can't delete service %s.%s: %s", service.Name, service.Namespace, err) } - c.logger.Infof("Service %s.%s has been deleted\n", service.Namespace, service.Name) + c.logger.Infof("Service %s.%s has been deleted", service.Name, service.Namespace) } secretsList, err := kubeClient.Secrets(nameSpace).List(listOptions) @@ -161,15 +321,31 @@ func (c *Cluster) Delete() error { for _, secret := range secretsList.Items { err = kubeClient.Secrets(nameSpace).Delete(secret.Name, deleteOptions) if err != nil { - c.logger.Errorf("Error while deleting Secret %s: %+v", secret.Name, err) - - return err + return fmt.Errorf("Can't delete secret %s.%s: %s", secret.Name, secret.Namespace, err) } - c.logger.Infof("Secret %s.%s has been deleted\n", secret.Namespace, secret.Name) + c.logger.Infof("Secret %s.%s has been deleted", secret.Name, secret.Namespace) } - //TODO: delete key from etcd + c.waitPodsDestroy() + + etcdKey := fmt.Sprintf("/service/%s", clusterName) + + resp, err := c.config.EtcdClient.Delete(context.Background(), + etcdKey, + &etcdclient.DeleteOptions{Recursive: true}) + + if err != nil { + return fmt.Errorf("Can't delete etcd key: %s", err) + } + + if resp == nil { + c.logger.Warningf("No response from etcd cluster") + } + + c.logger.Infof("Etcd key '%s' has been deleted", etcdKey) + + //TODO: Ensure objects are deleted return nil } diff --git a/pkg/cluster/pg.go b/pkg/cluster/pg.go new file mode 100644 index 000000000..a99ff1bfd --- /dev/null +++ b/pkg/cluster/pg.go @@ -0,0 +1,39 @@ +package cluster + +import ( + "database/sql" + "fmt" + + _ "github.com/lib/pq" + "strings" +) + +func (c *Cluster) pgConnectionString() string { + hostname := fmt.Sprintf("%s.%s.svc.cluster.local", (*c.cluster).Metadata.Name, (*c.cluster).Metadata.Namespace) + password := c.pgUsers[superUsername].password + + return fmt.Sprintf("host='%s' dbname=postgres sslmode=require user=postgres password='%s'", + hostname, + strings.Replace(password, "$", "\\$", -1)) +} + +func (c *Cluster) initDbConn() error { + if c.pgDb == nil { + c.mu.Lock() + defer c.mu.Unlock() + if c.pgDb == nil { + conn, err := sql.Open("postgres", c.pgConnectionString()) + if err != nil { + return err + } + err = conn.Ping() + if err != nil { + return err + } + + c.pgDb = conn + } + } + + return nil +} diff --git a/pkg/cluster/objects.go b/pkg/cluster/resources.go similarity index 60% rename from pkg/cluster/objects.go rename to pkg/cluster/resources.go index 5760ec75d..8ac6b857d 100644 --- a/pkg/cluster/objects.go +++ b/pkg/cluster/resources.go @@ -6,9 +6,24 @@ import ( "k8s.io/client-go/pkg/apis/apps/v1beta1" "k8s.io/client-go/pkg/util/intstr" + "fmt" + "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants" "github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil" + "strings" ) +var createUserSQL = `DO $$ +BEGIN + SET local synchronous_commit = 'local'; + PERFORM * FROM pg_authid WHERE rolname = '%s'; + IF FOUND THEN + ALTER ROLE "%s" WITH %s PASSWORD '%s'; + ELSE + CREATE ROLE "%s" WITH %s PASSWORD '%s'; + END IF; +END; +$$` + func (c *Cluster) createStatefulSet() { clusterName := (*c.cluster).Metadata.Name @@ -65,6 +80,24 @@ func (c *Cluster) createStatefulSet() { }, }, }, + { + Name: "PAM_OAUTH2", //TODO: get from the operator tpr spec + Value: "https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees", //space before uid is obligatory + }, + { + Name: "SPILO_CONFIGURATION", //TODO: get from the operator tpr spec + Value: fmt.Sprintf(` +postgresql: + bin_dir: /usr/lib/postgresql/%s/bin +bootstrap: + initdb: + - auth-host: md5 + - auth-local: trust + pg_hba: + - hostnossl all all all reject + - hostssl all +%s all pam + - hostssl all all all md5`, (*c.cluster.Spec).Version, constants.PamRoleName), + }, } resourceList := v1.ResourceList{} @@ -118,7 +151,7 @@ func (c *Cluster) createStatefulSet() { template := v1.PodTemplateSpec{ ObjectMeta: v1.ObjectMeta{ - Labels: c.labels(), + Labels: c.labelsSet(), Annotations: map[string]string{"pod.alpha.kubernetes.io/initialized": "true"}, }, Spec: podSpec, @@ -127,7 +160,7 @@ func (c *Cluster) createStatefulSet() { statefulSet := &v1beta1.StatefulSet{ ObjectMeta: v1.ObjectMeta{ Name: clusterName, - Labels: c.labels(), + Labels: c.labelsSet(), }, Spec: v1beta1.StatefulSetSpec{ Replicas: &c.cluster.Spec.NumberOfInstances, @@ -140,32 +173,46 @@ func (c *Cluster) createStatefulSet() { } func (c *Cluster) applySecrets() { + //TODO: do not override current secrets + var err error - for _, user := range c.pgUsers { + for userName, pgUser := range c.pgUsers { secret := v1.Secret{ ObjectMeta: v1.ObjectMeta{ - Name: c.credentialSecretName(string(user.username)), - Labels: c.labels(), + Name: c.credentialSecretName(userName), + Labels: c.labelsSet(), }, Type: v1.SecretTypeOpaque, Data: map[string][]byte{ - "username": user.username, - "password": user.password, + "username": []byte(pgUser.name), + "password": []byte(pgUser.password), }, } _, err = c.config.KubeClient.Secrets(c.config.Namespace).Create(&secret) if k8sutil.IsKubernetesResourceAlreadyExistError(err) { - _, err = c.config.KubeClient.Secrets(c.config.Namespace).Update(&secret) + c.logger.Infof("Skipping update of '%s'", secret.Name) + + curSecrets, err := c.config.KubeClient.Secrets(c.config.Namespace).Get(c.credentialSecretName(userName)) if err != nil { - c.logger.Errorf("Error while updating secret: %+v", err) - } else { - c.logger.Infof("Secret updated: %+v", secret) + c.logger.Errorf("Can't get current secret: %s", err) } + user := pgUser + user.password = string(curSecrets.Data["password"]) + c.pgUsers[userName] = user + c.logger.Infof("Password fetched for user '%s' from the secrets", userName) + + continue + //_, err = c.config.KubeClient.Secrets(c.config.Namespace).Update(&secret) + //if err != nil { + // c.logger.Errorf("Error while updating secret: %+v", err) + //} else { + // c.logger.Infof("Secret updated: %+v", secret) + //} } else { if err != nil { c.logger.Errorf("Error while creating secret: %+v", err) } else { - c.logger.Infof("Secret created: %+v", secret) + c.logger.Infof("Secret created: %s", secret.Name) } } } @@ -185,11 +232,12 @@ func (c *Cluster) createService() { service := v1.Service{ ObjectMeta: v1.ObjectMeta{ Name: clusterName, - Labels: c.labels(), + Labels: c.labelsSet(), }, Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeClusterIP, + Type: v1.ServiceTypeLoadBalancer, Ports: []v1.ServicePort{{Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, + LoadBalancerSourceRanges: (*c.cluster).Spec.AllowedSourceRanges, }, } @@ -197,7 +245,7 @@ func (c *Cluster) createService() { if err != nil { c.logger.Errorf("Error while creating service: %+v", err) } else { - c.logger.Infof("Service created: %+v", service) + c.logger.Infof("Service created: %s", service.Name) } } @@ -213,7 +261,7 @@ func (c *Cluster) createEndPoint() { endPoint := v1.Endpoints{ ObjectMeta: v1.ObjectMeta{ Name: clusterName, - Labels: c.labels(), + Labels: c.labelsSet(), }, } @@ -221,6 +269,43 @@ func (c *Cluster) createEndPoint() { if err != nil { c.logger.Errorf("Error while creating endpoint: %+v", err) } else { - c.logger.Infof("Endpoint created: %+v", endPoint) + c.logger.Infof("Endpoint created: %s", endPoint.Name) } } + +func (c *Cluster) createUser(user pgUser) { + var userType string + var flags []string = user.flags + + if user.password == "" { + userType = "human" + flags = append(flags, fmt.Sprintf("IN ROLE '%s'", constants.PamRoleName)) + } else { + userType = "app" + } + + userFlags := strings.Join(flags, " ") + query := fmt.Sprintf(createUserSQL, + user.name, + user.name, userFlags, user.password, + user.name, userFlags, user.password) + + _, err := c.pgDb.Query(query) + if err != nil { + c.logger.Errorf("Can't create %s user '%s': %s", user.name, err) + } else { + c.logger.Infof("%s user '%s' with flags %s has been created", userType, user.name, flags) + } +} + +func (c *Cluster) createUsers() error { + for userName, user := range c.pgUsers { + if userName == superUsername || userName == replicationUsername { + continue + } + + c.createUser(user) + } + + return nil +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 7be511d2b..fc0488af9 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/Sirupsen/logrus" + etcdclient "github.com/coreos/etcd/client" "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api/v1" v1beta1extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" @@ -22,16 +23,18 @@ type Config struct { Namespace string KubeClient *kubernetes.Clientset RestClient *rest.RESTClient + EtcdClient etcdclient.KeysAPI } type Controller struct { - Config + config Config + + logger *logrus.Entry + events chan *Event + clusters map[string]*cluster.Cluster + stopChMap map[string]chan struct{} + waitCluster sync.WaitGroup - logger *logrus.Entry - events chan *Event - clusters map[string]*cluster.Cluster - stopChMap map[string]chan struct{} - waitCluster sync.WaitGroup postgresqlInformer cache.SharedIndexInformer } @@ -42,10 +45,10 @@ type Event struct { func New(cfg *Config) *Controller { return &Controller{ - Config: *cfg, + config: *cfg, logger: logrus.WithField("pkg", "controller"), clusters: make(map[string]*cluster.Cluster), - stopChMap: map[string]chan struct{}{}, + stopChMap: make(map[string]chan struct{}), } } @@ -54,9 +57,14 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { wg.Add(1) c.initController() + err := c.initEtcdClient() + if err != nil { + c.logger.Errorf("Can't get etcd client: %s", err) + + return + } go c.watchTpr(stopCh) - go c.watchTprEvents(stopCh) c.logger.Info("Started working in background") } @@ -67,12 +75,6 @@ func (c *Controller) watchTpr(stopCh <-chan struct{}) { <-stopCh } -func (c *Controller) watchTprEvents(stopCh <-chan struct{}) { - //fmt.Println("Watching tpr events") - - <-stopCh -} - func (c *Controller) createTPR() error { tpr := &v1beta1extensions.ThirdPartyResource{ ObjectMeta: v1.ObjectMeta{ @@ -84,26 +86,29 @@ func (c *Controller) createTPR() error { Description: constants.TPRDescription, } - _, err := c.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr) + _, err := c.config.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr) if err != nil { if !k8sutil.IsKubernetesResourceAlreadyExistError(err) { return err } else { - c.logger.Info("ThirdPartyResource already registered") + c.logger.Info("ThirdPartyResource is already registered") } + } else { + c.logger.Info("ThirdPartyResource has been registered") } - restClient := c.RestClient + restClient := c.config.RestClient - return k8sutil.WaitTPRReady(restClient, constants.TPRReadyWaitInterval, constants.TPRReadyWaitTimeout, c.Namespace) + return k8sutil.WaitTPRReady(restClient, constants.TPRReadyWaitInterval, constants.TPRReadyWaitTimeout, c.config.Namespace) } func (c *Controller) makeClusterConfig() cluster.Config { return cluster.Config{ - Namespace: c.Namespace, - KubeClient: c.KubeClient, - RestClient: c.RestClient, + Namespace: c.config.Namespace, + KubeClient: c.config.KubeClient, + RestClient: c.config.RestClient, + EtcdClient: c.config.EtcdClient, } } @@ -114,7 +119,7 @@ func (c *Controller) initController() { } c.postgresqlInformer = cache.NewSharedIndexInformer( - cache.NewListWatchFromClient(c.RestClient, constants.ResourceName, v1.NamespaceAll, fields.Everything()), + cache.NewListWatchFromClient(c.config.RestClient, constants.ResourceName, v1.NamespaceAll, fields.Everything()), &spec.Postgresql{}, constants.ResyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) @@ -129,14 +134,23 @@ func (c *Controller) initController() { func (c *Controller) clusterAdd(obj interface{}) { pg := obj.(*spec.Postgresql) + //TODO: why do we need to have this check if pg.Spec == nil { return } - cluster := cluster.New(c.makeClusterConfig(), pg) - cluster.Create() + clusterName := (*pg).Metadata.Name - c.logger.Infof("Add: %+v", cluster) + cl := cluster.New(c.makeClusterConfig(), pg) + err := cl.Create() + if err != nil { + c.logger.Errorf("Can't create cluster: %s", err) + return + } + c.stopChMap[clusterName] = make(chan struct{}) + c.clusters[clusterName] = cl + + c.logger.Infof("Postgresql cluster %s.%s has been created", clusterName, (*pg).Metadata.Namespace) } func (c *Controller) clusterUpdate(prev, cur interface{}) { @@ -146,8 +160,11 @@ func (c *Controller) clusterUpdate(prev, cur interface{}) { if pgPrev.Spec == nil || pgCur.Spec == nil { return } + if pgPrev.Metadata.ResourceVersion == pgCur.Metadata.ResourceVersion { + return + } - c.logger.Infof("Update: %+v -> %+v", *pgPrev.Spec, *pgCur.Spec) + c.logger.Infof("Update: %+v -> %+v", *pgPrev, *pgCur) } func (c *Controller) clusterDelete(obj interface{}) { @@ -155,9 +172,17 @@ func (c *Controller) clusterDelete(obj interface{}) { if pg.Spec == nil { return } + clusterName := (*pg).Metadata.Name cluster := cluster.New(c.makeClusterConfig(), pg) - cluster.Delete() + err := cluster.Delete() + if err != nil { + c.logger.Errorf("Can't delete cluster '%s.%s': %s", clusterName, (*pg).Metadata.Namespace, err) + return + } - c.logger.Infof("Delete: %+v", *pg.Spec) + close(c.stopChMap[clusterName]) + delete(c.clusters, clusterName) + + c.logger.Infof("Cluster delete: %s", (*pg).Metadata.Name) } diff --git a/pkg/controller/etcd.go b/pkg/controller/etcd.go new file mode 100644 index 000000000..78dbbb617 --- /dev/null +++ b/pkg/controller/etcd.go @@ -0,0 +1,25 @@ +package controller + +import ( + "fmt" + "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants" + etcdclient "github.com/coreos/etcd/client" + "time" +) + +func (c *Controller) initEtcdClient() error { + etcdUrl := fmt.Sprintf("http://%s", constants.EtcdHost) + + cfg, err := etcdclient.New(etcdclient.Config{ + Endpoints: []string{etcdUrl}, + Transport: etcdclient.DefaultTransport, + HeaderTimeoutPerRequest: time.Second, + }) + if err != nil { + return err + } + + c.config.EtcdClient = etcdclient.NewKeysAPI(cfg) + + return nil +} diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index 765548d59..42e0b9bd4 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -50,9 +50,12 @@ type PostgresSpec struct { PostgresqlParam `json:"postgresql"` Volume `json:"volume,omitempty"` - NumberOfInstances int32 `json:"numberOfInstances"` - Users map[string]UserFlags `json:"users"` - MaintenanceWindows []string `json:"maintenanceWindows,omitempty"` + TeamId string `json:"teamId"` + AllowedSourceRanges []string `json:"allowedSourceRanges"` + NumberOfInstances int32 `json:"numberOfInstances"` + Users map[string]UserFlags `json:"users"` + MaintenanceWindows []string `json:"maintenanceWindows,omitempty"` + PamUsersSecret string `json:"pamUsersSecret",omitempty` EtcdHost string DockerImage string diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go index c577a5799..47ea4394e 100644 --- a/pkg/util/constants/constants.go +++ b/pkg/util/constants/constants.go @@ -3,17 +3,21 @@ package constants import "time" const ( - TPRName = "postgresql" - TPRVendor = "acid.zalan.do" - TPRDescription = "Managed PostgreSQL clusters" - TPRReadyWaitInterval = 3 * time.Second - TPRReadyWaitTimeout = 30 * time.Second - TPRApiVersion = "v1" + TPRName = "postgresql" + TPRVendor = "acid.zalan.do" + TPRDescription = "Managed PostgreSQL clusters" + TPRReadyWaitInterval = 3 * time.Second + TPRReadyWaitTimeout = 30 * time.Second + TPRApiVersion = "v1" + ResourceCheckInterval = 3 * time.Second + ResourceCheckTimeout = 10 * time.Minute ResourceName = TPRName + "s" ResyncPeriod = 5 * time.Minute - EtcdHost = "etcd-client.default.svc.cluster.local:2379" //TODO: move to the operator spec + EtcdHost = "etcd-client.default.svc.cluster.local:2379" //TODO: move to the operator spec + SpiloImage = "registry.opensource.zalan.do/acid/spilo-9.6:1.2-p11" + PamRoleName = "zalandos" PasswordLength = 64 ) diff --git a/pkg/util/util.go b/pkg/util/util.go index 22dac260a..4681b15b9 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -5,17 +5,17 @@ import ( "time" ) -var passwordChars = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$^&*=") +var passwordChars = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") func init() { rand.Seed(int64(time.Now().Unix())) } -func RandomPasswordBytes(n int) []byte { +func RandomPassword(n int) string { b := make([]byte, n) for i := range b { b[i] = passwordChars[rand.Intn(len(passwordChars))] } - return b + return string(b) }