From 51d73fb1724cc740259630b124065c57fac14a24 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Fri, 9 Jun 2017 21:20:29 +0200 Subject: [PATCH 01/11] Replace service annotations when updating services. In case the whole annotation changes (like the external DNS) we don't want to keep the old one hanging around. Unline specs, we don't expect anyone except the operator to change the annotations. Use StrategicMergePatchType in order to replace the annotations map completely. --- pkg/cluster/resources.go | 15 +++++++++++++++ pkg/cluster/util.go | 10 ++++++++++ 2 files changed, 25 insertions(+) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index aa5964302..d9318ed04 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -250,6 +250,20 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error } serviceName := util.NameFromMeta(c.Service[role].ObjectMeta) + if len(newService.ObjectMeta.Annotations) > 0 { + + annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations) + + _, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( + c.Service[role].Name, + api.StrategicMergePatchType, + []byte(annotationsPatchData), "") + + if err != nil { + return fmt.Errorf("could not replace annotations for the service '%s': %v", serviceName, err) + } + } + patchData, err := specPatch(newService.Spec) if err != nil { return fmt.Errorf("could not form patch for the service '%s': %v", serviceName, err) @@ -267,6 +281,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error return nil } + func (c *Cluster) deleteService(role PostgresRole) error { c.logger.Debugf("Deleting service %s", role) if c.Service[role] == nil { diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index b6416cd7b..71986aa57 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -63,6 +63,16 @@ func specPatch(spec interface{}) ([]byte, error) { }{spec}) } +func metadataAnnotationsPatch(annotations map[string]string) (string) { + annotationsList := make([]string, 0, len(annotations)) + + for name, value := range(annotations) { + annotationsList = append(annotationsList, fmt.Sprintf(`"%s":"%s"`, name, value)) + } + annotationsString := strings.Join(annotationsList, ",") + return fmt.Sprintf(`{"metadata":{"annotations": {"$patch":"replace", %s}}}`, annotationsString) +} + func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { if isUpdate { c.logger.Infof("statefulset '%s' has been changed", From 17826ee4342683e3bc5f49c625e0845431044e9b Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Fri, 9 Jun 2017 21:27:17 +0200 Subject: [PATCH 02/11] Go fmt run. --- pkg/cluster/resources.go | 1 - pkg/cluster/util.go | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index d9318ed04..7fad9f3a1 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -281,7 +281,6 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error return nil } - func (c *Cluster) deleteService(role PostgresRole) error { c.logger.Debugf("Deleting service %s", role) if c.Service[role] == nil { diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 71986aa57..42917fd24 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -63,10 +63,10 @@ func specPatch(spec interface{}) ([]byte, error) { }{spec}) } -func metadataAnnotationsPatch(annotations map[string]string) (string) { +func metadataAnnotationsPatch(annotations map[string]string) string { annotationsList := make([]string, 0, len(annotations)) - for name, value := range(annotations) { + for name, value := range annotations { annotationsList = append(annotationsList, fmt.Sprintf(`"%s":"%s"`, name, value)) } annotationsString := strings.Join(annotationsList, ",") From 987990fb0e3b0e06e03f57f6b401f8e9bda3d754 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Mon, 12 Jun 2017 10:02:13 +0200 Subject: [PATCH 03/11] Move service annotation patch template into the constants. --- pkg/cluster/util.go | 3 ++- pkg/util/constants/annotations.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 42917fd24..81b715801 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -70,7 +70,8 @@ func metadataAnnotationsPatch(annotations map[string]string) string { annotationsList = append(annotationsList, fmt.Sprintf(`"%s":"%s"`, name, value)) } annotationsString := strings.Join(annotationsList, ",") - return fmt.Sprintf(`{"metadata":{"annotations": {"$patch":"replace", %s}}}`, annotationsString) + // TODO: perhaps use patchStrategy:"replace" json annotation instead of constructing the patch literally. + return fmt.Sprintf(constants.ServiceMetadataAnnotationFormat, annotationsString) } func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { diff --git a/pkg/util/constants/annotations.go b/pkg/util/constants/annotations.go index 0b93fc2e1..3b276cc3d 100644 --- a/pkg/util/constants/annotations.go +++ b/pkg/util/constants/annotations.go @@ -7,4 +7,5 @@ const ( ElbTimeoutAnnotationValue = "3600" KubeIAmAnnotation = "iam.amazonaws.com/role" VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by" + ServiceMetadataAnnotationFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}` ) From c773a4e43d1aff9962e583bed17f398cde204f83 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Mon, 12 Jun 2017 16:31:25 +0200 Subject: [PATCH 04/11] A bare bones .travis.yml (#48) --- .travis.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 000000000..553c85d60 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,16 @@ +dist: trusty + +language: go + +go: + - 1.8 + +before_install: + - go get github.com/Masterminds/glide + - go get github.com/mattn/goveralls + +install: + - make deps + +script: + - goverals -service=travis-ci From 9a6b0b8c37d1fce768914d1b6a69b10c48934880 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Mon, 12 Jun 2017 17:29:33 +0200 Subject: [PATCH 05/11] Tests for teams API (#46) --- pkg/spec/types.go | 2 +- pkg/util/teams/teams.go | 52 +++++++++++++++++++++++------------- pkg/util/teams/teams_test.go | 43 ++++++++++++++++++++++++++++- 3 files changed, 76 insertions(+), 21 deletions(-) diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 9d189d906..822395ce9 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -1,9 +1,9 @@ package spec import ( - "database/sql" "fmt" "strings" + "database/sql" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/types" diff --git a/pkg/util/teams/teams.go b/pkg/util/teams/teams.go index 566dbb517..b3d80349c 100644 --- a/pkg/util/teams/teams.go +++ b/pkg/util/teams/teams.go @@ -39,11 +39,14 @@ type team struct { InfrastructureAccounts []infrastructureAccount `json:"infrastructure-accounts"` } -// +type httpClient interface { + Do(req *http.Request) (*http.Response, error) +} + type API struct { - url string - httpClient *http.Client - logger *logrus.Entry + httpClient + url string + logger *logrus.Entry } // NewTeamsAPI creates an object to query the team API. @@ -58,23 +61,28 @@ func NewTeamsAPI(url string, log *logrus.Logger) *API { } // TeamInfo returns information about a given team using its ID and a token to authenticate to the API service. -func (t *API) TeamInfo(teamID, token string) (tm *team, er error) { +func (t *API) TeamInfo(teamID, token string) (tm *team, err error) { + var ( + req *http.Request + resp *http.Response + ) + url := fmt.Sprintf("%s/teams/%s", t.url, teamID) t.logger.Debugf("Request url: %s", url) - req, err := http.NewRequest("GET", url, nil) + req, err = http.NewRequest("GET", url, nil) if err != nil { - return nil, err + return } req.Header.Add("Authorization", "Bearer "+token) - resp, err := t.httpClient.Do(req) + resp, err = t.httpClient.Do(req) if err != nil { - return nil, err + return } defer func() { - if err := resp.Body.Close(); err != nil { - er = fmt.Errorf("error when closing response; %v", err) - tm = nil + closeErr := resp.Body.Close() + if closeErr != nil { + err = fmt.Errorf("error when closing response: %v", closeErr) } }() if resp.StatusCode != 200 { @@ -82,21 +90,27 @@ func (t *API) TeamInfo(teamID, token string) (tm *team, er error) { d := json.NewDecoder(resp.Body) err = d.Decode(&raw) if err != nil { - return nil, fmt.Errorf("team API query failed with status code %d and malformed response: %v", resp.StatusCode, err) + err = fmt.Errorf("team API query failed with status code %d and malformed response: %v", resp.StatusCode, err) + return } if errMessage, ok := raw["error"]; ok { - return nil, fmt.Errorf("team API query failed with status code %d and message: '%v'", resp.StatusCode, string(errMessage)) + err = fmt.Errorf("team API query failed with status code %d and message: '%v'", resp.StatusCode, string(errMessage)) + return } + err = fmt.Errorf("team API query failed with status code %d", resp.StatusCode) - return nil, fmt.Errorf("team API query failed with status code %d", resp.StatusCode) + return } - teamInfo := &team{} + + tm = &team{} d := json.NewDecoder(resp.Body) - err = d.Decode(teamInfo) + err = d.Decode(tm) if err != nil { - return nil, fmt.Errorf("could not parse team API response: %v", err) + err = fmt.Errorf("could not parse team API response: %v", err) + tm = nil + return } - return teamInfo, nil + return } diff --git a/pkg/util/teams/teams_test.go b/pkg/util/teams/teams_test.go index eac29a5d3..b23bd5622 100644 --- a/pkg/util/teams/teams_test.go +++ b/pkg/util/teams/teams_test.go @@ -2,11 +2,12 @@ package teams import ( "fmt" - "github.com/Sirupsen/logrus" "net/http" "net/http/httptest" "reflect" "testing" + + "github.com/Sirupsen/logrus" ) var ( @@ -168,6 +169,46 @@ func TestInfo(t *testing.T) { } } +type mockHttpClient struct { +} + +type mockBody struct { +} + +func (b *mockBody) Read(p []byte) (n int, err error) { + return 2, nil +} + +func (b *mockBody) Close() error { + return fmt.Errorf("close error") +} + +func (c *mockHttpClient) Do(req *http.Request) (*http.Response, error) { + resp := http.Response{ + Status: "200 OK", + StatusCode: 200, + ContentLength: 2, + Close: false, + Request: req, + } + resp.Body = &mockBody{} + + return &resp, nil +} + +func TestHttpClientClose(t *testing.T) { + ts := httptest.NewServer(nil) + + api := NewTeamsAPI(ts.URL, logger) + api.httpClient = &mockHttpClient{} + + _, err := api.TeamInfo("acid", token) + expError := fmt.Errorf("error when closing response: close error") + if err.Error() != expError.Error() { + t.Errorf("Expected error: %v, got: %v", expError, err) + } +} + func TestRequest(t *testing.T) { for _, tc := range requestsURLtc { api := NewTeamsAPI(tc.url, logger) From 0de1537c4e14f4c0566c03fe200539edc74c7032 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Mon, 12 Jun 2017 17:39:41 +0200 Subject: [PATCH 06/11] Travis CI (#49) --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 553c85d60..26bdc4d78 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,4 +13,4 @@ install: - make deps script: - - goverals -service=travis-ci + - goveralls -service=travis-ci From fe6c162e5914bb768eabab07c402f243b008237a Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Mon, 12 Jun 2017 18:10:23 +0200 Subject: [PATCH 07/11] Add a few badges (#50) #6 --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 3d2c01b39..1464f7b14 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ # postgres operator +[![Build Status](https://travis-ci.org/zalando-incubator/postgres-operator.svg?branch=master)](https://travis-ci.org/zalando-incubator/postgres-operator) +[![Coverage Status](https://coveralls.io/repos/github/zalando-incubator/postgres-operator/badge.svg)](https://coveralls.io/github/zalando-incubator/postgres-operator) +[![Go Report Card](https://goreportcard.com/badge/github.com/zalando-incubator/postgres-operator)](https://goreportcard.com/report/github.com/zalando-incubator/postgres-operator) + The Postgres operator manages Postgres clusters in Kubernetes using the [operator pattern](https://coreos.com/blog/introducing-operators.html). During the initial run it registers the [Third Party Resource (TPR)](https://kubernetes.io/docs/user-guide/thirdpartyresources/) for Postgres. The Postgresql TPR is essentially the schema that describes the contents of the manifests for deploying individual Postgres clusters using Statefulsets and Patroni. From 2f5a3494774cc98eb89aad67bed8c6a8ab5f566e Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Tue, 13 Jun 2017 18:12:04 +0200 Subject: [PATCH 08/11] CDP integration (#51) Build docker images from the master branch via CDP and push them to the registry.opensource.zalan.do --- delivery.yaml | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 delivery.yaml diff --git a/delivery.yaml b/delivery.yaml new file mode 100644 index 000000000..efa9c23a9 --- /dev/null +++ b/delivery.yaml @@ -0,0 +1,23 @@ +build_steps: + - desc: 'Install required build software' + cmd: | + apt-get install -y make git apt-transport-https ca-certificates curl + - desc: 'Install go' + cmd: | + add-apt-repository ppa:longsleep/golang-backports + apt-get update + apt-get install -y golang-go + - desc: 'Install Docker' + cmd: | + curl -sSL https://get.docker.com/ | sh + - desc: 'Symlink sources into the GOPATH' + cmd: | + export GOPATH=$HOME/go + export OPERATOR_TOP_DIR=$GOPATH/src/github.com/zalando-incubator + mkdir -p $OPERATOR_TOP_DIR + ln -s $(pwd) $OPERATOR_TOP_DIR/postgres-operator + - desc: 'Build & push' + cmd: | + export PATH=$PATH:$HOME/go/bin + export IMAGE=registry-write.opensource.zalan.do/acid/postgres-operator + make tools deps docker push From ba6529bec994a035856a2d8af4fa542f7820ea72 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Wed, 14 Jun 2017 14:56:04 +0200 Subject: [PATCH 09/11] Avoid pushing postgres-operator image on PRs. (#52) Do push as postgres-operator from the master branch, and as a postgres-operator-test when evaluating pull-requests. --- delivery.yaml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/delivery.yaml b/delivery.yaml index efa9c23a9..4b597c482 100644 --- a/delivery.yaml +++ b/delivery.yaml @@ -16,8 +16,15 @@ build_steps: export OPERATOR_TOP_DIR=$GOPATH/src/github.com/zalando-incubator mkdir -p $OPERATOR_TOP_DIR ln -s $(pwd) $OPERATOR_TOP_DIR/postgres-operator - - desc: 'Build & push' + - desc: 'Build & push docker image' cmd: | export PATH=$PATH:$HOME/go/bin - export IMAGE=registry-write.opensource.zalan.do/acid/postgres-operator + IS_PR_BUILD=${CDP_PULL_REQUEST_NUMBER+"true"} + if [[ ${CDP_TARGET_BRANCH} == "master" && ${IS_PR_BUILD} != "true" ]] + then + IMAGE=registry-write.opensource.zalan.do/acid/postgres-operator + else + IMAGE=registry-write.opensource.zalan.do/acid/postgres-operator-test + fi + export IMAGE make tools deps docker push From 00150711e467a9437a68b5c81d44c7ff6daa0a8c Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Fri, 30 Jun 2017 13:38:49 +0200 Subject: [PATCH 10/11] Configure load balancer on a per-cluster and operator-wide level (#57) * Deny all requests to the load balancer by default. * Operator-wide toggle for the load-balancer. * Define per-cluster useLoadBalancer option. If useLoadBalancer is not set - then operator-wide defaults take place. If it is true - the load balancer is created, otherwise a service type clusterIP is created. Internally, we have to completely replace the service if the service type changes. We cannot patch, since some fields from the old service that will remain after patch are incompatible with the new one, and handling them explicitly when updating the service is ugly and error-prone. We cannot update the service because of the immutable fields, that leaves us the only option of deleting the old service and creating the new one. Unfortunately, there is still an issue of unnecessary removal of endpoints associated with the service, it will be addressed in future commits. * Revert the unintended effect of go fmt * Recreate endpoints on service update. When the service type is changed, the service is deleted and then the one with the new type is created. Unfortnately, endpoints are deleted as well. Re-create them afterwards, preserving the original addresses stored in them. * Improve error messages and comments. Use generate instead of gen in names. --- manifests/configmap.yaml | 1 + manifests/testpostgresql.yaml | 1 + pkg/cluster/cluster.go | 10 ++-- pkg/cluster/k8sres.go | 78 ++++++++++++++++++++----------- pkg/cluster/resources.go | 48 +++++++++++++++++-- pkg/cluster/sync.go | 4 +- pkg/cluster/util.go | 4 +- pkg/spec/postgresql.go | 6 ++- pkg/spec/types.go | 1 + pkg/util/config/config.go | 1 + pkg/util/constants/annotations.go | 12 ++--- 11 files changed, 120 insertions(+), 46 deletions(-) diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index f5f1920f7..270df6b0e 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -28,3 +28,4 @@ data: super_username: postgres teams_api_url: http://fake-teams-api.default.svc.cluster.local workers: "4" + enable_load_balancer: "true" diff --git a/manifests/testpostgresql.yaml b/manifests/testpostgresql.yaml index 1728f46a7..e80d667ec 100644 --- a/manifests/testpostgresql.yaml +++ b/manifests/testpostgresql.yaml @@ -41,6 +41,7 @@ spec: loop_wait: &loop_wait 10 retry_timeout: 10 maximum_lag_on_failover: 33554432 + useLoadBalancer: true maintenanceWindows: - 01:00-06:00 #UTC - Sat:00:00-04:00 diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index eaa56ec46..e62f0b16e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -242,6 +242,10 @@ func (c *Cluster) Create() error { func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match bool, reason string) { //TODO: improve comparison match = true + if c.Service[role].Spec.Type != service.Spec.Type { + return false, fmt.Sprintf("new %s service's type %s doesn't match the current one %s", + role, service.Spec.Type, c.Service[role].Spec.Type) + } oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges newSourceRanges := service.Spec.LoadBalancerSourceRanges /* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */ @@ -292,7 +296,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp // In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through // and the combined effect of all the changes should be applied. // TODO: log all reasons for changing the statefulset, not just the last one. - // TODO: make sure this is in sync with genPodTemplate, ideally by using the same list of fields to generate + // TODO: make sure this is in sync with generatePodTemplate, ideally by using the same list of fields to generate // the template and the diff if c.Statefulset.Spec.Template.Spec.ServiceAccountName != statefulSet.Spec.Template.Spec.ServiceAccountName { needsReplace = true @@ -435,7 +439,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { continue } } - newService := c.genService(role, newSpec.Spec.AllowedSourceRanges) + newService := c.generateService(role, &newSpec.Spec) if match, reason := c.sameServiceWith(role, newService); !match { c.logServiceChanges(role, c.Service[role], newService, true, reason) if err := c.updateService(role, newService); err != nil { @@ -446,7 +450,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { } } - newStatefulSet, err := c.genStatefulSet(newSpec.Spec) + newStatefulSet, err := c.generateStatefulSet(newSpec.Spec) if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index fd71661ad..0cba32837 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -18,6 +18,7 @@ const ( pgBinariesLocationTemplate = "/usr/lib/postgresql/%s/bin" patroniPGBinariesParameterName = "bin_dir" patroniPGParametersParameterName = "parameters" + localHost = "127.0.0.1/32" ) type pgUser struct { @@ -203,7 +204,7 @@ PATRONI_INITDB_PARAMS: return string(result) } -func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec { +func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec { spiloConfiguration := c.generateSpiloJSONConfiguration(pgParameters, patroniParameters) envVars := []v1.EnvVar{ @@ -323,14 +324,14 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements, return &template } -func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) { +func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) { resourceRequirements, err := c.resourceRequirements(spec.Resources) if err != nil { return nil, err } - podTemplate := c.genPodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni) - volumeClaimTemplate, err := persistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass) + podTemplate := c.generatePodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni) + volumeClaimTemplate, err := generatePersistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass) if err != nil { return nil, err } @@ -352,7 +353,7 @@ func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, return statefulSet, nil } -func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) { +func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) { metadata := v1.ObjectMeta{ Name: constants.DataVolumeName, } @@ -383,19 +384,19 @@ func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.P return volumeClaim, nil } -func (c *Cluster) genUserSecrets() (secrets map[string]*v1.Secret) { +func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) { secrets = make(map[string]*v1.Secret, len(c.pgUsers)) namespace := c.Metadata.Namespace for username, pgUser := range c.pgUsers { //Skip users with no password i.e. human users (they'll be authenticated using pam) - secret := c.genSingleUserSecret(namespace, pgUser) + secret := c.generateSingleUserSecret(namespace, pgUser) if secret != nil { secrets[username] = secret } } /* special case for the system user */ for _, systemUser := range c.systemUsers { - secret := c.genSingleUserSecret(namespace, systemUser) + secret := c.generateSingleUserSecret(namespace, systemUser) if secret != nil { secrets[systemUser.Name] = secret } @@ -404,7 +405,7 @@ func (c *Cluster) genUserSecrets() (secrets map[string]*v1.Secret) { return } -func (c *Cluster) genSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.Secret { +func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.Secret { //Skip users with no password i.e. human users (they'll be authenticated using pam) if pgUser.Password == "" { return nil @@ -425,7 +426,7 @@ func (c *Cluster) genSingleUserSecret(namespace string, pgUser spec.PgUser) *v1. return &secret } -func (c *Cluster) genService(role PostgresRole, allowedSourceRanges []string) *v1.Service { +func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) *v1.Service { dnsNameFunction := c.masterDnsName name := c.Metadata.Name @@ -434,30 +435,52 @@ func (c *Cluster) genService(role PostgresRole, allowedSourceRanges []string) *v name = name + "-repl" } + serviceSpec := v1.ServiceSpec{ + Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, + Type: v1.ServiceTypeClusterIP, + } + + if role == Replica { + serviceSpec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)} + } + + var annotations map[string]string + + // Examine the per-cluster load balancer setting, if it is not defined - check the operator configuration. + if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) || + (newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) { + + // safe default value: lock load balancer to only local address unless overriden explicitely. + sourceRanges := []string{localHost} + allowedSourceRanges := newSpec.AllowedSourceRanges + if len(allowedSourceRanges) >= 0 { + sourceRanges = allowedSourceRanges + } + + serviceSpec.Type = v1.ServiceTypeLoadBalancer + serviceSpec.LoadBalancerSourceRanges = sourceRanges + + annotations = map[string]string{ + constants.ZalandoDNSNameAnnotation: dnsNameFunction(), + constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, + } + + } + service := &v1.Service{ ObjectMeta: v1.ObjectMeta{ - Name: name, - Namespace: c.Metadata.Namespace, - Labels: c.roleLabelsSet(role), - Annotations: map[string]string{ - constants.ZalandoDNSNameAnnotation: dnsNameFunction(), - constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, - }, + Name: name, + Namespace: c.Metadata.Namespace, + Labels: c.roleLabelsSet(role), + Annotations: annotations, }, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeLoadBalancer, - Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}}, - LoadBalancerSourceRanges: allowedSourceRanges, - }, - } - if role == Replica { - service.Spec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)} + Spec: serviceSpec, } return service } -func (c *Cluster) genMasterEndpoints() *v1.Endpoints { +func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpoints { endpoints := &v1.Endpoints{ ObjectMeta: v1.ObjectMeta{ Name: c.Metadata.Name, @@ -465,6 +488,9 @@ func (c *Cluster) genMasterEndpoints() *v1.Endpoints { Labels: c.roleLabelsSet(Master), }, } + if len(subsets) > 0 { + endpoints.Subsets = subsets + } return endpoints } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 7fad9f3a1..b71f8355f 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -119,7 +119,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { if c.Statefulset != nil { return nil, fmt.Errorf("statefulset already exists in the cluster") } - statefulSetSpec, err := c.genStatefulSet(c.Spec) + statefulSetSpec, err := c.generateStatefulSet(c.Spec) if err != nil { return nil, fmt.Errorf("could not generate statefulset: %v", err) } @@ -233,7 +233,7 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { if c.Service[role] != nil { return nil, fmt.Errorf("service already exists in the cluster") } - serviceSpec := c.genService(role, c.Spec.AllowedSourceRanges) + serviceSpec := c.generateService(role, &c.Spec) service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec) if err != nil { @@ -249,9 +249,47 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error return fmt.Errorf("there is no service in the cluster") } serviceName := util.NameFromMeta(c.Service[role].ObjectMeta) + endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta) + // TODO: check if it possible to change the service type with a patch in future versions of Kubernetes + if newService.Spec.Type != c.Service[role].Spec.Type { + // service type has changed, need to replace the service completely. + // we cannot use just pach the current service, since it may contain attributes incompatible with the new type. + var ( + currentEndpoint *v1.Endpoints + err error + ) + + if role == Master { + // for the master service we need to re-create the endpoint as well. Get the up-to-date version of + // the addresses stored in it before the service is deleted (deletion of the service removes the endpooint) + currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name) + if err != nil { + return fmt.Errorf("could not get current cluster endpoints: %v", err) + } + } + err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions) + if err != nil { + return fmt.Errorf("could not delete service '%s': '%v'", serviceName, err) + } + c.Endpoint = nil + svc, err := c.KubeClient.Services(newService.Namespace).Create(newService) + if err != nil { + return fmt.Errorf("could not create service '%s': '%v'", serviceName, err) + } + c.Service[role] = svc + if role == Master { + // create the new endpoint using the addresses obtained from the previous one + endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) + ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec) + if err != nil { + return fmt.Errorf("could not create endpoint '%s': '%v'", endpointName, err) + } + c.Endpoint = ep + } + return nil + } if len(newService.ObjectMeta.Annotations) > 0 { - annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations) _, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( @@ -300,7 +338,7 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) { if c.Endpoint != nil { return nil, fmt.Errorf("endpoint already exists in the cluster") } - endpointsSpec := c.genMasterEndpoints() + endpointsSpec := c.generateMasterEndpoints(nil) endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec) if err != nil { @@ -327,7 +365,7 @@ func (c *Cluster) deleteEndpoint() error { } func (c *Cluster) applySecrets() error { - secrets := c.genUserSecrets() + secrets := c.generateUserSecrets() for secretUsername, secretSpec := range secrets { secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 7dd2b9ee4..625eb34f5 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -100,7 +100,7 @@ func (c *Cluster) syncService(role PostgresRole) error { return nil } - desiredSvc := c.genService(role, cSpec.AllowedSourceRanges) + desiredSvc := c.generateService(role, &cSpec) match, reason := c.sameServiceWith(role, desiredSvc) if match { return nil @@ -158,7 +158,7 @@ func (c *Cluster) syncStatefulSet() error { } /* TODO: should check that we need to replace the statefulset */ if !rollUpdate { - desiredSS, err := c.genStatefulSet(cSpec) + desiredSS, err := c.generateStatefulSet(cSpec) if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 81b715801..31d46cc42 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -70,8 +70,8 @@ func metadataAnnotationsPatch(annotations map[string]string) string { annotationsList = append(annotationsList, fmt.Sprintf(`"%s":"%s"`, name, value)) } annotationsString := strings.Join(annotationsList, ",") - // TODO: perhaps use patchStrategy:"replace" json annotation instead of constructing the patch literally. - return fmt.Sprintf(constants.ServiceMetadataAnnotationFormat, annotationsString) + // TODO: perhaps use patchStrategy:action json annotation instead of constructing the patch literally. + return fmt.Sprintf(constants.ServiceMetadataAnnotationReplaceFormat, annotationsString) } func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index 172fe06f8..3649eff50 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -86,8 +86,10 @@ type PostgresSpec struct { Patroni `json:"patroni,omitempty"` Resources `json:"resources,omitempty"` - TeamID string `json:"teamId"` - AllowedSourceRanges []string `json:"allowedSourceRanges"` + TeamID string `json:"teamId"` + AllowedSourceRanges []string `json:"allowedSourceRanges"` + // EnableLoadBalancer is a pointer, since it is importat to know if that parameters is omited from the manifest + UseLoadBalancer *bool `json:"useLoadBalancer,omitempty"` ReplicaLoadBalancer bool `json:"replicaLoadBalancer,omitempty"` NumberOfInstances int32 `json:"numberOfInstances"` Users map[string]userFlags `json:"users"` diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 822395ce9..398003841 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -3,6 +3,7 @@ package spec import ( "fmt" "strings" + "database/sql" "k8s.io/client-go/pkg/api/v1" diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 7676e3e6a..cd038e9a8 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -53,6 +53,7 @@ type Config struct { DebugLogging bool `name:"debug_logging" default:"true"` EnableDBAccess bool `name:"enable_database_access" default:"true"` EnableTeamsAPI bool `name:"enable_teams_api" default:"true"` + EnableLoadBalancer bool `name:"enable_load_balancer" default:"true"` MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"` ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"` Workers uint32 `name:"workers" default:"4"` diff --git a/pkg/util/constants/annotations.go b/pkg/util/constants/annotations.go index 3b276cc3d..48e41cb16 100644 --- a/pkg/util/constants/annotations.go +++ b/pkg/util/constants/annotations.go @@ -2,10 +2,10 @@ package constants // Names and values in Kubernetes annotation for services, statefulsets and volumes const ( - ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname" - ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" - ElbTimeoutAnnotationValue = "3600" - KubeIAmAnnotation = "iam.amazonaws.com/role" - VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by" - ServiceMetadataAnnotationFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}` + ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname" + ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" + ElbTimeoutAnnotationValue = "3600" + KubeIAmAnnotation = "iam.amazonaws.com/role" + VolumeStorateProvisionerAnnotation = "pv.kubernetes.io/provisioned-by" + ServiceMetadataAnnotationReplaceFormat = `{"metadata":{"annotations": {"$patch":"replace", %s}}}` ) From a8ed1e25b4f53453ad6f5c99573b6a48c0e27a8c Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Wed, 12 Jul 2017 10:57:20 +0200 Subject: [PATCH 11/11] Avoid re-creating master pod if it is empty during sync. (#58) Fixes #59 --- pkg/cluster/pod.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 49497d929..52ecdfffe 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -140,14 +140,14 @@ func (c *Cluster) recreatePods() error { } if masterPod.Name == "" { c.logger.Warningln("No master pod in the cluster") - } + } else { + //TODO: do manual failover + //TODO: specify master, leave new master empty + c.logger.Infof("Recreating master pod '%s'", util.NameFromMeta(masterPod.ObjectMeta)) - //TODO: do manual failover - //TODO: specify master, leave new master empty - c.logger.Infof("Recreating master pod '%s'", util.NameFromMeta(masterPod.ObjectMeta)) - - if err := c.recreatePod(masterPod); err != nil { - return fmt.Errorf("could not recreate master pod '%s': %v", util.NameFromMeta(masterPod.ObjectMeta), err) + if err := c.recreatePod(masterPod); err != nil { + return fmt.Errorf("could not recreate master pod '%s': %v", util.NameFromMeta(masterPod.ObjectMeta), err) + } } return nil