From ae77fa15e8aae4b167678ad473546c59f8daddbb Mon Sep 17 00:00:00 2001
From: Murat Kabilov
Date: Mon, 6 Mar 2017 11:19:07 +0100
Subject: [PATCH] Pod Rolling update

introduce Pod events channel;
add parsing of the MaintenanceWindows section;
skip deleting Etcd key on cluster delete;
use external etcd host;
watch for tpr/pods in the namespace of the operator pod only;
---
 cmd/main.go                         |   1 +
 docker/Dockerfile                   |   2 +-
 glide.lock                          |   6 +-
 manifests/platform-credentials.yaml |   1 +
 manifests/postgres-operator.yaml    |  11 +-
 manifests/testpostgresql.yaml       |   1 -
 pkg/cluster/cluster.go              | 520 ++++++++++++---------------
 pkg/cluster/pg.go                   |  50 ++-
 pkg/cluster/pod.go                  | 169 +++++++++
 pkg/cluster/resources.go            | 534 ++++++++++++----------------
 pkg/cluster/util.go                 | 188 ++++++++++
 pkg/controller/controller.go        | 167 +++------
 pkg/controller/etcd.go              |   3 +-
 pkg/controller/pod.go               | 134 +++++++
 pkg/controller/postgresql.go        | 189 ++++++++++
 pkg/controller/util.go              |  57 +++
 pkg/spec/postgresql.go              | 155 +++++---
 pkg/spec/types.go                   |  40 +++
 pkg/util/constants/constants.go     |  26 +-
 pkg/util/k8sutil/k8sutil.go         |   2 +-
 pkg/util/resources/factory.go       | 265 ++++++++++++++
 pkg/util/teams/teams.go             |   4 +-
 pkg/util/util.go                    |  28 +-
 23 files changed, 1763 insertions(+), 790 deletions(-)
 create mode 100644 pkg/cluster/pod.go
 create mode 100644 pkg/cluster/util.go
 create mode 100644 pkg/controller/pod.go
 create mode 100644 pkg/controller/postgresql.go
 create mode 100644 pkg/controller/util.go
 create mode 100644 pkg/spec/types.go
 create mode 100644 pkg/util/resources/factory.go

diff --git a/cmd/main.go b/cmd/main.go
index 67e57c9b0..4ebe7c038 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -57,6 +57,7 @@ func ControllerConfig() *controller.Config {
 func main() {
     log.SetOutput(os.Stdout)
     log.Printf("Spilo operator %s\n", version)
+    log.Printf("MY_POD_NAMESPACE=%s\n", podNamespace)
 
     sigs := make(chan os.Signal, 1)
     stop := make(chan struct{})
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 552d4f621..66abb6c30 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,7 +1,7 @@
 FROM alpine
 MAINTAINER Team ACID @ Zalando
 
-# We need root certificates for dealing with teams api over https
+# We need root certificates to deal with teams api over https
 RUN apk --no-cache add ca-certificates
 
 COPY build/* /
diff --git a/glide.lock b/glide.lock
index 59b0fac2d..6ec597be6 100644
--- a/glide.lock
+++ b/glide.lock
@@ -1,5 +1,5 @@
 hash: 8ffa20f78f1084d76efd6765df697e41edbe154a139a22292563cfc0eb50cd51
-updated: 2017-02-16T18:54:58.087728447+01:00
+updated: 2017-02-27T16:30:41.210217992+01:00
 imports:
 - name: cloud.google.com/go
   version: 3b1ae45394a234c385be014e9a488f2bb6eef821
@@ -9,7 +9,7 @@ imports:
 - name: github.com/blang/semver
   version: 31b736133b98f26d5e078ec9eb591666edfd091f
 - name: github.com/coreos/etcd
-  version: 8ba2897a21e4fc51b298ca553d251318425f93ae
+  version: 714e7ec8db7f8398880197be10771fe89c480ee5
   subpackages:
   - client
   - pkg/pathutil
@@ -92,7 +92,7 @@ imports:
 - name: github.com/PuerkitoBio/urlesc
   version: 5bd2802263f21d8788851d5305584c82a5c75d7e
 - name: github.com/Sirupsen/logrus
-  version: c078b1e43f58d563c74cebe63c85789e76ddb627
+  version: 0208149b40d863d2c1a2f8fe5753096a9cf2cc8b
 - name: github.com/spf13/pflag
   version: 08b1a584251b5b62f458943640fc8ebd4d50aaa5
 - name: github.com/ugorji/go
diff --git a/manifests/platform-credentials.yaml b/manifests/platform-credentials.yaml
index 44ecf7f24..0a320b838 100644
--- a/manifests/platform-credentials.yaml
+++ b/manifests/platform-credentials.yaml
@@ -2,6 +2,7 @@ apiVersion: "zalando.org/v1"
 kind: PlatformCredentialsSet
 metadata:
   name: postgresql-operator
+  namespace: acid
 spec:
   application: postgresql-operator
   tokens:
diff --git a/manifests/postgres-operator.yaml b/manifests/postgres-operator.yaml
index 397d88010..59e86f31a 100644
--- a/manifests/postgres-operator.yaml
+++ b/manifests/postgres-operator.yaml
@@ -13,4 +13,13 @@ spec:
       serviceAccountName: operator
       containers:
       - name: postgres-operator
-        image: postgres-operator:0.1
+        image: pierone.example.com/acid/postgres-operator:0.1
+        env:
+        - name: MY_POD_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        - name: MY_POD_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
diff --git a/manifests/testpostgresql.yaml b/manifests/testpostgresql.yaml
index 190e17dfe..319ecdbea 100644
--- a/manifests/testpostgresql.yaml
+++ b/manifests/testpostgresql.yaml
@@ -21,7 +21,6 @@ spec:
     - 127.0.0.1/32
 
 #Expert section
-  pamUsersSecret: human-users
   postgresql:
     version: "9.6"
     parameters:
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index ef68403c3..beb6ae7da 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -3,32 +3,32 @@ package cluster
 // Postgres ThirdPartyResource object i.e. Spilo
 
 import (
-    "context"
     "database/sql"
+    "encoding/json"
     "fmt"
+    "reflect"
     "regexp"
-    "strings"
     "sync"
 
     "github.com/Sirupsen/logrus"
     etcdclient "github.com/coreos/etcd/client"
     "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/pkg/api"
     "k8s.io/client-go/pkg/api/v1"
-    "k8s.io/client-go/pkg/labels"
+    "k8s.io/client-go/pkg/apis/apps/v1beta1"
+    "k8s.io/client-go/pkg/types"
     "k8s.io/client-go/rest"
 
     "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
     "github.bus.zalan.do/acid/postgres-operator/pkg/util"
     "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
-    "github.bus.zalan.do/acid/postgres-operator/pkg/util/retryutil"
+    "github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
+    "github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
     "github.bus.zalan.do/acid/postgres-operator/pkg/util/teams"
 )
 
 var (
-    superuserName       = "postgres"
-    replicationUsername = "replication"
-
-    alphaNumericRegexp = regexp.MustCompile("^[a-zA-Z0-9]*$")
+    alphaNumericRegexp = regexp.MustCompile("^[a-zA-Z][a-zA-Z0-9]*$")
 )
 
 //TODO: remove struct duplication
@@ -40,351 +40,279 @@ type Config struct {
     TeamsAPIClient *teams.TeamsAPI
 }
 
-type pgUser struct {
-    name     string
-    password string
-    flags    []string
+type KubeResources struct {
+    Services     map[types.UID]*v1.Service
+    Endpoints    map[types.UID]*v1.Endpoints
+    Secrets      map[types.UID]*v1.Secret
+    Statefulsets map[types.UID]*v1beta1.StatefulSet
+    //Pods are treated separately
 }
 
 type Cluster struct {
-    config      Config
-    logger      *logrus.Entry
-    etcdHost    string
-    dockerImage string
-    cluster     *spec.Postgresql
-    pgUsers     map[string]pgUser
-
-    pgDb *sql.DB
-    mu   sync.Mutex
+    KubeResources
+    spec.Postgresql
+    config         Config
+    logger         *logrus.Entry
+    etcdHost       string
+    dockerImage    string
+    pgUsers        map[string]spec.PgUser
+    podEvents      chan spec.PodEvent
+    podSubscribers map[spec.PodName]chan spec.PodEvent
+    pgDb           *sql.DB
+    mu             sync.Mutex
 }
 
-func New(cfg Config, spec *spec.Postgresql) *Cluster {
-    lg := logrus.WithField("pkg", "cluster").WithField("cluster-name", spec.Metadata.Name)
+func New(cfg Config, pgSpec spec.Postgresql) *Cluster {
+    lg := logrus.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name)
+    kubeResources := KubeResources{
+        Services:     make(map[types.UID]*v1.Service),
+        Endpoints:    make(map[types.UID]*v1.Endpoints),
+        Secrets:      make(map[types.UID]*v1.Secret),
+        Statefulsets: make(map[types.UID]*v1beta1.StatefulSet),
+    }
     cluster := &Cluster{
-        config:      cfg,
-        cluster:     spec,
-        logger:      lg,
-        etcdHost:    constants.EtcdHost,
-        dockerImage: constants.SpiloImage,
+        config:         cfg,
+        Postgresql:     pgSpec,
+        logger:         lg,
+        etcdHost:       constants.EtcdHost,
+        dockerImage:    constants.SpiloImage,
+        pgUsers:        make(map[string]spec.PgUser),
+        podEvents:      make(chan spec.PodEvent),
+        podSubscribers: make(map[spec.PodName]chan spec.PodEvent),
+        KubeResources:  kubeResources,
     }
-    cluster.init()
 
     return cluster
 }
 
-func (c *Cluster) getReadonlyToken() (string, error) {
-    // for some reason PlatformCredentialsSet creates secrets only in the default namespace
-    credentialsSecret, err := c.config.KubeClient.Secrets(v1.NamespaceDefault).Get("postgresql-operator")
+func (c *Cluster) ClusterName() spec.ClusterName {
+    return spec.ClusterName{
+        Name:      c.Metadata.Name,
+        Namespace: c.Metadata.Namespace,
+    }
+}
+
+func (c *Cluster) Run(stopCh <-chan struct{}) {
+    go c.podEventsDispatcher(stopCh)
+
+    <-stopCh
+}
+
+func (c *Cluster) NeedsRollingUpdate(otherSpec *spec.Postgresql) bool {
+    //TODO: add more checks
+    if c.Spec.Version != otherSpec.Spec.Version {
+        return true
+    }
+
+    if !reflect.DeepEqual(c.Spec.Resources, otherSpec.Spec.Resources) {
+        return true
+    }
+
+    return false
+}
+
+func (c *Cluster) MustSetStatus(status spec.PostgresStatus) {
+    b, err := json.Marshal(status)
+    if err != nil {
+        c.logger.Fatalf("Can't marshal status: %s", err)
+    }
+    request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods
+
+    _, err = c.config.RestClient.Patch(api.MergePatchType).
+        RequestURI(c.Metadata.GetSelfLink()).
+        Body(request).
+        DoRaw()
+
+    if k8sutil.ResourceNotFound(err) {
+        c.logger.Warningf("Can't set status for a non-existing cluster")
+        return
+    }
     if err != nil {
-        return "", fmt.Errorf("Can't get credentials secret: %s", err)
+        c.logger.Fatalf("Can't set status for cluster '%s': %s", c.ClusterName(), err)
     }
-    data := credentialsSecret.Data
-
-    if string(data["read-only-token-type"]) != "Bearer" {
-        return "", fmt.Errorf("Wrong token type: %s", data["read-only-token-type"])
-    }
-
-    return string(data["read-only-token-secret"]), nil
-
-}
-
-func (c *Cluster) getTeamMembers() ([]string, error) {
-    token, err := c.getReadonlyToken()
-    if err != nil {
-        return nil, fmt.Errorf("Can't get oauth token: %s", err)
-    }
-
-    c.config.TeamsAPIClient.OauthToken = token
-    teamInfo, err := c.config.TeamsAPIClient.TeamInfo((*c.cluster.Spec).TeamId)
-    if err != nil {
-        return nil, fmt.Errorf("Can't get team info: %s", err)
-    }
-
-    return teamInfo.Members, nil
-}
-
-func (c *Cluster) labelsSet() labels.Set {
-    return labels.Set{
-        "application":   "spilo",
-        "spilo-cluster": (*c.cluster).Metadata.Name,
-    }
-}
-
-func (c *Cluster) credentialSecretName(username string) string {
-    return fmt.Sprintf(
-        "%s.%s.credentials.%s.%s",
-        username,
-        (*c.cluster).Metadata.Name,
-        constants.TPRName,
-        constants.TPRVendor)
-}
-
-func isValidUsername(username string) bool {
-    return alphaNumericRegexp.MatchString(username)
-}
-
-func normalizeUserFlags(userFlags []string) (flags []string, err error) {
-    uniqueFlags := make(map[string]bool)
-
-    for _, flag := range userFlags {
-        if !alphaNumericRegexp.MatchString(flag) {
-            err = fmt.Errorf("User flag '%s' is not alphanumeric", flag)
-            return
-        } else {
-            flag = strings.ToUpper(flag)
-            if _, ok := uniqueFlags[flag]; !ok {
-                uniqueFlags[flag] = true
-            }
-        }
-    }
-
-    flags = []string{}
-    for k := range uniqueFlags {
-        flags = append(flags, k)
-    }
-
-    return
-}
-
-func (c *Cluster) init() {
-    users := (*c.cluster.Spec).Users
-    c.pgUsers = make(map[string]pgUser, len(users)+2) // + [superuser and replication]
-
-    c.pgUsers[superuserName] = pgUser{
-        name:     superuserName,
-        password: util.RandomPassword(constants.PasswordLength),
-    }
-
-    c.pgUsers[replicationUsername] = pgUser{
-        name:     replicationUsername,
-        password: util.RandomPassword(constants.PasswordLength),
-    }
-
-    for username, userFlags := range users {
-        if !isValidUsername(username) {
-            c.logger.Warningf("Invalid username: '%s'", username)
-            continue
-        }
-
-        flags, err := normalizeUserFlags(userFlags)
-        if err != nil {
-            c.logger.Warningf("Invalid flags for user '%s': %s", username, err)
-        }
-
-        c.pgUsers[username] = pgUser{
-            name:     username,
-            password: util.RandomPassword(constants.PasswordLength),
-            flags:    flags,
-        }
-    }
-
-    teamMembers, err := c.getTeamMembers()
-    if err != nil {
-        c.logger.Errorf("Can't get list of team members: %s", err)
-    } else {
-        for _, username := range teamMembers {
-            c.pgUsers[username] = pgUser{name: username}
-        }
-    }
-}
-
-func (c *Cluster) waitPodDelete() error {
-    ls := c.labelsSet()
-
-    listOptions := v1.ListOptions{
-        LabelSelector: ls.String(),
-    }
-    return retryutil.Retry(
-        constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
-        func() (bool, error) {
-            pods, err := c.config.KubeClient.Pods((*c.cluster).Metadata.Namespace).List(listOptions)
-            if err != nil {
-                return false, err
-            }
-
-            return len(pods.Items) == 0, nil
-        })
-}
-
-func (c *Cluster) waitStatefulsetReady() error {
-    return retryutil.Retry(constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
-        func() (bool, error) {
-            listOptions := v1.ListOptions{
-                LabelSelector: c.labelsSet().String(),
-            }
-            ss, err := c.config.KubeClient.StatefulSets((*c.cluster).Metadata.Namespace).List(listOptions)
-            if err != nil {
-                return false, err
-            }
-
-            if len(ss.Items) != 1 {
-                return false, fmt.Errorf("StatefulSet is not found")
-            }
-
-            return *ss.Items[0].Spec.Replicas == ss.Items[0].Status.Replicas, nil
-        })
-}
-
-func (c *Cluster) waitPodLabelsReady() error {
-    ls := c.labelsSet()
-    namespace := (*c.cluster).Metadata.Namespace
-
-    listOptions := v1.ListOptions{
-        LabelSelector: ls.String(),
-    }
-    masterListOption := v1.ListOptions{
-        LabelSelector: labels.Merge(ls, labels.Set{"spilo-role": "master"}).String(),
-    }
-    replicaListOption := v1.ListOptions{
-        LabelSelector: labels.Merge(ls, labels.Set{"spilo-role": "replica"}).String(),
-    }
-    pods, err := c.config.KubeClient.Pods(namespace).List(listOptions)
-    if err != nil {
-        return err
-    }
-    podsNumber := len(pods.Items)
-
-    return retryutil.Retry(
-        constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
-        func() (bool, error) {
-            masterPods, err := c.config.KubeClient.Pods(namespace).List(masterListOption)
-            if err != nil {
-                return false, err
-            }
-            replicaPods, err := c.config.KubeClient.Pods(namespace).List(replicaListOption)
-            if err != nil {
-                return false, err
-            }
-            if len(masterPods.Items) > 1 {
-                return false, fmt.Errorf("Too many masters")
-            }
-
-            return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil
-        })
 }
 
 func (c *Cluster) Create() error {
-    c.createEndpoint()
-    c.createService()
-    c.applySecrets()
-    c.createStatefulSet()
+    //TODO: service will create endpoint implicitly
+    ep, err := c.createEndpoint()
+    if err != nil {
+        return fmt.Errorf("Can't create endpoint: %s", err)
+    }
+    c.logger.Infof("Endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta))
+
+    service, err := c.createService()
+    if err != nil {
+        return fmt.Errorf("Can't create service: %s", err)
+    } else {
+        c.logger.Infof("Service '%s' has been successfully created", util.NameFromMeta(service.ObjectMeta))
+    }
+
+    c.initSystemUsers()
+    err = c.initRobotUsers()
+    if err != nil {
+        return fmt.Errorf("Can't init robot users: %s", err)
+    }
+
+    err = c.initHumanUsers()
+    if err != nil {
+        return fmt.Errorf("Can't init human users: %s", err)
+    }
+
+    err = c.applySecrets()
+    if err != nil {
+        return fmt.Errorf("Can't create secrets: %s", err)
+    } else {
+        c.logger.Infof("Secrets have been successfully created")
+    }
+
+    ss, err := c.createStatefulSet()
+    if err != nil {
+        return fmt.Errorf("Can't create StatefulSet: %s", err)
+    } else {
+        c.logger.Infof("StatefulSet '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta))
+    }
 
     c.logger.Info("Waiting for cluster being ready")
-    err := c.waitClusterReady()
+    err = c.waitClusterReady()
     if err != nil {
         c.logger.Errorf("Failed to create cluster: %s", err)
         return err
     }
-    c.logger.Info("Cluster is ready")
 
     err = c.initDbConn()
     if err != nil {
-        return fmt.Errorf("Failed to init db connection: %s", err)
+        return fmt.Errorf("Can't init db connection: %s", err)
     }
 
-    c.createUsers()
+    err = c.createUsers()
+    if err != nil {
+        return fmt.Errorf("Can't create users: %s", err)
+    } else {
+        c.logger.Infof("Users have been successfully created")
+    }
+
+    c.ListResources()
 
     return nil
 }
 
-func (c *Cluster) waitClusterReady() error {
-    // TODO: wait for the first Pod only
-    err := c.waitStatefulsetReady()
+func (c *Cluster) Update(newSpec *spec.Postgresql, rollingUpdate bool) error {
+    nSpec := newSpec.Spec
+    clusterName := c.ClusterName()
+    resourceList := resources.ResourceList(nSpec.Resources)
+    template := resources.PodTemplate(clusterName, resourceList, c.dockerImage, nSpec.Version, c.etcdHost)
+    statefulSet := resources.StatefulSet(clusterName, template, nSpec.NumberOfInstances)
+
+    //TODO: mind the case of updating allowedSourceRanges
+    err := c.updateStatefulSet(statefulSet)
     if err != nil {
-        return fmt.Errorf("Statuful set error: %s", err)
+        return fmt.Errorf("Can't update cluster: %s", err)
     }
 
-    // TODO: wait only for master
-    err = c.waitPodLabelsReady()
-    if err != nil {
-        return fmt.Errorf("Pod labels error: %s", err)
+    if rollingUpdate {
+        err = c.recreatePods()
+        // TODO: wait for actual streaming to the replica
+        if err != nil {
+            return fmt.Errorf("Can't recreate pods: %s", err)
+        }
     }
 
     return nil
 }
 
 func (c *Cluster) Delete() error {
-    clusterName := (*c.cluster).Metadata.Name
-    namespace := (*c.cluster).Metadata.Namespace
-    orphanDependents := false
-    deleteOptions := &v1.DeleteOptions{
-        OrphanDependents: &orphanDependents,
-    }
-
-    listOptions := v1.ListOptions{
-        LabelSelector: c.labelsSet().String(),
-    }
-
-    kubeClient := c.config.KubeClient
-
-    podList, err := kubeClient.Pods(namespace).List(listOptions)
-    if err != nil {
-        return fmt.Errorf("Can't get list of pods: %s", err)
-    }
-
-    err = kubeClient.StatefulSets(namespace).Delete(clusterName, deleteOptions)
-    if err != nil {
-        return fmt.Errorf("Can't delete statefulset: %s", err)
-    }
-    c.logger.Infof("Statefulset '%s' has been deleted", util.FullObjectName(namespace, clusterName))
-
-    for _, pod := range podList.Items {
-        err = kubeClient.Pods(namespace).Delete(pod.Name, deleteOptions)
+    for _, obj := range c.Statefulsets {
+        err := c.deleteStatefulSet(obj)
         if err != nil {
-            return fmt.Errorf("Error while deleting pod '%s': %s", util.FullObjectName(pod.Namespace, pod.Name), err)
+            c.logger.Errorf("Can't delete StatefulSet: %s", err)
+        } else {
+            c.logger.Infof("StatefulSet '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
         }
-
-        c.logger.Infof("Pod '%s' has been deleted", util.FullObjectName(pod.Namespace, pod.Name))
     }
 
-    serviceList, err := kubeClient.Services(namespace).List(listOptions)
-    if err != nil {
-        return fmt.Errorf("Can't get list of the services: %s", err)
-    }
-
-    for _, service := range serviceList.Items {
-        err = kubeClient.Services(namespace).Delete(service.Name, deleteOptions)
+    for _, obj := range c.Secrets {
+        err := c.deleteSecret(obj)
         if err != nil {
-            return fmt.Errorf("Can't delete service '%s': %s", util.FullObjectName(service.Namespace, service.Name), err)
+            c.logger.Errorf("Can't delete secret: %s", err)
+        } else {
+            c.logger.Infof("Secret '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
         }
-
-        c.logger.Infof("Service '%s' has been deleted", util.FullObjectName(service.Namespace, service.Name))
     }
 
-    secretsList, err := kubeClient.Secrets(namespace).List(listOptions)
-    if err != nil {
-        return err
-    }
-    for _, secret := range secretsList.Items {
-        err = kubeClient.Secrets(namespace).Delete(secret.Name, deleteOptions)
+    for _, obj := range c.Endpoints {
+        err := c.deleteEndpoint(obj)
         if err != nil {
-            return fmt.Errorf("Can't delete secret '%s': %s", util.FullObjectName(secret.Namespace, secret.Name), err)
+            c.logger.Errorf("Can't delete endpoint: %s", err)
+        } else {
+            c.logger.Infof("Endpoint '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
         }
-
-        c.logger.Infof("Secret '%s' has been deleted", util.FullObjectName(secret.Namespace, secret.Name))
     }
 
-    c.waitPodDelete()
-
-    etcdKey := fmt.Sprintf("/service/%s", clusterName)
-
-    resp, err := c.config.EtcdClient.Delete(context.Background(),
-        etcdKey,
-        &etcdclient.DeleteOptions{Recursive: true})
+    for _, obj := range c.Services {
+        err := c.deleteService(obj)
+        if err != nil {
+            c.logger.Errorf("Can't delete service: %s", err)
+        } else {
+            c.logger.Infof("Service '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
+        }
+    }
 
+    err := c.deletePods()
     if err != nil {
-        return fmt.Errorf("Can't delete etcd key: %s", err)
+        return fmt.Errorf("Can't delete pods: %s", err)
+    }
+
+    return nil
+}
+
+func (c *Cluster) ReceivePodEvent(event spec.PodEvent) {
+    c.podEvents <- event
+}
+
+func (c *Cluster) initSystemUsers() {
+    c.pgUsers[constants.SuperuserName] = spec.PgUser{
+        Name:     constants.SuperuserName,
+        Password: util.RandomPassword(constants.PasswordLength),
+    }
+
+    c.pgUsers[constants.ReplicationUsername] = spec.PgUser{
+        Name:     constants.ReplicationUsername,
+        Password: util.RandomPassword(constants.PasswordLength),
+    }
+}
+
+func (c *Cluster) initRobotUsers() error {
+    for username, userFlags := range c.Spec.Users {
+        if !isValidUsername(username) {
+            return fmt.Errorf("Invalid username: '%s'", username)
+        }
+
+        flags, err := normalizeUserFlags(userFlags)
+        if err != nil {
+            return fmt.Errorf("Invalid flags for user '%s': %s", username, err)
+        }
+
+        c.pgUsers[username] = spec.PgUser{
+            Name:     username,
+            Password: util.RandomPassword(constants.PasswordLength),
+            Flags:    flags,
+        }
+    }
+
+    return nil
+}
+
+func (c *Cluster) initHumanUsers() error {
+    teamMembers, err := c.getTeamMembers()
+    if err != nil {
+        return fmt.Errorf("Can't get list of team members: %s", err)
+    } else {
+        for _, username := range teamMembers {
+            c.pgUsers[username] = spec.PgUser{Name: username}
+        }
     }
 
-    if resp == nil {
-        c.logger.Warningf("No response from etcd cluster")
-    }
-
-    c.logger.Infof("Etcd key '%s' has been deleted", etcdKey)
-
-    //TODO: Ensure objects are deleted
-
     return nil
 }
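NeedsRollingUpdate above only inspects the version and resource fields, per its own TODO. A usage sketch of the decision it feeds, with the types elided down to the fields that matter (the real structs carry many more):

package main

import (
	"fmt"
	"reflect"
)

type resources struct{ CPU, Memory string }
type pgSpec struct {
	Version   string
	Resources resources
}

// needsRollingUpdate mirrors Cluster.NeedsRollingUpdate: pods must be
// recreated when the Postgres version or the resource requests change.
func needsRollingUpdate(cur, next pgSpec) bool {
	return cur.Version != next.Version || !reflect.DeepEqual(cur.Resources, next.Resources)
}

func main() {
	cur := pgSpec{Version: "9.5", Resources: resources{CPU: "1"}}
	next := pgSpec{Version: "9.6", Resources: resources{CPU: "1"}}
	fmt.Println(needsRollingUpdate(cur, next)) // true: version bump forces pod recreation
}
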
diff --git a/pkg/cluster/pg.go b/pkg/cluster/pg.go
index 408aa4953..6791f03b9 100644
--- a/pkg/cluster/pg.go
+++ b/pkg/cluster/pg.go
@@ -3,18 +3,23 @@ package cluster
 import (
     "database/sql"
     "fmt"
+    "strings"
 
     _ "github.com/lib/pq"
-    "strings"
+
+    "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
+    "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
 )
 
+var createUserSQL = `SET LOCAL synchronous_commit = 'local'; CREATE ROLE "%s" %s PASSWORD %s;`
+
 func (c *Cluster) pgConnectionString() string {
-    hostname := fmt.Sprintf("%s.%s.svc.cluster.local", (*c.cluster).Metadata.Name, (*c.cluster).Metadata.Namespace)
-    password := c.pgUsers[superuserName].password
+    hostname := fmt.Sprintf("%s.%s.svc.cluster.local", c.Metadata.Name, c.Metadata.Namespace)
+    password := c.pgUsers[constants.SuperuserName].Password
 
     return fmt.Sprintf("host='%s' dbname=postgres sslmode=require user='%s' password='%s'",
         hostname,
-        superuserName,
+        constants.SuperuserName,
         strings.Replace(password, "$", "\\$", -1))
 }
 
@@ -39,3 +44,40 @@ func (c *Cluster) initDbConn() error {
 
     return nil
 }
+
+func (c *Cluster) createPgUser(user spec.PgUser) (isHuman bool, err error) {
+    flags := user.Flags
+
+    if user.Password == "" {
+        isHuman = true
+        flags = append(flags, fmt.Sprintf("IN ROLE \"%s\"", constants.PamRoleName))
+    } else {
+        isHuman = false
+    }
+
+    addLoginFlag := true
+    for _, v := range flags {
+        if v == "NOLOGIN" {
+            addLoginFlag = false
+            break
+        }
+    }
+    if addLoginFlag {
+        flags = append(flags, "LOGIN")
+    }
+
+    userFlags := strings.Join(flags, " ")
+    userPassword := fmt.Sprintf("'%s'", user.Password)
+    if user.Password == "" {
+        userPassword = "NULL"
+    }
+    query := fmt.Sprintf(createUserSQL, user.Name, userFlags, userPassword)
+
+    _, err = c.pgDb.Exec(query) // TODO: Try several times
+    if err != nil {
+        err = fmt.Errorf("DB error: %s", err)
+        return
+    }
+
+    return
+}
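For reference, the createUserSQL template above renders into one self-contained statement per user. A sketch of the two shapes createPgUser produces; the role names, flags, password, and the PAM role value are illustrative, not taken from the patch:

package main

import "fmt"

var createUserSQL = `SET LOCAL synchronous_commit = 'local'; CREATE ROLE "%s" %s PASSWORD %s;`

func main() {
	// Robot user: has a generated password, keeps its normalized flags plus LOGIN.
	fmt.Printf(createUserSQL+"\n", "app_user", "CREATEDB LOGIN", "'s3cr3t'")
	// Human user: no stored password; PAM authenticates, via membership in the PAM role.
	fmt.Printf(createUserSQL+"\n", "jdoe", `IN ROLE "zalandos" LOGIN`, "NULL")
}
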
diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
new file mode 100644
index 000000000..f39e3bc13
--- /dev/null
+++ b/pkg/cluster/pod.go
@@ -0,0 +1,169 @@
+package cluster
+
+import (
+    "fmt"
+
+    "k8s.io/client-go/pkg/api/v1"
+
+    "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
+    "github.bus.zalan.do/acid/postgres-operator/pkg/util"
+)
+
+func (c *Cluster) clusterPods() ([]v1.Pod, error) {
+    ns := c.Metadata.Namespace
+    listOptions := v1.ListOptions{
+        LabelSelector: c.labelsSet().String(),
+    }
+
+    pods, err := c.config.KubeClient.Pods(ns).List(listOptions)
+    if err != nil {
+        return nil, fmt.Errorf("Can't get list of pods: %s", err)
+    }
+
+    return pods.Items, nil
+}
+
+func (c *Cluster) deletePods() error {
+    pods, err := c.clusterPods()
+    if err != nil {
+        return err
+    }
+
+    for _, obj := range pods {
+        err := c.deletePod(&obj)
+        if err != nil {
+            c.logger.Errorf("Can't delete pod: %s", err)
+        } else {
+            c.logger.Infof("Pod '%s' has been deleted", util.NameFromMeta(obj.ObjectMeta))
+        }
+    }
+
+    return nil
+}
+
+func (c *Cluster) deletePod(pod *v1.Pod) error {
+    podName := spec.PodName{
+        Namespace: pod.Namespace,
+        Name:      pod.Name,
+    }
+
+    ch := make(chan spec.PodEvent)
+    if _, ok := c.podSubscribers[podName]; ok {
+        panic("Pod '" + podName.String() + "' is already subscribed")
+    }
+    c.podSubscribers[podName] = ch
+    defer func() {
+        close(ch)
+        delete(c.podSubscribers, podName)
+    }()
+
+    err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
+    if err != nil {
+        return err
+    }
+
+    err = c.waitForPodDeletion(ch)
+    if err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func (c *Cluster) recreatePod(pod v1.Pod, spiloRole string) error {
+    podName := spec.PodName{
+        Namespace: pod.Namespace,
+        Name:      pod.Name,
+    }
+
+    orphanDependents := false
+    deleteOptions := &v1.DeleteOptions{
+        OrphanDependents: &orphanDependents,
+    }
+
+    ch := make(chan spec.PodEvent)
+    if _, ok := c.podSubscribers[podName]; ok {
+        panic("Pod '" + podName.String() + "' is already subscribed")
+    }
+    c.podSubscribers[podName] = ch
+    defer func() {
+        close(ch)
+        delete(c.podSubscribers, podName)
+    }()
+
+    err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
+    if err != nil {
+        return fmt.Errorf("Can't delete pod: %s", err)
+    }
+    err = c.waitForPodDeletion(ch)
+    if err != nil {
+        return err
+    }
+    err = c.waitForPodLabel(ch, spiloRole)
+    if err != nil {
+        return err
+    }
+    c.logger.Infof("Pod '%s' is ready", podName)
+
+    return nil
+}
+
+func (c *Cluster) podEventsDispatcher(stopCh <-chan struct{}) {
+    c.logger.Infof("Watching '%s' cluster", c.ClusterName())
+    for {
+        select {
+        case event := <-c.podEvents:
+            if subscriber, ok := c.podSubscribers[event.PodName]; ok {
+                c.logger.Debugf("New event for '%s' pod", event.PodName)
+                go func() { subscriber <- event }() //TODO: is it a right way to do nonblocking send to the channel?
+            } else {
+                c.logger.Debugf("Skipping event for an unwatched pod '%s'", event.PodName)
+            }
+        case <-stopCh:
+            return
+        }
+    }
+}
+
+func (c *Cluster) recreatePods() error {
+    ls := c.labelsSet()
+    namespace := c.Metadata.Namespace
+
+    listOptions := v1.ListOptions{
+        LabelSelector: ls.String(),
+    }
+    pods, err := c.config.KubeClient.Pods(namespace).List(listOptions)
+    if err != nil {
+        return fmt.Errorf("Can't get list of pods: %s", err)
+    } else {
+        c.logger.Infof("There are %d pods in the cluster to recreate", len(pods.Items))
+    }
+
+    var masterPod v1.Pod
+    for _, pod := range pods.Items {
+        role, ok := pod.Labels["spilo-role"]
+        if !ok {
+            continue
+        }
+
+        if role == "master" {
+            masterPod = pod
+            continue
+        }
+
+        err = c.recreatePod(pod, "replica")
+        if err != nil {
+            return fmt.Errorf("Can't recreate replica pod '%s': %s", util.NameFromMeta(pod.ObjectMeta), err)
+        }
+    }
+
+    //TODO: do manual failover
+    //TODO: specify master, leave new master empty
+    c.logger.Infof("Recreating master pod '%s'", util.NameFromMeta(masterPod.ObjectMeta))
+    err = c.recreatePod(masterPod, "replica")
+    if err != nil {
+        return fmt.Errorf("Can't recreate master pod '%s': %s", util.NameFromMeta(masterPod.ObjectMeta), err)
+    }
+
+    return nil
+}
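deletePod and recreatePod above share one subscribe-then-wait protocol: register a channel under the pod's name before triggering the deletion, then block until the dispatcher forwards the matching event. A condensed, runnable sketch of that protocol with hypothetical stand-ins for the spec types:

package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-ins for spec.PodName / spec.PodEvent.
type podName string
type podEvent struct{ name podName }

func main() {
	subscribers := map[podName]chan podEvent{}
	events := make(chan podEvent)

	// Subscriber: register before the deletion is issued, so no event is missed.
	name := podName("acid/testcluster-0")
	ch := make(chan podEvent)
	subscribers[name] = ch

	// Dispatcher: forward events only to registered subscribers.
	go func() {
		for e := range events {
			if sub, ok := subscribers[e.name]; ok {
				go func(e podEvent) { sub <- e }(e) // non-blocking send, as in podEventsDispatcher
			}
		}
	}()

	events <- podEvent{name: name}
	select {
	case e := <-ch:
		fmt.Println("got event for", e.name)
	case <-time.After(time.Second):
		fmt.Println("timeout") // mirrors waitForPodDeletion's timeout branch
	}
}
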
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index 09905c40c..8e24a1fe8 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -2,330 +2,258 @@ package cluster
 
 import (
     "fmt"
-    "strings"
 
-    "k8s.io/client-go/pkg/api/resource"
     "k8s.io/client-go/pkg/api/v1"
     "k8s.io/client-go/pkg/apis/apps/v1beta1"
-    "k8s.io/client-go/pkg/util/intstr"
 
     "github.bus.zalan.do/acid/postgres-operator/pkg/util"
     "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
     "github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
+    "github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
 )
 
-var createUserSQL = `DO $$
-BEGIN
-    SET LOCAL synchronous_commit = 'local';
-    CREATE ROLE "%s" %s PASSWORD %s;
-END;
-$$`
-
-func (c *Cluster) createStatefulSet() {
-    meta := (*c.cluster).Metadata
-
-    envVars := []v1.EnvVar{
-        {
-            Name:  "SCOPE",
-            Value: meta.Name,
-        },
-        {
-            Name:  "PGROOT",
-            Value: "/home/postgres/pgdata/pgroot",
-        },
-        {
-            Name:  "ETCD_HOST",
-            Value: c.etcdHost,
-        },
-        {
-            Name: "POD_IP",
-            ValueFrom: &v1.EnvVarSource{
-                FieldRef: &v1.ObjectFieldSelector{
-                    APIVersion: "v1",
-                    FieldPath:  "status.podIP",
-                },
-            },
-        },
-        {
-            Name: "POD_NAMESPACE",
-            ValueFrom: &v1.EnvVarSource{
-                FieldRef: &v1.ObjectFieldSelector{
-                    APIVersion: "v1",
-                    FieldPath:  "metadata.namespace",
-                },
-            },
-        },
-        {
-            Name: "PGPASSWORD_SUPERUSER",
-            ValueFrom: &v1.EnvVarSource{
-                SecretKeyRef: &v1.SecretKeySelector{
-                    LocalObjectReference: v1.LocalObjectReference{
-                        Name: c.credentialSecretName(superuserName),
-                    },
-                    Key: "password",
-                },
-            },
-        },
-        {
-            Name: "PGPASSWORD_STANDBY",
-            ValueFrom: &v1.EnvVarSource{
-                SecretKeyRef: &v1.SecretKeySelector{
-                    LocalObjectReference: v1.LocalObjectReference{
-                        Name: c.credentialSecretName(replicationUsername),
-                    },
-                    Key: "password",
-                },
-            },
-        },
-        {
-            Name:  "PAM_OAUTH2",              //TODO: get from the operator tpr spec
-            Value: constants.PamConfiguration, //space before uid is obligatory
-        },
-        {
-            Name: "SPILO_CONFIGURATION", //TODO: get from the operator tpr spec
-            Value: fmt.Sprintf(`
-postgresql:
-  bin_dir: /usr/lib/postgresql/%s/bin
-bootstrap:
-  initdb:
-  - auth-host: md5
-  - auth-local: trust
-  users:
-    %s:
-      password: NULL
-      options:
-      - createdb
-      - nologin
-  pg_hba:
-  - hostnossl all all all reject
-  - hostssl   all +%s all pam
-  - hostssl   all all all md5`, (*c.cluster.Spec).Version, constants.PamRoleName, constants.PamRoleName),
-        },
-    }
-
-    resourceList := v1.ResourceList{}
-
-    if cpu := (*c.cluster).Spec.Resources.Cpu; cpu != "" {
-        resourceList[v1.ResourceCPU] = resource.MustParse(cpu)
-    }
-
-    if memory := (*c.cluster).Spec.Resources.Memory; memory != "" {
-        resourceList[v1.ResourceMemory] = resource.MustParse(memory)
-    }
-
-    container := v1.Container{
-        Name:            meta.Name,
-        Image:           c.dockerImage,
-        ImagePullPolicy: v1.PullAlways,
-        Resources: v1.ResourceRequirements{
-            Requests: resourceList,
-        },
-        Ports: []v1.ContainerPort{
-            {
-                ContainerPort: 8008,
-                Protocol:      v1.ProtocolTCP,
-            },
-            {
-                ContainerPort: 5432,
-                Protocol:      v1.ProtocolTCP,
-            },
-        },
-        VolumeMounts: []v1.VolumeMount{
-            {
-                Name:      "pgdata",
-                MountPath: "/home/postgres/pgdata", //TODO: fetch from manifesto
-            },
-        },
-        Env: envVars,
-    }
-
-    terminateGracePeriodSeconds := int64(30)
-
-    podSpec := v1.PodSpec{
-        TerminationGracePeriodSeconds: &terminateGracePeriodSeconds,
-        Volumes: []v1.Volume{
-            {
-                Name:         "pgdata",
-                VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}},
-            },
-        },
-        Containers: []v1.Container{container},
-    }
-
-    template := v1.PodTemplateSpec{
-        ObjectMeta: v1.ObjectMeta{
-            Labels:      c.labelsSet(),
-            Namespace:   meta.Namespace,
-            Annotations: map[string]string{"pod.alpha.kubernetes.io/initialized": "true"},
-        },
-        Spec: podSpec,
-    }
-
-    statefulSet := &v1beta1.StatefulSet{
-        ObjectMeta: v1.ObjectMeta{
-            Name:      meta.Name,
-            Namespace: meta.Namespace,
-            Labels:    c.labelsSet(),
-        },
-        Spec: v1beta1.StatefulSetSpec{
-            Replicas:    &c.cluster.Spec.NumberOfInstances,
-            ServiceName: meta.Name,
-            Template:    template,
-        },
-    }
-
-    _, err := c.config.KubeClient.StatefulSets(meta.Namespace).Create(statefulSet)
-    if err != nil {
-        c.logger.Errorf("Can't create statefulset: %s", err)
-    } else {
-        c.logger.Infof("Statefulset has been created: '%s'", util.FullObjectNameFromMeta(statefulSet.ObjectMeta))
-    }
+var orphanDependents = false
+var deleteOptions = &v1.DeleteOptions{
+    OrphanDependents: &orphanDependents,
 }
 
-func (c *Cluster) applySecrets() {
-    var err error
-    namespace := (*c.cluster).Metadata.Namespace
-    for username, pgUser := range c.pgUsers {
-        //Skip users with no password i.e. human users (they'll be authenticated using pam)
-        if pgUser.password == "" {
+func (c *Cluster) LoadResources() error {
+    ns := c.Metadata.Namespace
+    listOptions := v1.ListOptions{
+        LabelSelector: c.labelsSet().String(),
+    }
+
+    services, err := c.config.KubeClient.Services(ns).List(listOptions)
+    if err != nil {
+        return fmt.Errorf("Can't get list of services: %s", err)
+    }
+    for i, service := range services.Items {
+        if _, ok := c.Services[service.UID]; ok {
             continue
         }
-        secret := v1.Secret{
-            ObjectMeta: v1.ObjectMeta{
-                Name:      c.credentialSecretName(username),
-                Namespace: namespace,
-                Labels:    c.labelsSet(),
-            },
-            Type: v1.SecretTypeOpaque,
-            Data: map[string][]byte{
-                "username": []byte(pgUser.name),
-                "password": []byte(pgUser.password),
-            },
-        }
-        _, err = c.config.KubeClient.Secrets(namespace).Create(&secret)
-        if k8sutil.IsKubernetesResourceAlreadyExistError(err) {
-            c.logger.Infof("Skipping update of '%s'", secret.Name)
-
-            curSecrets, err := c.config.KubeClient.Secrets(namespace).Get(c.credentialSecretName(username))
-            if err != nil {
-                c.logger.Errorf("Can't get current secret: %s", err)
-            }
-            user := pgUser
-            user.password = string(curSecrets.Data["password"])
-            c.pgUsers[username] = user
-            c.logger.Infof("Password fetched for user '%s' from the secrets", username)
-
-            continue
-        } else {
-            if err != nil {
-                c.logger.Errorf("Error while creating secret: %s", err)
-            } else {
-                c.logger.Infof("Secret created: '%s'", util.FullObjectNameFromMeta(secret.ObjectMeta))
-            }
-        }
-    }
-}
-
-func (c *Cluster) createService() {
-    meta := (*c.cluster).Metadata
-
-    _, err := c.config.KubeClient.Services(meta.Namespace).Get(meta.Name)
-    if !k8sutil.ResourceNotFound(err) {
-        c.logger.Infof("Service '%s' already exists", meta.Name)
-        return
+        c.Services[service.UID] = &services.Items[i]
     }
 
-    service := v1.Service{
-        ObjectMeta: v1.ObjectMeta{
-            Name:      meta.Name,
-            Namespace: meta.Namespace,
-            Labels:    c.labelsSet(),
-        },
-        Spec: v1.ServiceSpec{
-            Type:  v1.ServiceTypeLoadBalancer,
-            Ports: []v1.ServicePort{{Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
-            LoadBalancerSourceRanges: (*c.cluster).Spec.AllowedSourceRanges,
-        },
-    }
-
-    _, err = c.config.KubeClient.Services(meta.Namespace).Create(&service)
+    endpoints, err := c.config.KubeClient.Endpoints(ns).List(listOptions)
     if err != nil {
-        c.logger.Errorf("Error while creating service: %+v", err)
-    } else {
-        c.logger.Infof("Service created: '%s'", util.FullObjectNameFromMeta(service.ObjectMeta))
+        return fmt.Errorf("Can't get list of endpoints: %s", err)
     }
-}
-
-func (c *Cluster) createEndpoint() {
-    meta := (*c.cluster).Metadata
-
-    _, err := c.config.KubeClient.Endpoints(meta.Namespace).Get(meta.Name)
-    if !k8sutil.ResourceNotFound(err) {
-        c.logger.Infof("Endpoint '%s' already exists", meta.Name)
-        return
-    }
-
-    endpoint := v1.Endpoints{
-        ObjectMeta: v1.ObjectMeta{
-            Name:      meta.Name,
-            Namespace: meta.Namespace,
-            Labels:    c.labelsSet(),
-        },
-    }
-
-    _, err = c.config.KubeClient.Endpoints(meta.Namespace).Create(&endpoint)
-    if err != nil {
-        c.logger.Errorf("Error while creating endpoint: %+v", err)
-    } else {
-        c.logger.Infof("Endpoint created: %s", endpoint.Name)
-    }
-}
-
-func (c *Cluster) createUser(user pgUser) {
-    var userType string
-    var flags []string = user.flags
-
-    if user.password == "" {
-        userType = "human"
-        flags = append(flags, fmt.Sprintf("IN ROLE \"%s\"", constants.PamRoleName))
-    } else {
-        userType = "app"
-    }
-
-    addLoginFlag := true
-    for _, v := range flags {
-        if v == "NOLOGIN" {
-            addLoginFlag = false
-            break
-        }
-    }
-    if addLoginFlag {
-        flags = append(flags, "LOGIN")
-    }
-
-    userFlags := strings.Join(flags, " ")
-    userPassword := fmt.Sprintf("'%s'", user.password)
-    if user.password == "" {
-        userPassword = "NULL"
-    }
-    query := fmt.Sprintf(createUserSQL, user.name, userFlags, userPassword)
-
-    _, err := c.pgDb.Query(query)
-    if err != nil {
-        c.logger.Errorf("Can't create %s user '%s': %s", user.name, err)
-    } else {
-        c.logger.Infof("Created %s user '%s' with %s flags", userType, user.name, flags)
-    }
-}
-
-func (c *Cluster) createUsers() error {
-    for username, user := range c.pgUsers {
-        if username == superuserName || username == replicationUsername {
+    for i, endpoint := range endpoints.Items {
+        if _, ok := c.Endpoints[endpoint.UID]; ok {
             continue
         }
+        c.Endpoints[endpoint.UID] = &endpoints.Items[i]
+        c.logger.Debugf("Endpoint loaded, uid: %s", endpoint.UID)
+    }
 
-        c.createUser(user)
+    secrets, err := c.config.KubeClient.Secrets(ns).List(listOptions)
+    if err != nil {
+        return fmt.Errorf("Can't get list of secrets: %s", err)
+    }
+    for i, secret := range secrets.Items {
+        if _, ok := c.Secrets[secret.UID]; ok {
+            continue
+        }
+        c.Secrets[secret.UID] = &secrets.Items[i]
+        c.logger.Debugf("Secret loaded, uid: %s", secret.UID)
+    }
+
+    statefulSets, err := c.config.KubeClient.StatefulSets(ns).List(listOptions)
+    if err != nil {
+        return fmt.Errorf("Can't get list of stateful sets: %s", err)
+    }
+    for i, statefulSet := range statefulSets.Items {
+        if _, ok := c.Statefulsets[statefulSet.UID]; ok {
+            continue
+        }
+        c.Statefulsets[statefulSet.UID] = &statefulSets.Items[i]
+        c.logger.Debugf("StatefulSet loaded, uid: %s", statefulSet.UID)
     }
 
     return nil
 }
+
+func (c *Cluster) ListResources() error {
+    for _, obj := range c.Statefulsets {
+        c.logger.Infof("StatefulSet: %s", util.NameFromMeta(obj.ObjectMeta))
+    }
+
+    for _, obj := range c.Secrets {
+        c.logger.Infof("Secret: %s", util.NameFromMeta(obj.ObjectMeta))
+    }
+
+    for _, obj := range c.Endpoints {
+        c.logger.Infof("Endpoint: %s", util.NameFromMeta(obj.ObjectMeta))
+    }
+
+    for _, obj := range c.Services {
+        c.logger.Infof("Service: %s", util.NameFromMeta(obj.ObjectMeta))
+    }
+
+    pods, err := c.clusterPods()
+    if err != nil {
+        return fmt.Errorf("Can't get pods: %s", err)
+    }
+
+    for _, obj := range pods {
+        c.logger.Infof("Pod: %s", util.NameFromMeta(obj.ObjectMeta))
+    }
+
+    return nil
+}
+
+func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
+    cSpec := c.Spec
+    clusterName := c.ClusterName()
+    resourceList := resources.ResourceList(cSpec.Resources)
+    template := resources.PodTemplate(clusterName, resourceList, c.dockerImage, cSpec.Version, c.etcdHost)
+    statefulSet := resources.StatefulSet(clusterName, template, cSpec.NumberOfInstances)
+
+    statefulSet, err := c.config.KubeClient.StatefulSets(statefulSet.Namespace).Create(statefulSet)
+    if k8sutil.ResourceAlreadyExists(err) {
+        return nil, fmt.Errorf("StatefulSet '%s' already exists", util.NameFromMeta(statefulSet.ObjectMeta))
+    }
+    if err != nil {
+        return nil, err
+    }
+    c.Statefulsets[statefulSet.UID] = statefulSet
+    c.logger.Debugf("Created new StatefulSet, uid: %s", statefulSet.UID)
+
+    return statefulSet, nil
+}
+
+func (c *Cluster) updateStatefulSet(statefulSet *v1beta1.StatefulSet) error {
+    statefulSet, err := c.config.KubeClient.StatefulSets(statefulSet.Namespace).Update(statefulSet)
+    if err == nil {
+        c.Statefulsets[statefulSet.UID] = statefulSet
+    }
+
+    return err
+}
+
+func (c *Cluster) deleteStatefulSet(statefulSet *v1beta1.StatefulSet) error {
+    err := c.config.KubeClient.
+        StatefulSets(statefulSet.Namespace).
+        Delete(statefulSet.Name, deleteOptions)
+
+    if err != nil {
+        return err
+    }
+    delete(c.Statefulsets, statefulSet.UID)
+
+    return nil
+}
+
+func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
+    endpoint := resources.Endpoint(c.ClusterName())
+
+    endpoint, err := c.config.KubeClient.Endpoints(endpoint.Namespace).Create(endpoint)
+    if k8sutil.ResourceAlreadyExists(err) {
+        return nil, fmt.Errorf("Endpoint '%s' already exists", util.NameFromMeta(endpoint.ObjectMeta))
+    }
+    if err != nil {
+        return nil, err
+    }
+    c.Endpoints[endpoint.UID] = endpoint
+    c.logger.Debugf("Created new endpoint, uid: %s", endpoint.UID)
+
+    return endpoint, nil
+}
+
+func (c *Cluster) deleteEndpoint(endpoint *v1.Endpoints) error {
+    err := c.config.KubeClient.Endpoints(endpoint.Namespace).Delete(endpoint.Name, deleteOptions)
+    if err != nil {
+        return err
+    }
+    delete(c.Endpoints, endpoint.UID)
+
+    return nil
+}
+
+func (c *Cluster) createService() (*v1.Service, error) {
+    service := resources.Service(c.ClusterName(), c.Spec.AllowedSourceRanges)
+
+    service, err := c.config.KubeClient.Services(service.Namespace).Create(service)
+    if k8sutil.ResourceAlreadyExists(err) {
+        return nil, fmt.Errorf("Service '%s' already exists", util.NameFromMeta(service.ObjectMeta))
+    }
+    if err != nil {
+        return nil, err
+    }
+    c.Services[service.UID] = service
+    c.logger.Debugf("Created new service, uid: %s", service.UID)
+
+    return service, nil
+}
+
+func (c *Cluster) deleteService(service *v1.Service) error {
+    err := c.config.KubeClient.Services(service.Namespace).Delete(service.Name, deleteOptions)
+    if err != nil {
+        return err
+    }
+    delete(c.Services, service.UID)
+
+    return nil
+}
+
+func (c *Cluster) createUsers() error {
+    for username, user := range c.pgUsers {
+        if username == constants.SuperuserName || username == constants.ReplicationUsername {
+            continue
+        }
+
+        isHuman, err := c.createPgUser(user)
+        var userType string
+        if isHuman {
+            userType = "human"
+        } else {
+            userType = "robot"
+        }
+        if err != nil {
+            return fmt.Errorf("Can't create %s user '%s': %s", userType, username, err)
+        }
+    }
+
+    return nil
+}
+
+func (c *Cluster) applySecrets() error {
+    secrets, err := resources.UserSecrets(c.ClusterName(), c.pgUsers)
+
+    if err != nil {
+        return fmt.Errorf("Can't get user secrets: %s", err)
+    }
+
+    for username, secret := range secrets {
+        secret, err := c.config.KubeClient.Secrets(secret.Namespace).Create(secret)
+        if k8sutil.ResourceAlreadyExists(err) {
+            curSecrets, err := c.config.KubeClient.Secrets(secret.Namespace).Get(secret.Name)
+            if err != nil {
+                return fmt.Errorf("Can't get current secret: %s", err)
+            }
+            pwdUser := c.pgUsers[username]
+            pwdUser.Password = string(curSecrets.Data["password"])
+            c.pgUsers[username] = pwdUser
+
+            continue
+        } else {
+            if err != nil {
+                return fmt.Errorf("Can't create secret for user '%s': %s", username, err)
+            }
+            c.Secrets[secret.UID] = secret
+            c.logger.Debugf("Created new secret, uid: %s", secret.UID)
+        }
+    }
+
+    return nil
+}
+
+func (c *Cluster) deleteSecret(secret *v1.Secret) error {
+    err := c.config.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, deleteOptions)
+    if err != nil {
+        return err
+    }
+    delete(c.Secrets, secret.UID)
+
+    return err
+}
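applySecrets above deliberately tolerates pre-existing secrets: on a ResourceAlreadyExists error it reads the stored password back into c.pgUsers instead of overwriting it, so credentials survive operator restarts. A condensed restatement of that create-or-adopt decision, with a plain map standing in for the Secrets API (the helper and types are illustrative, not from the patch):

package main

import "fmt"

// adoptOrCreate returns the password that should be used for a user: the
// already-stored one if the secret exists, otherwise the new one, which is
// then stored. Mirrors the applySecrets flow; not taken verbatim from it.
func adoptOrCreate(store map[string]string, name, newPassword string) string {
	if existing, ok := store[name]; ok {
		return existing // adopt: keep the password users already rely on
	}
	store[name] = newPassword // create: first time this user is seen
	return newPassword
}

func main() {
	store := map[string]string{"zalando": "old-password"}
	fmt.Println(adoptOrCreate(store, "zalando", "new-password")) // old-password
	fmt.Println(adoptOrCreate(store, "robot", "generated"))      // generated
}
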
diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go
new file mode 100644
index 000000000..4ca66c684
--- /dev/null
+++ b/pkg/cluster/util.go
@@ -0,0 +1,188 @@
+package cluster
+
+import (
+    "context"
+    "fmt"
+    "strings"
+    "time"
+
+    etcdclient "github.com/coreos/etcd/client"
+    "k8s.io/client-go/pkg/api/v1"
+    "k8s.io/client-go/pkg/labels"
+
+    "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
+    "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
+    "github.bus.zalan.do/acid/postgres-operator/pkg/util/retryutil"
+)
+
+func isValidUsername(username string) bool {
+    return alphaNumericRegexp.MatchString(username)
+}
+
+func normalizeUserFlags(userFlags []string) (flags []string, err error) {
+    uniqueFlags := make(map[string]bool)
+
+    for _, flag := range userFlags {
+        if !alphaNumericRegexp.MatchString(flag) {
+            err = fmt.Errorf("User flag '%s' is not alphanumeric", flag)
+            return
+        } else {
+            flag = strings.ToUpper(flag)
+            if _, ok := uniqueFlags[flag]; !ok {
+                uniqueFlags[flag] = true
+            }
+        }
+    }
+
+    flags = []string{}
+    for k := range uniqueFlags {
+        flags = append(flags, k)
+    }
+
+    return
+}
+
+func (c *Cluster) getTeamMembers() ([]string, error) {
+    teamInfo, err := c.config.TeamsAPIClient.TeamInfo(c.Spec.TeamId)
+    if err != nil {
+        return nil, fmt.Errorf("Can't get team info: %s", err)
+    }
+
+    return teamInfo.Members, nil
+}
+
+func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, spiloRole string) error {
+    for {
+        select {
+        case podEvent := <-podEvents:
+            podLabels := podEvent.CurPod.Labels
+            c.logger.Debugf("Pod has following labels: %+v", podLabels)
+            val, ok := podLabels["spilo-role"]
+            if ok && val == spiloRole {
+                return nil
+            }
+        case <-time.After(constants.PodLabelWaitTimeout):
+            return fmt.Errorf("Pod label wait timeout")
+        }
+    }
+}
+
+func (c *Cluster) waitForPodDeletion(podEvents chan spec.PodEvent) error {
+    for {
+        select {
+        case podEvent := <-podEvents:
+            if podEvent.EventType == spec.PodEventDelete {
+                return nil
+            }
+        case <-time.After(constants.PodDeletionWaitTimeout):
+            return fmt.Errorf("Pod deletion wait timeout")
+        }
+    }
+}
+
+func (c *Cluster) waitStatefulsetReady() error {
+    return retryutil.Retry(constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
+        func() (bool, error) {
+            listOptions := v1.ListOptions{
+                LabelSelector: c.labelsSet().String(),
+            }
+            ss, err := c.config.KubeClient.StatefulSets(c.Metadata.Namespace).List(listOptions)
+            if err != nil {
+                return false, err
+            }
+
+            if len(ss.Items) != 1 {
+                return false, fmt.Errorf("StatefulSet is not found")
+            }
+
+            return *ss.Items[0].Spec.Replicas == ss.Items[0].Status.Replicas, nil
+        })
+}
+
+func (c *Cluster) waitPodLabelsReady() error {
+    ls := c.labelsSet()
+    namespace := c.Metadata.Namespace
+
+    listOptions := v1.ListOptions{
+        LabelSelector: ls.String(),
+    }
+    masterListOption := v1.ListOptions{
+        LabelSelector: labels.Merge(ls, labels.Set{"spilo-role": "master"}).String(),
+    }
+    replicaListOption := v1.ListOptions{
+        LabelSelector: labels.Merge(ls, labels.Set{"spilo-role": "replica"}).String(),
+    }
+    pods, err := c.config.KubeClient.Pods(namespace).List(listOptions)
+    if err != nil {
+        return err
+    }
+    podsNumber := len(pods.Items)
+
+    return retryutil.Retry(
+        constants.ResourceCheckInterval, int(constants.ResourceCheckTimeout/constants.ResourceCheckInterval),
+        func() (bool, error) {
+            masterPods, err := c.config.KubeClient.Pods(namespace).List(masterListOption)
+            if err != nil {
+                return false, err
+            }
+            replicaPods, err := c.config.KubeClient.Pods(namespace).List(replicaListOption)
+            if err != nil {
+                return false, err
+            }
+            if len(masterPods.Items) > 1 {
+                return false, fmt.Errorf("Too many masters")
+            }
+
+            return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil
+        })
+}
+
+func (c *Cluster) waitClusterReady() error {
+    // TODO: wait for the first Pod only
+    err := c.waitStatefulsetReady()
+    if err != nil {
+        return fmt.Errorf("Stateful set error: %s", err)
+    }
+
+    // TODO: wait only for master
+    err = c.waitPodLabelsReady()
+    if err != nil {
+        return fmt.Errorf("Pod labels error: %s", err)
+    }
+
+    return nil
+}
+
+func (c *Cluster) labelsSet() labels.Set {
+    return labels.Set{
+        "application":   "spilo",
+        "spilo-cluster": c.Metadata.Name,
+    }
+}
+
+func (c *Cluster) credentialSecretName(username string) string {
+    return fmt.Sprintf(constants.UserSecretTemplate,
+        username,
+        c.Metadata.Name,
+        constants.TPRName,
+        constants.TPRVendor)
+}
+
+func (c *Cluster) deleteEtcdKey() error {
+    etcdKey := fmt.Sprintf("/service/%s", c.Metadata.Name)
+
+    //TODO: retry multiple times
+    resp, err := c.config.EtcdClient.Delete(context.Background(),
+        etcdKey,
+        &etcdclient.DeleteOptions{Recursive: true})
+
+    if err != nil {
+        return fmt.Errorf("Can't delete etcd key: %s", err)
+    }
+
+    if resp == nil {
+        return fmt.Errorf("No response from etcd cluster")
+    }
+
+    return nil
+}
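labelsSet and credentialSecretName above determine how every cluster object is selected and named. A worked example for a cluster named testcluster; the concrete values of UserSecretTemplate, TPRName, and TPRVendor are assumptions (pkg/util/constants is changed by this patch, but its hunk is not shown in this excerpt):

package main

import "fmt"

func main() {
	// Assumed stand-ins for the constants referenced above.
	const (
		userSecretTemplate = "%s.%s.credentials.%s.%s" // constants.UserSecretTemplate
		tprName            = "postgresql"              // constants.TPRName
		tprVendor          = "acid.zalan.do"           // constants.TPRVendor
	)

	// Label selector built from labelsSet() and used by every List/Watch call:
	fmt.Println("application=spilo,spilo-cluster=testcluster")

	// Secret name for user "zalando" in cluster "testcluster":
	fmt.Printf(userSecretTemplate+"\n", "zalando", "testcluster", tprName, tprVendor)
	// -> zalando.testcluster.credentials.postgresql.acid.zalan.do
}
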
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 578d94362..eb7942929 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -1,23 +1,18 @@
 package controller
 
 import (
-    "fmt"
     "sync"
 
     "github.com/Sirupsen/logrus"
     etcdclient "github.com/coreos/etcd/client"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/pkg/api/v1"
-    v1beta1extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
-    "k8s.io/client-go/pkg/fields"
     "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/cache"
 
     "github.bus.zalan.do/acid/postgres-operator/pkg/cluster"
     "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
-    "github.bus.zalan.do/acid/postgres-operator/pkg/util"
     "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
-    "github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
     "github.bus.zalan.do/acid/postgres-operator/pkg/util/teams"
 )
 
@@ -32,25 +27,23 @@ type Config struct {
 type Controller struct {
     config Config
     logger *logrus.Entry
-    events chan *Event
 
-    clusters  map[string]*cluster.Cluster
-    stopChMap map[string]chan struct{}
+    clusters    map[spec.ClusterName]*cluster.Cluster
+    stopChMap   map[spec.ClusterName]chan struct{}
     waitCluster sync.WaitGroup
 
     postgresqlInformer cache.SharedIndexInformer
-}
+    podInformer        cache.SharedIndexInformer
 
-type Event struct {
-    Type   string
-    Object *spec.Postgresql
+    podCh chan spec.PodEvent
 }
 
 func New(cfg *Config) *Controller {
     return &Controller{
         config:    *cfg,
         logger:    logrus.WithField("pkg", "controller"),
-        clusters:  make(map[string]*cluster.Cluster),
-        stopChMap: make(map[string]chan struct{}),
+        clusters:  make(map[spec.ClusterName]*cluster.Cluster),
+        stopChMap: make(map[spec.ClusterName]chan struct{}),
+        podCh:     make(chan spec.PodEvent),
     }
 }
 
@@ -65,127 +58,65 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
         return
     }
 
-    go c.watchTpr(stopCh)
+    c.logger.Infof("'%s' namespace will be watched", c.config.PodNamespace)
+    go c.runInformers(stopCh)
 
     c.logger.Info("Started working in background")
 }
 
-func (c *Controller) watchTpr(stopCh <-chan struct{}) {
-    go c.postgresqlInformer.Run(stopCh)
-
-    <-stopCh
-}
-
-func (c *Controller) createTPR() error {
-    TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor)
-    tpr := &v1beta1extensions.ThirdPartyResource{
-        ObjectMeta: v1.ObjectMeta{
-            Name: TPRName,
-            //PodNamespace: c.config.PodNamespace, //ThirdPartyResources are cluster-wide
-        },
-        Versions: []v1beta1extensions.APIVersion{
-            {Name: constants.TPRApiVersion},
-        },
-        Description: constants.TPRDescription,
-    }
-
-    _, err := c.config.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr)
-    if err != nil {
-        if !k8sutil.IsKubernetesResourceAlreadyExistError(err) {
-            return err
-        } else {
-            c.logger.Infof("ThirdPartyResource '%s' is already registered", TPRName)
-        }
-    } else {
-        c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName)
-    }
-
-    restClient := c.config.RestClient
-
-    return k8sutil.WaitTPRReady(restClient, constants.TPRReadyWaitInterval, constants.TPRReadyWaitTimeout, c.config.PodNamespace)
-}
-
-func (c *Controller) makeClusterConfig() cluster.Config {
-    return cluster.Config{
-        ControllerNamespace: c.config.PodNamespace,
-        KubeClient:          c.config.KubeClient,
-        RestClient:          c.config.RestClient,
-        EtcdClient:          c.config.EtcdClient,
-        TeamsAPIClient:      c.config.TeamsAPIClient,
-    }
-}
-
 func (c *Controller) initController() {
     err := c.createTPR()
     if err != nil {
         c.logger.Fatalf("Can't register ThirdPartyResource: %s", err)
     }
 
+    token, err := c.getOAuthToken()
+    if err != nil {
+        c.logger.Errorf("Can't get OAuth token: %s", err)
+    } else {
+        c.config.TeamsAPIClient.OAuthToken = token
+    }
+
+    // Postgresqls
+    clusterLw := &cache.ListWatch{
+        ListFunc:  c.clusterListFunc,
+        WatchFunc: c.clusterWatchFunc,
+    }
     c.postgresqlInformer = cache.NewSharedIndexInformer(
-        cache.NewListWatchFromClient(c.config.RestClient, constants.ResourceName, v1.NamespaceAll, fields.Everything()),
+        clusterLw,
         &spec.Postgresql{},
-        constants.ResyncPeriod,
+        constants.ResyncPeriodTPR,
         cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
 
     c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-        AddFunc:    c.clusterAdd,
-        UpdateFunc: c.clusterUpdate,
-        DeleteFunc: c.clusterDelete,
+        AddFunc:    c.postgresqlAdd,
+        UpdateFunc: c.postgresqlUpdate,
+        DeleteFunc: c.postgresqlDelete,
+    })
+
+    // Pods
+    podLw := &cache.ListWatch{
+        ListFunc:  c.podListFunc,
+        WatchFunc: c.podWatchFunc,
+    }
+
+    c.podInformer = cache.NewSharedIndexInformer(
+        podLw,
+        &v1.Pod{},
+        constants.ResyncPeriodPod,
+        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+
+    c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc:    c.podAdd,
+        UpdateFunc: c.podUpdate,
+        DeleteFunc: c.podDelete,
     })
 }
 
-func (c *Controller) clusterAdd(obj interface{}) {
-    pg := obj.(*spec.Postgresql)
+func (c *Controller) runInformers(stopCh <-chan struct{}) {
+    go c.postgresqlInformer.Run(stopCh)
+    go c.podInformer.Run(stopCh)
+    go c.podEventsDispatcher(stopCh)
 
-    //TODO: why do we need to have this check
-    if pg.Spec == nil {
-        return
-    }
-
-    clusterName := (*pg).Metadata.Name
-
-    cl := cluster.New(c.makeClusterConfig(), pg)
-    err := cl.Create()
-    if err != nil {
-        c.logger.Errorf("Can't create cluster: %s", err)
-        return
-    }
-    c.stopChMap[clusterName] = make(chan struct{})
-    c.clusters[clusterName] = cl
-
-    c.logger.Infof("Postgresql cluster '%s' has been created", util.FullObjectNameFromMeta((*pg).Metadata))
-}
-
-func (c *Controller) clusterUpdate(prev, cur interface{}) {
-    pgPrev := prev.(*spec.Postgresql)
-    pgCur := cur.(*spec.Postgresql)
-
-    if pgPrev.Spec == nil || pgCur.Spec == nil {
-        return
-    }
-    if pgPrev.Metadata.ResourceVersion == pgCur.Metadata.ResourceVersion {
-        return
-    }
-
-    c.logger.Infof("Update: %+v -> %+v", *pgPrev, *pgCur)
-}
-
-func (c *Controller) clusterDelete(obj interface{}) {
-    pg := obj.(*spec.Postgresql)
-    if pg.Spec == nil {
-        return
-    }
-    clusterName := (*pg).Metadata.Name
-
-    cluster := cluster.New(c.makeClusterConfig(), pg)
-    err := cluster.Delete()
-    if err != nil {
-        c.logger.Errorf("Can't delete cluster '%s': %s", util.FullObjectNameFromMeta((*pg).Metadata), err)
-        return
-    }
-
-    close(c.stopChMap[clusterName])
-    delete(c.clusters, clusterName)
-
-    c.logger.Infof("Cluster has been deleted: '%s'", util.FullObjectNameFromMeta((*pg).Metadata))
+    <-stopCh
 }
diff --git a/pkg/controller/etcd.go b/pkg/controller/etcd.go
index 78dbbb617..127b10231 100644
--- a/pkg/controller/etcd.go
+++ b/pkg/controller/etcd.go
@@ -2,9 +2,10 @@ package controller
 
 import (
     "fmt"
+    "time"
 
+    "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
     etcdclient "github.com/coreos/etcd/client"
-    "time"
 )
 
 func (c *Controller) initEtcdClient() error {
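initController above now feeds both informers through custom cache.ListWatch closures, so listing and watching stay inside c.config.PodNamespace. In this client-go vintage the ListWatch funcs receive api.ListOptions while the typed clients expect v1.ListOptions, which is why podListFunc and podWatchFunc in the next file repeat the same field-by-field conversion; it could be factored into a helper along these lines (the helper name is hypothetical):

package controller

import (
	"k8s.io/client-go/pkg/api"
	"k8s.io/client-go/pkg/api/v1"
)

// toV1ListOptions converts the options handed to cache.ListWatch into the
// form the typed CoreV1 clients take. Sketch of a possible shared helper.
func toV1ListOptions(in api.ListOptions) v1.ListOptions {
	out := v1.ListOptions{
		Watch:           in.Watch,
		ResourceVersion: in.ResourceVersion,
		TimeoutSeconds:  in.TimeoutSeconds,
	}
	if in.LabelSelector != nil {
		out.LabelSelector = in.LabelSelector.String()
	}
	if in.FieldSelector != nil {
		out.FieldSelector = in.FieldSelector.String()
	}
	return out
}
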
diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go
new file mode 100644
index 000000000..d4a8d2313
--- /dev/null
+++ b/pkg/controller/pod.go
@@ -0,0 +1,134 @@
+package controller
+
+import (
+    "k8s.io/client-go/pkg/api"
+    "k8s.io/client-go/pkg/api/v1"
+    "k8s.io/client-go/pkg/runtime"
+    "k8s.io/client-go/pkg/watch"
+
+    "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
+    "github.bus.zalan.do/acid/postgres-operator/pkg/util"
+)
+
+func (c *Controller) podListFunc(options api.ListOptions) (runtime.Object, error) {
+    var labelSelector string
+    var fieldSelector string
+
+    if options.LabelSelector != nil {
+        labelSelector = options.LabelSelector.String()
+    }
+
+    if options.FieldSelector != nil {
+        fieldSelector = options.FieldSelector.String()
+    }
+    opts := v1.ListOptions{
+        LabelSelector:   labelSelector,
+        FieldSelector:   fieldSelector,
+        Watch:           options.Watch,
+        ResourceVersion: options.ResourceVersion,
+        TimeoutSeconds:  options.TimeoutSeconds,
+    }
+
+    return c.config.KubeClient.CoreV1().Pods(c.config.PodNamespace).List(opts)
+}
+
+func (c *Controller) podWatchFunc(options api.ListOptions) (watch.Interface, error) {
+    var labelSelector string
+    var fieldSelector string
+
+    if options.LabelSelector != nil {
+        labelSelector = options.LabelSelector.String()
+    }
+
+    if options.FieldSelector != nil {
+        fieldSelector = options.FieldSelector.String()
+    }
+
+    opts := v1.ListOptions{
+        LabelSelector:   labelSelector,
+        FieldSelector:   fieldSelector,
+        Watch:           options.Watch,
+        ResourceVersion: options.ResourceVersion,
+        TimeoutSeconds:  options.TimeoutSeconds,
+    }
+
+    return c.config.KubeClient.CoreV1Client.Pods(c.config.PodNamespace).Watch(opts)
+}
+
+func PodNameFromMeta(meta v1.ObjectMeta) spec.PodName {
+    return spec.PodName{
+        Namespace: meta.Namespace,
+        Name:      meta.Name,
+    }
+}
+
+func (c *Controller) podAdd(obj interface{}) {
+    pod, ok := obj.(*v1.Pod)
+    if !ok {
+        return
+    }
+
+    podEvent := spec.PodEvent{
+        ClusterName: util.PodClusterName(pod),
+        PodName:     PodNameFromMeta(pod.ObjectMeta),
+        CurPod:      pod,
+        EventType:   spec.PodEventAdd,
+    }
+
+    c.podCh <- podEvent
+}
+
+func (c *Controller) podUpdate(prev, cur interface{}) {
+    prevPod, ok := prev.(*v1.Pod)
+    if !ok {
+        return
+    }
+
+    curPod, ok := cur.(*v1.Pod)
+    if !ok {
+        return
+    }
+
+    podEvent := spec.PodEvent{
+        ClusterName: util.PodClusterName(curPod),
+        PodName:     PodNameFromMeta(curPod.ObjectMeta),
+        PrevPod:     prevPod,
+        CurPod:      curPod,
+        EventType:   spec.PodEventUpdate,
+    }
+
+    c.podCh <- podEvent
+}
+
+func (c *Controller) podDelete(obj interface{}) {
+    pod, ok := obj.(*v1.Pod)
+    if !ok {
+        return
+    }
+
+    podEvent := spec.PodEvent{
+        ClusterName: util.PodClusterName(pod),
+        PodName:     PodNameFromMeta(pod.ObjectMeta),
+        CurPod:      pod,
+        EventType:   spec.PodEventDelete,
+    }
+
+    c.podCh <- podEvent
+}
+
+func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) {
+    c.logger.Infof("Watching all pod events")
+    for {
+        select {
+        case event := <-c.podCh:
+            if subscriber, ok := c.clusters[event.ClusterName]; ok {
+                c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName)
+                go subscriber.ReceivePodEvent(event)
+            } else {
+                c.logger.Debugf("Skipping pods unrelated to clusters: %s", event.PodName)
+            }
+        case <-stopCh:
+            return
+        }
+    }
+}
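Both dispatchers (podEventsDispatcher here and its counterpart in pkg/cluster/pod.go) answer the in-code TODO about non-blocking sends by spawning a goroutine per event. That never blocks the informer, but two quick updates for one pod can then be delivered out of order. A buffered channel with a drop-on-full policy is a common alternative; a sketch of that option, not what the patch implements:

package controller

import "github.bus.zalan.do/acid/postgres-operator/pkg/spec"

// dispatch delivers an event without blocking the informer goroutine while
// preserving per-subscriber ordering; a lagging subscriber loses events
// instead of receiving them late and possibly reordered. Subscriber
// channels would then be created as make(chan spec.PodEvent, 16).
func dispatch(subscriber chan spec.PodEvent, event spec.PodEvent) {
	select {
	case subscriber <- event:
	default:
		// Subscriber is not keeping up; drop (or log) the event.
	}
}
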
+		Watch()
+}
+
+func (c *Controller) postgresqlAdd(obj interface{}) {
+	pg, ok := obj.(*spec.Postgresql)
+	if !ok {
+		c.logger.Errorf("Can't cast to postgresql spec")
+		return
+	}
+
+	clusterName := spec.ClusterName{
+		Namespace: pg.Metadata.Namespace,
+		Name:      pg.Metadata.Name,
+	}
+
+	_, ok = c.clusters[clusterName]
+	if ok {
+		c.logger.Infof("Cluster '%s' already exists", clusterName)
+		return
+	}
+
+	c.logger.Infof("Creation of a new Postgresql cluster '%s' started", clusterName)
+	cl := cluster.New(c.makeClusterConfig(), *pg)
+	cl.MustSetStatus(spec.ClusterStatusCreating)
+	err := cl.Create()
+	if err != nil {
+		c.logger.Errorf("Can't create cluster: %s", err)
+		cl.MustSetStatus(spec.ClusterStatusAddFailed)
+		return
+	}
+	cl.MustSetStatus(spec.ClusterStatusRunning) //TODO: are you sure it's running?
+
+	stopCh := make(chan struct{})
+	c.stopChMap[clusterName] = stopCh
+	c.clusters[clusterName] = cl
+	go cl.Run(stopCh)
+
+	c.logger.Infof("Postgresql cluster '%s' has been created", clusterName)
+}
+
+func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
+	pgPrev, ok := prev.(*spec.Postgresql)
+	if !ok {
+		c.logger.Errorf("Can't cast to postgresql spec")
+		return
+	}
+	pgNew, ok := cur.(*spec.Postgresql)
+	if !ok {
+		c.logger.Errorf("Can't cast to postgresql spec")
+		return
+	}
+
+	clusterName := spec.ClusterName{
+		Namespace: pgNew.Metadata.Namespace,
+		Name:      pgNew.Metadata.Name,
+	}
+
+	//TODO: Do not update cluster which is currently creating
+
+	if pgPrev.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion {
+		c.logger.Debugf("Skipping update with no resource version change")
+		return
+	}
+	pgCluster, ok := c.clusters[clusterName] // current state of the cluster
+	if !ok {
+		c.logger.Errorf("Unknown cluster: %s", clusterName)
+		return
+	}
+
+	if reflect.DeepEqual(pgPrev.Spec, pgNew.Spec) {
+		c.logger.Infof("Skipping update with no spec change")
+		return
+	}
+
+	c.logger.Infof("Cluster update: %s(version: %s) -> %s(version: %s)",
+		util.NameFromMeta(pgPrev.Metadata), pgPrev.Metadata.ResourceVersion,
+		util.NameFromMeta(pgNew.Metadata), pgNew.Metadata.ResourceVersion)
+
+	rollingUpdate := pgCluster.NeedsRollingUpdate(pgNew)
+	if rollingUpdate {
+		c.logger.Infof("Pods need to be recreated")
+	}
+
+	pgCluster.MustSetStatus(spec.ClusterStatusUpdating)
+	err := pgCluster.Update(pgNew, rollingUpdate)
+	if err != nil {
+		pgCluster.MustSetStatus(spec.ClusterStatusUpdateFailed)
+		c.logger.Errorf("Can't update cluster: %s", err)
+	} else {
+		c.logger.Infof("Cluster has been updated")
+	}
+}
+
+func (c *Controller) postgresqlDelete(obj interface{}) {
+	pgCur, ok := obj.(*spec.Postgresql)
+	if !ok {
+		c.logger.Errorf("Can't cast to postgresql spec")
+		return
+	}
+	clusterName := spec.ClusterName{
+		Namespace: pgCur.Metadata.Namespace,
+		Name:      pgCur.Metadata.Name,
+	}
+	pgCluster, ok := c.clusters[clusterName]
+	if !ok {
+		c.logger.Errorf("Unknown cluster: %s", clusterName)
+		return
+	}
+
+	c.logger.Infof("Cluster delete: %s", util.NameFromMeta(pgCur.Metadata))
+	err := pgCluster.Delete()
+	if err != nil {
+		c.logger.Errorf("Can't delete cluster '%s': %s", clusterName, err)
+		return
+	}
+
+	close(c.stopChMap[clusterName])
+	delete(c.clusters, clusterName)
+
+	c.logger.Infof("Cluster '%s' has been successfully deleted", clusterName)
+}
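The list/watch functions and the three handlers above meet in a client-go informer; the wiring itself lives in pkg/controller/controller.go, which this patch also reworks. A minimal sketch, assuming the vendored client-go cache package still takes api.ListOptions-based ListWatch functions and that ResyncPeriodTPR (added to the constants in this patch) is the intended relist interval; runPostgresqlInformer is a hypothetical name:

package controller

import (
	"k8s.io/client-go/tools/cache"

	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
)

// runPostgresqlInformer relists every ResyncPeriodTPR and turns TPR
// add/update/delete notifications into the handler calls defined above.
func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}) {
	source := &cache.ListWatch{
		ListFunc:  c.clusterListFunc,
		WatchFunc: c.clusterWatchFunc,
	}

	_, informer := cache.NewInformer(
		source,
		&spec.Postgresql{},        // object type delivered by the TPR watch
		constants.ResyncPeriodTPR, // periodic full relist
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.postgresqlAdd,
			UpdateFunc: c.postgresqlUpdate,
			DeleteFunc: c.postgresqlDelete,
		})

	informer.Run(stopCh)
}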
"github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil" + "github.bus.zalan.do/acid/postgres-operator/pkg/util/resources" + "k8s.io/client-go/pkg/api" +) + +func (c *Controller) makeClusterConfig() cluster.Config { + return cluster.Config{ + ControllerNamespace: c.config.PodNamespace, + KubeClient: c.config.KubeClient, + RestClient: c.config.RestClient, + EtcdClient: c.config.EtcdClient, + TeamsAPIClient: c.config.TeamsAPIClient, + } +} + +func (c *Controller) getOAuthToken() (string, error) { + // Temporary getting postgresql-operator secret from the NamespaceDefault + credentialsSecret, err := c.config.KubeClient.Secrets(api.NamespaceDefault).Get(constants.OAuthTokenSecretName) + + if err != nil { + return "", fmt.Errorf("Can't get credentials secret: %s", err) + } + data := credentialsSecret.Data + + if string(data["read-only-token-type"]) != "Bearer" { + return "", fmt.Errorf("Wrong token type: %s", data["read-only-token-type"]) + } + + return string(data["read-only-token-secret"]), nil +} + +func (c *Controller) createTPR() error { + TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor) + tpr := resources.ThirdPartyResource(TPRName) + + _, err := c.config.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr) + if err != nil { + if !k8sutil.ResourceAlreadyExists(err) { + return err + } else { + c.logger.Infof("ThirdPartyResource '%s' is already registered", TPRName) + } + } else { + c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName) + } + + restClient := c.config.RestClient + + return k8sutil.WaitTPRReady(restClient, constants.TPRReadyWaitInterval, constants.TPRReadyWaitTimeout, c.config.PodNamespace) +} diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index a63b03b46..fb4266735 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -2,20 +2,24 @@ package spec import ( "encoding/json" + "fmt" + "regexp" + "strings" + "time" "k8s.io/client-go/pkg/api/meta" "k8s.io/client-go/pkg/api/unversioned" "k8s.io/client-go/pkg/api/v1" ) +var alphaRegexp = regexp.MustCompile("^[a-zA-Z]*$") + type MaintenanceWindow struct { - StartTime string - EndTime string - //StartTime time.Time // Start time - //StartWeekday time.Weekday // Start weekday - // - //EndTime time.Time // End time - //EndWeekday time.Weekday // End weekday + StartTime time.Time // Start time + StartWeekday time.Weekday // Start weekday + + EndTime time.Time // End time + EndWeekday time.Weekday // End weekday } type Volume struct { @@ -44,47 +48,37 @@ type Patroni struct { type UserFlags []string -type PostgresSpec struct { - Resources `json:"resources,omitempty"` - Patroni `json:"patroni,omitempty"` - PostgresqlParam `json:"postgresql"` - Volume `json:"volume,omitempty"` +type PostgresStatus string - TeamId string `json:"teamId"` - AllowedSourceRanges []string `json:"allowedSourceRanges"` - NumberOfInstances int32 `json:"numberOfInstances"` - Users map[string]UserFlags `json:"users"` - MaintenanceWindows []string `json:"maintenanceWindows,omitempty"` - PamUsersSecret string `json:"pamUsersSecret,omitempty"` - - EtcdHost string - DockerImage string -} - -type PostgresStatus struct { - // Phase is the cluster running phase - Phase string `json:"phase"` - Reason string `json:"reason"` - - // ControlPuased indicates the operator pauses the control of the cluster. 
diff --git a/pkg/spec/types.go b/pkg/spec/types.go
new file mode 100644
index 000000000..656e9fa5b
--- /dev/null
+++ b/pkg/spec/types.go
@@ -0,0 +1,40 @@
+package spec
+
+import (
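+	// v1.Pod rides along in PodEvent; types.NamespacedName backs PodName and ClusterName.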
"k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/types" +) + +type PodEventType string + +type PodName types.NamespacedName + +const ( + PodEventAdd PodEventType = "ADD" + PodEventUpdate PodEventType = "UPDATE" + PodEventDelete PodEventType = "DELETE" +) + +type PodEvent struct { + ClusterName ClusterName + PodName PodName + PrevPod *v1.Pod + CurPod *v1.Pod + EventType PodEventType +} + +func (p PodName) String() string { + return types.NamespacedName(p).String() +} + +type ClusterName types.NamespacedName + +func (c ClusterName) String() string { + return types.NamespacedName(c).String() +} + +type PgUser struct { + Name string + Password string + Flags []string +} diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go index b2d58093e..fa330b81c 100644 --- a/pkg/util/constants/constants.go +++ b/pkg/util/constants/constants.go @@ -12,15 +12,25 @@ const ( ResourceCheckInterval = 3 * time.Second ResourceCheckTimeout = 10 * time.Minute - ResourceName = TPRName + "s" - ResyncPeriod = 5 * time.Minute + PodLabelWaitTimeout = 10 * time.Minute + PodDeletionWaitTimeout = 10 * time.Minute + + ResourceName = TPRName + "s" + ResyncPeriodTPR = 5 * time.Minute + ResyncPeriodPod = 5 * time.Minute + + SuperuserName = "postgres" + ReplicationUsername = "replication" //TODO: move to the operator spec - EtcdHost = "etcd-client.default.svc.cluster.local:2379" - SpiloImage = "registry.opensource.zalan.do/acid/spilo-9.6:1.2-p12" - PamRoleName = "zalandos" - PamConfiguration = "https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees" + EtcdHost = "etcd-client.default.svc.cluster.local:2379" + SpiloImage = "registry.opensource.zalan.do/acid/spilo-9.6:1.2-p12" + PamRoleName = "zalandos" + PamConfiguration = "https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees" + PasswordLength = 64 + TeamsAPIUrl = "https://teams.example.com/api/" + UserSecretTemplate = "%s.%s.credentials.%s.%s" - PasswordLength = 64 - TeamsAPIUrl = "https://teams.example.com/api/" + OAuthTokenSecretName = "postgresql-operator" + ServiceAccountName = "operator" ) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index b118df506..e797c4282 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -33,7 +33,7 @@ func KubernetesClient(config *rest.Config) (client *kubernetes.Clientset, err er return kubernetes.NewForConfig(config) } -func IsKubernetesResourceAlreadyExistError(err error) bool { +func ResourceAlreadyExists(err error) bool { return apierrors.IsAlreadyExists(err) } diff --git a/pkg/util/resources/factory.go b/pkg/util/resources/factory.go new file mode 100644 index 000000000..eabeffc9c --- /dev/null +++ b/pkg/util/resources/factory.go @@ -0,0 +1,265 @@ +package resources + +import ( + "fmt" + + "k8s.io/client-go/pkg/api/resource" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/apis/apps/v1beta1" + extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1" + "k8s.io/client-go/pkg/labels" + "k8s.io/client-go/pkg/util/intstr" + + "github.bus.zalan.do/acid/postgres-operator/pkg/spec" + "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants" +) + +const ( + superuserName = "postgres" + replicationUsername = "replication" +) + +func credentialSecretName(clusterName, username string) string { + return fmt.Sprintf( + constants.UserSecretTemplate, + username, + clusterName, + constants.TPRName, + constants.TPRVendor) +} + +func labelsSet(clusterName string) labels.Set { + return labels.Set{ + "application": "spilo", + 
"spilo-cluster": clusterName, + } +} + +func ResourceList(resources spec.Resources) *v1.ResourceList { + resourceList := v1.ResourceList{} + if resources.Cpu != "" { + resourceList[v1.ResourceCPU] = resource.MustParse(resources.Cpu) + } + + if resources.Memory != "" { + resourceList[v1.ResourceMemory] = resource.MustParse(resources.Memory) + } + + return &resourceList +} + +func PodTemplate(cluster spec.ClusterName, resourceList *v1.ResourceList, dockerImage, pgVersion, etcdHost string) *v1.PodTemplateSpec { + envVars := []v1.EnvVar{ + { + Name: "SCOPE", + Value: cluster.Name, + }, + { + Name: "PGROOT", + Value: "/home/postgres/pgdata/pgroot", + }, + { + Name: "ETCD_HOST", + Value: etcdHost, + }, + { + Name: "POD_IP", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.podIP", + }, + }, + }, + { + Name: "POD_NAMESPACE", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "metadata.namespace", + }, + }, + }, + { + Name: "PGPASSWORD_SUPERUSER", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: credentialSecretName(cluster.Name, superuserName), + }, + Key: "password", + }, + }, + }, + { + Name: "PGPASSWORD_STANDBY", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: credentialSecretName(cluster.Name, replicationUsername), + }, + Key: "password", + }, + }, + }, + { + Name: "PAM_OAUTH2", //TODO: get from the operator tpr spec + Value: constants.PamConfiguration, //space before uid is obligatory + }, + { + Name: "SPILO_CONFIGURATION", //TODO: get from the operator tpr spec + Value: fmt.Sprintf(` +postgresql: + bin_dir: /usr/lib/postgresql/%s/bin +bootstrap: + initdb: + - auth-host: md5 + - auth-local: trust + users: + %s: + password: NULL + options: + - createdb + - nologin + pg_hba: + - hostnossl all all all reject + - hostssl all +%s all pam + - hostssl all all all md5`, pgVersion, constants.PamRoleName, constants.PamRoleName), + }, + } + + container := v1.Container{ + Name: cluster.Name, + Image: dockerImage, + ImagePullPolicy: v1.PullAlways, + Resources: v1.ResourceRequirements{ + Requests: *resourceList, + }, + Ports: []v1.ContainerPort{ + { + ContainerPort: 8008, + Protocol: v1.ProtocolTCP, + }, + { + ContainerPort: 5432, + Protocol: v1.ProtocolTCP, + }, + }, + VolumeMounts: []v1.VolumeMount{ + { + Name: "pgdata", + MountPath: "/home/postgres/pgdata", //TODO: fetch from manifesto + }, + }, + Env: envVars, + } + terminateGracePeriodSeconds := int64(30) + + podSpec := v1.PodSpec{ + ServiceAccountName: constants.ServiceAccountName, + TerminationGracePeriodSeconds: &terminateGracePeriodSeconds, + Volumes: []v1.Volume{ + { + Name: "pgdata", + VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}, + }, + }, + Containers: []v1.Container{container}, + } + + template := v1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: labelsSet(cluster.Name), + Namespace: cluster.Namespace, + Annotations: map[string]string{"pod.alpha.kubernetes.io/initialized": "true"}, + }, + Spec: podSpec, + } + + return &template +} + +func StatefulSet(cluster spec.ClusterName, podTemplate *v1.PodTemplateSpec, numberOfInstances int32) *v1beta1.StatefulSet { + statefulSet := &v1beta1.StatefulSet{ + ObjectMeta: v1.ObjectMeta{ + Name: cluster.Name, + Namespace: cluster.Namespace, + Labels: labelsSet(cluster.Name), + }, + Spec: v1beta1.StatefulSetSpec{ + 
Replicas: &numberOfInstances, + ServiceName: cluster.Name, + Template: *podTemplate, + }, + } + + return statefulSet +} + +func UserSecrets(cluster spec.ClusterName, pgUsers map[string]spec.PgUser) (secrets map[string]*v1.Secret, err error) { + secrets = make(map[string]*v1.Secret, len(pgUsers)) + namespace := cluster.Namespace + for username, pgUser := range pgUsers { + //Skip users with no password i.e. human users (they'll be authenticated using pam) + if pgUser.Password == "" { + continue + } + secret := v1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: credentialSecretName(cluster.Name, username), + Namespace: namespace, + Labels: labelsSet(cluster.Name), + }, + Type: v1.SecretTypeOpaque, + Data: map[string][]byte{ + "username": []byte(pgUser.Name), + "password": []byte(pgUser.Password), + }, + } + secrets[username] = &secret + } + + return +} + +func Service(cluster spec.ClusterName, allowedSourceRanges []string) *v1.Service { + service := &v1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: cluster.Name, + Namespace: cluster.Namespace, + Labels: labelsSet(cluster.Name), + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeLoadBalancer, + Ports: []v1.ServicePort{{Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, + LoadBalancerSourceRanges: allowedSourceRanges, + }, + } + + return service +} + +func Endpoint(cluster spec.ClusterName) *v1.Endpoints { + endpoints := &v1.Endpoints{ + ObjectMeta: v1.ObjectMeta{ + Name: cluster.Name, + Namespace: cluster.Namespace, + Labels: labelsSet(cluster.Name), + }, + } + + return endpoints +} + +func ThirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource { + return &extv1beta.ThirdPartyResource{ + ObjectMeta: v1.ObjectMeta{ + //ThirdPartyResources are cluster-wide + Name: TPRName, + }, + Versions: []extv1beta.APIVersion{ + {Name: constants.TPRApiVersion}, + }, + Description: constants.TPRDescription, + } +} diff --git a/pkg/util/teams/teams.go b/pkg/util/teams/teams.go index 77e3d1acb..6811c48fb 100644 --- a/pkg/util/teams/teams.go +++ b/pkg/util/teams/teams.go @@ -38,7 +38,7 @@ type Team struct { type TeamsAPI struct { url string httpClient *http.Client - OauthToken string + OAuthToken string } func NewTeamsAPI(url string) *TeamsAPI { @@ -57,7 +57,7 @@ func (t *TeamsAPI) TeamInfo(teamId string) (*Team, error) { return nil, err } - req.Header.Add("Authorization", "Bearer "+t.OauthToken) + req.Header.Add("Authorization", "Bearer "+t.OAuthToken) resp, err := t.httpClient.Do(req) if err != nil { return nil, err diff --git a/pkg/util/util.go b/pkg/util/util.go index 6410d4ad7..63f5984f4 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -1,11 +1,12 @@ package util import ( - "fmt" "math/rand" "time" + "github.bus.zalan.do/acid/postgres-operator/pkg/spec" "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/types" ) var passwordChars = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") @@ -23,15 +24,22 @@ func RandomPassword(n int) string { return string(b) } -func FullObjectNameFromMeta(meta v1.ObjectMeta) string { - return FullObjectName(meta.Namespace, meta.Name) -} - -//TODO: Remove in favour of FullObjectNameFromMeta -func FullObjectName(ns, name string) string { - if ns == "" { - ns = "default" +func NameFromMeta(meta v1.ObjectMeta) types.NamespacedName { + obj := types.NamespacedName{ + Namespace: meta.Namespace, + Name: meta.Name, } - return fmt.Sprintf("%s / %s", ns, name) + return obj +} + +func PodClusterName(pod *v1.Pod) spec.ClusterName { + if name, ok := pod.Labels["spilo-cluster"]; ok { + return 
spec.ClusterName{ + Namespace: pod.Namespace, + Name: name, + } + } + + return spec.ClusterName{} }
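For reference, the helpers added in pkg/util/resources/factory.go compose as follows when a cluster is materialized. A hedged sketch only: the function signatures are the ones introduced above, the image, etcd host, and namespace come from the manifests and constants in this patch, the cluster name "testcluster" is made up, and the real creation order and error handling live in pkg/cluster/resources.go:

package main

import (
	"fmt"

	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
	"github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
)

func main() {
	cluster := spec.ClusterName{Namespace: "acid", Name: "testcluster"}

	// Requests for the Spilo container, then the pod template built around them.
	resourceList := resources.ResourceList(spec.Resources{Cpu: "100m", Memory: "256Mi"})
	template := resources.PodTemplate(cluster, resourceList,
		"registry.opensource.zalan.do/acid/spilo-9.6:1.2-p12", // docker image
		"9.6", // postgres major version, selects bin_dir in SPILO_CONFIGURATION
		"etcd-client.default.svc.cluster.local:2379") // etcd host

	// Two instances behind one LoadBalancer service restricted to the allowed ranges.
	statefulSet := resources.StatefulSet(cluster, template, 2)
	service := resources.Service(cluster, []string{"127.0.0.1/32"})

	fmt.Println(statefulSet.ObjectMeta.Name, *statefulSet.Spec.Replicas, service.Spec.Type)
}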