From 86803406db39c5ced5dcf5063fb6604d132911f8 Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Fri, 3 Nov 2017 12:00:43 +0100 Subject: [PATCH] use sync methods while updating the cluster --- pkg/cluster/cluster.go | 248 ++++++++++++++----------------- pkg/cluster/k8sres.go | 35 +++-- pkg/cluster/resources.go | 214 +++++++++++---------------- pkg/cluster/sync.go | 275 +++++++++++++++++++++++++---------- pkg/cluster/util.go | 5 +- pkg/cluster/volumes.go | 8 +- pkg/controller/postgresql.go | 5 +- pkg/spec/types.go | 3 +- pkg/util/k8sutil/k8sutil.go | 45 +++++- 9 files changed, 456 insertions(+), 382 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 3f7b38007..e7160ff01 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -28,7 +28,6 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/patroni" "github.com/zalando-incubator/postgres-operator/pkg/util/teams" "github.com/zalando-incubator/postgres-operator/pkg/util/users" - "github.com/zalando-incubator/postgres-operator/pkg/util/volumes" ) var ( @@ -46,7 +45,7 @@ type Config struct { type kubeResources struct { Services map[PostgresRole]*v1.Service - Endpoint *v1.Endpoints + Endpoints map[PostgresRole]*v1.Endpoints Secrets map[types.UID]*v1.Secret Statefulset *v1beta1.StatefulSet PodDisruptionBudget *policybeta1.PodDisruptionBudget @@ -99,12 +98,15 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql }) cluster := &Cluster{ - Config: cfg, - Postgresql: pgSpec, - pgUsers: make(map[string]spec.PgUser), - systemUsers: make(map[string]spec.PgUser), - podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent), - kubeResources: kubeResources{Secrets: make(map[types.UID]*v1.Secret), Services: make(map[PostgresRole]*v1.Service)}, + Config: cfg, + Postgresql: pgSpec, + pgUsers: make(map[string]spec.PgUser), + systemUsers: make(map[string]spec.PgUser), + podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent), + kubeResources: kubeResources{ + Secrets: make(map[types.UID]*v1.Secret), + Services: make(map[PostgresRole]*v1.Service), + Endpoints: make(map[PostgresRole]*v1.Endpoints)}, masterLess: false, userSyncStrategy: users.DefaultUserSyncStrategy{}, deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, @@ -203,17 +205,16 @@ func (c *Cluster) Create() error { c.setStatus(spec.ClusterStatusCreating) - //service will create endpoint implicitly - ep, err = c.createEndpoint() - if err != nil { - return fmt.Errorf("could not create endpoint: %v", err) - } - c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta)) - for _, role := range []PostgresRole{Master, Replica} { if role == Replica && !c.Spec.ReplicaLoadBalancer { continue } + ep, err = c.createEndpoint(role) + if err != nil { + return fmt.Errorf("could not create %s endpoint: %v", role, err) + } + c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta)) + service, err = c.createService(role) if err != nil { return fmt.Errorf("could not create %s service: %v", role, err) @@ -226,7 +227,7 @@ func (c *Cluster) Create() error { } c.logger.Infof("users have been initialized") - if err = c.applySecrets(); err != nil { + if err = c.syncSecrets(); err != nil { return fmt.Errorf("could not create secrets: %v", err) } c.logger.Infof("secrets have been successfully created") @@ -257,8 +258,8 @@ func (c *Cluster) Create() error { } c.logger.Infof("users have been successfully created") - if err = c.createDatabases(); err != nil { - return fmt.Errorf("could not create databases: %v", err) + if err = c.syncDatabases(); err != nil { + return fmt.Errorf("could not sync databases: %v", err) } c.logger.Infof("databases have been successfully created") } else { @@ -267,48 +268,13 @@ func (c *Cluster) Create() error { } } - err = c.listResources() - if err != nil { + if err := c.listResources(); err != nil { c.logger.Errorf("could not list resources: %v", err) } return nil } -func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match bool, reason string) { - //TODO: improve comparison - if c.Services[role].Spec.Type != service.Spec.Type { - return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q", - role, service.Spec.Type, c.Services[role].Spec.Type) - } - oldSourceRanges := c.Services[role].Spec.LoadBalancerSourceRanges - newSourceRanges := service.Spec.LoadBalancerSourceRanges - /* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */ - if (len(oldSourceRanges) == 0) && (len(newSourceRanges) == 0) { - return true, "" - } - if !reflect.DeepEqual(oldSourceRanges, newSourceRanges) { - return false, fmt.Sprintf("new %s service's LoadBalancerSourceRange doesn't match the current one", role) - } - - oldDNSAnnotation := c.Services[role].Annotations[constants.ZalandoDNSNameAnnotation] - newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation] - if oldDNSAnnotation != newDNSAnnotation { - return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) - } - - return true, "" -} - -func (c *Cluster) sameVolumeWith(volume spec.Volume) (match bool, reason string) { - if !reflect.DeepEqual(c.Spec.Volume, volume) { - reason = "new volume's specification doesn't match the current one" - } else { - match = true - } - return -} - func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *compareStatefulsetResult { reasons := make([]string, 0) var match, needsRollUpdate, needsReplace bool @@ -406,6 +372,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp if needsRollUpdate || needsReplace { match = false } + return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace} } @@ -417,12 +384,13 @@ func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) (e if equal && (b != nil) { equal = compareResoucesAssumeFirstNotNil(b, a) } + return } func compareResoucesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.ResourceRequirements) bool { if b == nil || (len(b.Requests) == 0) { - return (len(a.Requests) == 0) + return len(a.Requests) == 0 } for k, v := range a.Requests { if (&v).Cmp(b.Requests[k]) != 0 { @@ -440,108 +408,108 @@ func compareResoucesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resource // Update changes Kubernetes objects according to the new specification. Unlike the sync case, the missing object. // (i.e. service) is treated as an error. -func (c *Cluster) Update(newSpec *spec.Postgresql) error { +func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error { + updateFailed := false + c.mu.Lock() defer c.mu.Unlock() c.setStatus(spec.ClusterStatusUpdating) + c.Postgresql = *newSpec - /* Make sure we update when this function exits */ defer func() { - c.Postgresql = *newSpec + if updateFailed { + c.setStatus(spec.ClusterStatusUpdateFailed) + } else if c.Status != spec.ClusterStatusRunning { + c.setStatus(spec.ClusterStatusRunning) + } }() - for _, role := range []PostgresRole{Master, Replica} { - if role == Replica { - if !newSpec.Spec.ReplicaLoadBalancer { - // old spec had a load balancer, but the new one doesn't - if c.Spec.ReplicaLoadBalancer { - err := c.deleteService(role) - if err != nil { - return fmt.Errorf("could not delete obsolete %s service: %v", role, err) - } - c.logger.Infof("deleted obsolete %s service", role) - } - } else { - if !c.Spec.ReplicaLoadBalancer { - // old spec didn't have a load balancer, but the one does - service, err := c.createService(role) - if err != nil { - return fmt.Errorf("could not create new %s service: %v", role, err) - } - c.logger.Infof("%s service %q has been created", role, util.NameFromMeta(service.ObjectMeta)) - } - } - // only proceed further if both old and new load balancer were present - if !(newSpec.Spec.ReplicaLoadBalancer && c.Spec.ReplicaLoadBalancer) { - continue - } - } - newService := c.generateService(role, &newSpec.Spec) - if match, reason := c.sameServiceWith(role, newService); !match { - c.logServiceChanges(role, c.Services[role], newService, true, reason) - if err := c.updateService(role, newService); err != nil { - c.setStatus(spec.ClusterStatusUpdateFailed) - return fmt.Errorf("could not update %s service: %v", role, err) - } - c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Services[role].ObjectMeta)) + if oldSpec.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison + c.logger.Warningf("postgresql version change(%q -> %q) has no effect", oldSpec.Spec.PgVersion, newSpec.Spec.PgVersion) + //we need that hack to generate statefulset with the old version + newSpec.Spec.PgVersion = oldSpec.Spec.PgVersion + } + + // Service + if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) || + !reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) || + oldSpec.Spec.ReplicaLoadBalancer != newSpec.Spec.ReplicaLoadBalancer { + c.logger.Debugf("syncing services") + if err := c.syncServices(); err != nil { + c.logger.Errorf("could not sync services: %v", err) + updateFailed = true } } - newStatefulSet, err := c.generateStatefulSet(newSpec.Spec) - if err != nil { - return fmt.Errorf("could not generate statefulset: %v", err) - } - cmp := c.compareStatefulSetWith(newStatefulSet) + if !reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) { + c.logger.Debugf("syncing secrets") + if err := c.initUsers(); err != nil { + c.logger.Errorf("could not init users: %v", err) + updateFailed = true + } - if !cmp.match { - c.logStatefulSetChanges(c.Statefulset, newStatefulSet, true, cmp.reasons) - //TODO: mind the case of updating allowedSourceRanges - if !cmp.replace { - if err := c.updateStatefulSet(newStatefulSet); err != nil { - c.setStatus(spec.ClusterStatusUpdateFailed) - return fmt.Errorf("could not upate statefulset: %v", err) - } - } else { - if err := c.replaceStatefulSet(newStatefulSet); err != nil { - c.setStatus(spec.ClusterStatusUpdateFailed) - return fmt.Errorf("could not replace statefulset: %v", err) + c.logger.Debugf("syncing secrets") + + //TODO: mind the secrets of the deleted/new users + if err := c.syncSecrets(); err != nil { + c.logger.Errorf("could not sync secrets: %v", err) + updateFailed = true + } + + if !c.databaseAccessDisabled() { + c.logger.Debugf("syncing roles") + if err := c.syncRoles(true); err != nil { + c.logger.Errorf("could not sync roles: %v", err) + updateFailed = true } } - //TODO: if there is a change in numberOfInstances, make sure Pods have been created/deleted - c.logger.Infof("statefulset %q has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta)) } - if c.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison - c.logger.Warningf("postgresql version change(%q -> %q) is not allowed", - c.Spec.PgVersion, newSpec.Spec.PgVersion) - //TODO: rewrite pg version in tpr spec - } + // Volume + if oldSpec.Spec.Size != newSpec.Spec.Size { + c.logger.Debugf("syncing persistent volumes") + c.logVolumeChanges(oldSpec.Spec.Volume, newSpec.Spec.Volume) - if cmp.rollingUpdate { - c.logger.Infof("rolling update is needed") - // TODO: wait for actual streaming to the replica - if err := c.recreatePods(); err != nil { - c.setStatus(spec.ClusterStatusUpdateFailed) - return fmt.Errorf("could not recreate pods: %v", err) + if err := c.syncVolumes(); err != nil { + c.logger.Errorf("could not sync persistent volumes: %v", err) + updateFailed = true } - c.logger.Infof("rolling update has been finished") } - if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match { - c.logVolumeChanges(c.Spec.Volume, newSpec.Spec.Volume, reason) - if err := c.resizeVolumes(newSpec.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil { - return fmt.Errorf("could not update volumes: %v", err) + // Statefulset + func() { + oldSs, err := c.generateStatefulSet(&oldSpec.Spec) + if err != nil { + c.logger.Errorf("could not generate old statefulset spec") + updateFailed = true + return } - c.logger.Infof("volumes have been updated successfully") - } - if err := c.syncPodDisruptionBudget(true); err != nil { - c.setStatus(spec.ClusterStatusUpdateFailed) - return fmt.Errorf("could not update pod disruption budget: %v", err) - } + newSs, err := c.generateStatefulSet(&newSpec.Spec) + if err != nil { + c.logger.Errorf("could not generate new statefulset spec") + updateFailed = true + return + } - c.setStatus(spec.ClusterStatusRunning) + if !reflect.DeepEqual(oldSs, newSs) { + c.logger.Debugf("syncing statefulsets") + if err := c.syncStatefulSet(); err != nil { + c.logger.Errorf("could not sync statefulsets: %v", err) + updateFailed = true + } + } + }() + + // Databases + if !reflect.DeepEqual(oldSpec.Spec.Databases, newSpec.Spec.Databases) { + c.logger.Infof("syncing databases") + if err := c.syncDatabases(); err != nil { + c.logger.Errorf("could not sync databases: %v", err) + updateFailed = true + } + } return nil } @@ -551,14 +519,15 @@ func (c *Cluster) Delete() error { c.mu.Lock() defer c.mu.Unlock() - if err := c.deleteEndpoint(); err != nil { - return fmt.Errorf("could not delete endpoint: %v", err) - } - for _, role := range []PostgresRole{Master, Replica} { if role == Replica && !c.Spec.ReplicaLoadBalancer { continue } + + if err := c.deleteEndpoint(role); err != nil { + return fmt.Errorf("could not delete %s endpoint: %v", role, err) + } + if err := c.deleteService(role); err != nil { return fmt.Errorf("could not delete %s service: %v", role, err) } @@ -715,7 +684,8 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus { MasterService: c.GetServiceMaster(), ReplicaService: c.GetServiceReplica(), - Endpoint: c.GetEndpoint(), + MasterEndpoint: c.GetEndpointMaster(), + ReplicaEndpoint: c.GetEndpointReplica(), StatefulSet: c.GetStatefulSet(), PodDisruptionBudget: c.GetPodDisruptionBudget(), CurrentProcess: c.GetCurrentProcess(), diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 1ed94cb2d..d0337c64b 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -55,8 +55,13 @@ func (c *Cluster) statefulSetName() string { return c.Name } -func (c *Cluster) endpointName() string { - return c.Name +func (c *Cluster) endpointName(role PostgresRole) string { + name := c.Name + if role == Replica { + name = name + "-repl" + } + + return name } func (c *Cluster) serviceName(role PostgresRole) string { @@ -149,7 +154,7 @@ func (c *Cluster) generateSpiloJSONConfiguration(pg *spec.PostgresqlParam, patro // maps and normal string items in the array of initdb options. We need // both to convert the initial key-value to strings when necessary, and // to de-duplicate the options supplied. -PATRONI_INITDB_PARAMS: +PatroniInitDBParams: for _, k := range initdbOptionNames { v := patroni.InitDB[k] for i, defaultParam := range config.Bootstrap.Initdb { @@ -159,7 +164,7 @@ PATRONI_INITDB_PARAMS: for k1 := range defaultParam.(map[string]string) { if k1 == k { (config.Bootstrap.Initdb[i]).(map[string]string)[k] = v - continue PATRONI_INITDB_PARAMS + continue PatroniInitDBParams } } } @@ -167,12 +172,12 @@ PATRONI_INITDB_PARAMS: { /* if the option already occurs in the list */ if defaultParam.(string) == v { - continue PATRONI_INITDB_PARAMS + continue PatroniInitDBParams } } default: c.logger.Warningf("unsupported type for initdb configuration item %s: %T", defaultParam, defaultParam) - continue PATRONI_INITDB_PARAMS + continue PatroniInitDBParams } } // The following options are known to have no parameters @@ -356,7 +361,7 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme if cloneDescription.ClusterName != "" { envVars = append(envVars, c.generateCloneEnvironment(cloneDescription)...) } - privilegedMode := bool(true) + privilegedMode := true container := v1.Container{ Name: c.containerName(), Image: c.OpConfig.DockerImage, @@ -411,7 +416,7 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme return &template } -func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) { +func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.StatefulSet, error) { resourceRequirements, err := c.resourceRequirements(spec.Resources) if err != nil { return nil, fmt.Errorf("could not generate resource requirements: %v", err) @@ -513,7 +518,7 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) return &secret } -func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) *v1.Service { +func (c *Cluster) generateService(role PostgresRole, spec *spec.PostgresSpec) *v1.Service { var dnsName string if role == Master { @@ -534,12 +539,12 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) var annotations map[string]string // Examine the per-cluster load balancer setting, if it is not defined - check the operator configuration. - if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) || - (newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) { + if (spec.UseLoadBalancer != nil && *spec.UseLoadBalancer) || + (spec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) { // safe default value: lock load balancer to only local address unless overridden explicitly. sourceRanges := []string{localHost} - allowedSourceRanges := newSpec.AllowedSourceRanges + allowedSourceRanges := spec.AllowedSourceRanges if len(allowedSourceRanges) >= 0 { sourceRanges = allowedSourceRanges } @@ -566,12 +571,12 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) return service } -func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpoints { +func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints { endpoints := &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ - Name: c.endpointName(), + Name: c.endpointName(role), Namespace: c.Namespace, - Labels: c.roleLabelsSet(Master), + Labels: c.roleLabelsSet(role), }, } if len(subsets) > 0 { diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 273bd90d0..79c59bb47 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -11,67 +11,12 @@ import ( "k8s.io/client-go/pkg/apis/apps/v1beta1" policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1" - "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" ) -func (c *Cluster) loadResources() error { - var err error - ns := c.Namespace - - masterService, err := c.KubeClient.Services(ns).Get(c.serviceName(Master), metav1.GetOptions{}) - if err == nil { - c.Services[Master] = masterService - } else if !k8sutil.ResourceNotFound(err) { - c.logger.Errorf("could not get master service: %v", err) - } - - replicaService, err := c.KubeClient.Services(ns).Get(c.serviceName(Replica), metav1.GetOptions{}) - if err == nil { - c.Services[Replica] = replicaService - } else if !k8sutil.ResourceNotFound(err) { - c.logger.Errorf("could not get replica service: %v", err) - } - - ep, err := c.KubeClient.Endpoints(ns).Get(c.endpointName(), metav1.GetOptions{}) - if err == nil { - c.Endpoint = ep - } else if !k8sutil.ResourceNotFound(err) { - c.logger.Errorf("could not get endpoint: %v", err) - } - - secrets, err := c.KubeClient.Secrets(ns).List(metav1.ListOptions{LabelSelector: c.labelsSet().String()}) - if err != nil { - c.logger.Errorf("could not get list of secrets: %v", err) - } - for i, secret := range secrets.Items { - if _, ok := c.Secrets[secret.UID]; ok { - continue - } - c.Secrets[secret.UID] = &secrets.Items[i] - c.logger.Debugf("secret loaded, uid: %q", secret.UID) - } - - ss, err := c.KubeClient.StatefulSets(ns).Get(c.statefulSetName(), metav1.GetOptions{}) - if err == nil { - c.Statefulset = ss - } else if !k8sutil.ResourceNotFound(err) { - c.logger.Errorf("could not get statefulset: %v", err) - } - - pdb, err := c.KubeClient.PodDisruptionBudgets(ns).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}) - if err == nil { - c.PodDisruptionBudget = pdb - } else if !k8sutil.ResourceNotFound(err) { - c.logger.Errorf("could not get pod disruption budget: %v", err) - } - - return nil -} - func (c *Cluster) listResources() error { if c.PodDisruptionBudget != nil { c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID) @@ -85,8 +30,8 @@ func (c *Cluster) listResources() error { c.logger.Infof("found secret: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } - if c.Endpoint != nil { - c.logger.Infof("found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) + for role, endpoint := range c.Endpoints { + c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID) } for role, service := range c.Services { @@ -119,7 +64,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { if c.Statefulset != nil { return nil, fmt.Errorf("statefulset already exists in the cluster") } - statefulSetSpec, err := c.generateStatefulSet(c.Spec) + statefulSetSpec, err := c.generateStatefulSet(&c.Spec) if err != nil { return nil, fmt.Errorf("could not generate statefulset: %v", err) } @@ -307,11 +252,13 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error { c.setProcessName("updating %v service", role) + if c.Services[role] == nil { return fmt.Errorf("there is no service in the cluster") } + serviceName := util.NameFromMeta(c.Services[role].ObjectMeta) - endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta) + endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta) // TODO: check if it possible to change the service type with a patch in future versions of Kubernetes if newService.Spec.Type != c.Services[role].Spec.Type { // service type has changed, need to replace the service completely. @@ -324,38 +271,42 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error if role == Master { // for the master service we need to re-create the endpoint as well. Get the up-to-date version of // the addresses stored in it before the service is deleted (deletion of the service removes the endpooint) - currentEndpoint, err = c.KubeClient.Endpoints(c.Services[role].Namespace).Get(c.Services[role].Name, metav1.GetOptions{}) + currentEndpoint, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}) if err != nil { - return fmt.Errorf("could not get current cluster endpoints: %v", err) + return fmt.Errorf("could not get current cluster %s endpoints: %v", role, err) } } - err = c.KubeClient.Services(c.Services[role].Namespace).Delete(c.Services[role].Name, c.deleteOptions) + err = c.KubeClient.Services(serviceName.Namespace).Delete(serviceName.Name, c.deleteOptions) if err != nil { return fmt.Errorf("could not delete service %q: %v", serviceName, err) } - c.Endpoint = nil - svc, err := c.KubeClient.Services(newService.Namespace).Create(newService) + + c.Endpoints[role] = nil + svc, err := c.KubeClient.Services(serviceName.Namespace).Create(newService) if err != nil { return fmt.Errorf("could not create service %q: %v", serviceName, err) } + c.Services[role] = svc if role == Master { // create the new endpoint using the addresses obtained from the previous one - endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) - ep, err := c.KubeClient.Endpoints(c.Services[role].Namespace).Create(endpointSpec) + endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets) + ep, err := c.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec) if err != nil { return fmt.Errorf("could not create endpoint %q: %v", endpointName, err) } - c.Endpoint = ep + + c.Endpoints[role] = ep } + return nil } if len(newService.ObjectMeta.Annotations) > 0 { annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations) - _, err := c.KubeClient.Services(c.Services[role].Namespace).Patch( - c.Services[role].Name, + _, err := c.KubeClient.Services(serviceName.Namespace).Patch( + serviceName.Name, types.StrategicMergePatchType, []byte(annotationsPatchData), "") @@ -369,8 +320,8 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) } - svc, err := c.KubeClient.Services(c.Services[role].Namespace).Patch( - c.Services[role].Name, + svc, err := c.KubeClient.Services(serviceName.Namespace).Patch( + serviceName.Name, types.MergePatchType, patchData, "") if err != nil { @@ -383,31 +334,36 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error func (c *Cluster) deleteService(role PostgresRole) error { c.logger.Debugf("deleting service %s", role) - if c.Services[role] == nil { - return fmt.Errorf("there is no %s service in the cluster", role) - } + service := c.Services[role] - err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions) - if err != nil { + + if err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions); err != nil { return err } + c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta)) c.Services[role] = nil + return nil } -func (c *Cluster) createEndpoint() (*v1.Endpoints, error) { +func (c *Cluster) createEndpoint(role PostgresRole) (*v1.Endpoints, error) { c.setProcessName("creating endpoint") - if c.Endpoint != nil { - return nil, fmt.Errorf("endpoint already exists in the cluster") + if c.Endpoints[role] != nil { + return nil, fmt.Errorf("%s endpoint already exists in the cluster", role) } - endpointsSpec := c.generateMasterEndpoints(nil) + subsets := make([]v1.EndpointSubset, 0) + if role == Master { + //TODO: set subsets to the master + } + endpointsSpec := c.generateEndpoint(role, subsets) endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec) if err != nil { - return nil, err + return nil, fmt.Errorf("could not create %s endpoint: %v", role, err) } - c.Endpoint = endpoints + + c.Endpoints[role] = endpoints return endpoints, nil } @@ -430,13 +386,19 @@ func (c *Cluster) createPodDisruptionBudget() (*policybeta1.PodDisruptionBudget, } func (c *Cluster) updatePodDisruptionBudget(pdb *policybeta1.PodDisruptionBudget) error { - if c.podEventsQueue == nil { + if c.PodDisruptionBudget == nil { return fmt.Errorf("there is no pod disruption budget in the cluster") } - newPdb, err := c.KubeClient.PodDisruptionBudgets(pdb.Namespace).Update(pdb) + if err := c.deletePodDisruptionBudget(); err != nil { + return fmt.Errorf("could not delete pod disruption budget: %v", err) + } + + newPdb, err := c.KubeClient. + PodDisruptionBudgets(pdb.Namespace). + Create(pdb) if err != nil { - return fmt.Errorf("could not update pod disruption budget: %v", err) + return fmt.Errorf("could not create pod disruption budget: %v", err) } c.PodDisruptionBudget = newPdb @@ -448,69 +410,50 @@ func (c *Cluster) deletePodDisruptionBudget() error { if c.PodDisruptionBudget == nil { return fmt.Errorf("there is no pod disruption budget in the cluster") } + + pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta) err := c.KubeClient. PodDisruptionBudgets(c.PodDisruptionBudget.Namespace). - Delete(c.PodDisruptionBudget.Namespace, c.deleteOptions) + Delete(c.PodDisruptionBudget.Name, c.deleteOptions) if err != nil { return fmt.Errorf("could not delete pod disruption budget: %v", err) } c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) c.PodDisruptionBudget = nil + err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, + func() (bool, error) { + _, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(pdbName.Name, metav1.GetOptions{}) + if err2 == nil { + return false, nil + } + if k8sutil.ResourceNotFound(err2) { + return true, nil + } else { + return false, err2 + } + }) + if err != nil { + return fmt.Errorf("could not delete pod disruption budget: %v", err) + } + return nil } -func (c *Cluster) deleteEndpoint() error { +func (c *Cluster) deleteEndpoint(role PostgresRole) error { c.setProcessName("deleting endpoint") c.logger.Debugln("deleting endpoint") - if c.Endpoint == nil { - return fmt.Errorf("there is no endpoint in the cluster") + if c.Endpoints[role] == nil { + return fmt.Errorf("there is no %s endpoint in the cluster", role) } - err := c.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, c.deleteOptions) - if err != nil { + + if err := c.KubeClient.Endpoints(c.Endpoints[role].Namespace).Delete(c.Endpoints[role].Name, c.deleteOptions); err != nil { return fmt.Errorf("could not delete endpoint: %v", err) } - c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta)) - c.Endpoint = nil - return nil -} + c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoints[role].ObjectMeta)) -func (c *Cluster) applySecrets() error { - c.setProcessName("applying secrets") - secrets := c.generateUserSecrets() - - for secretUsername, secretSpec := range secrets { - secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec) - if k8sutil.ResourceAlreadyExists(err) { - var userMap map[string]spec.PgUser - curSecret, err2 := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{}) - if err2 != nil { - return fmt.Errorf("could not get current secret: %v", err2) - } - c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta)) - if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name { - secretUsername = constants.SuperuserKeyName - userMap = c.systemUsers - } else if secretUsername == c.systemUsers[constants.ReplicationUserKeyName].Name { - secretUsername = constants.ReplicationUserKeyName - userMap = c.systemUsers - } else { - userMap = c.pgUsers - } - pwdUser := userMap[secretUsername] - pwdUser.Password = string(curSecret.Data["password"]) - userMap[secretUsername] = pwdUser - - continue - } else { - if err != nil { - return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err) - } - c.Secrets[secret.UID] = secret - c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID) - } - } + c.Endpoints[role] = nil return nil } @@ -543,9 +486,14 @@ func (c *Cluster) GetServiceReplica() *v1.Service { return c.Services[Replica] } -// GetEndpoint returns cluster's kubernetes Endpoint -func (c *Cluster) GetEndpoint() *v1.Endpoints { - return c.Endpoint +// GetEndpointMaster returns cluster's kubernetes master Endpoint +func (c *Cluster) GetEndpointMaster() *v1.Endpoints { + return c.Endpoints[Master] +} + +// GetEndpointReplica returns cluster's kubernetes master Endpoint +func (c *Cluster) GetEndpointReplica() *v1.Endpoints { + return c.Endpoints[Replica] } // GetStatefulSet returns cluster's kubernetes StatefulSet diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 6a9b681bc..1d4649917 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -4,10 +4,12 @@ import ( "fmt" "reflect" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" + "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/volumes" ) @@ -20,11 +22,6 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { c.Postgresql = *newSpec - err = c.loadResources() - if err != nil { - c.logger.Errorf("could not load resources: %v", err) - } - defer func() { if err != nil { c.setStatus(spec.ClusterStatusSyncFailed) @@ -41,39 +38,15 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { c.logger.Debugf("syncing secrets") //TODO: mind the secrets of the deleted/new users - if err = c.applySecrets(); err != nil { - if !k8sutil.ResourceAlreadyExists(err) { - err = fmt.Errorf("could not sync secrets: %v", err) - return - } - } - - c.logger.Debugf("syncing endpoints") - if err = c.syncEndpoint(); err != nil { - if !k8sutil.ResourceAlreadyExists(err) { - err = fmt.Errorf("could not sync endpoints: %v", err) - return - } + if err = c.syncSecrets(); err != nil { + err = fmt.Errorf("could not sync secrets: %v", err) + return } c.logger.Debugf("syncing services") - for _, role := range []PostgresRole{Master, Replica} { - if role == Replica && !c.Spec.ReplicaLoadBalancer { - if c.Services[role] != nil { - // delete the left over replica service - if err = c.deleteService(role); err != nil { - err = fmt.Errorf("could not delete obsolete %s service: %v", role, err) - return - } - } - continue - } - if err = c.syncService(role); err != nil { - if !k8sutil.ResourceAlreadyExists(err) { - err = fmt.Errorf("coud not sync %s service: %v", role, err) - return - } - } + if err = c.syncServices(); err != nil { + err = fmt.Errorf("could not sync services: %v", err) + return } c.logger.Debugf("syncing statefulsets") @@ -112,73 +85,170 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { return } -func (c *Cluster) syncService(role PostgresRole) error { - cSpec := c.Spec - if c.Services[role] == nil { - c.logger.Infof("could not find the cluster's %s service", role) - svc, err := c.createService(role) - if err != nil { - return fmt.Errorf("could not create missing %s service: %v", role, err) +func (c *Cluster) syncServices() error { + for _, role := range []PostgresRole{Master, Replica} { + c.logger.Debugf("syncing %s service", role) + + if err := c.syncEndpoint(role); err != nil { + return fmt.Errorf("could not sync %s endpont: %v", role, err) } - c.logger.Infof("created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta)) - return nil + if err := c.syncService(role); err != nil { + return fmt.Errorf("could not sync %s service: %v", role, err) + } } - desiredSvc := c.generateService(role, &cSpec) - match, reason := c.sameServiceWith(role, desiredSvc) - if match { - return nil - } - c.logServiceChanges(role, c.Services[role], desiredSvc, false, reason) - - if err := c.updateService(role, desiredSvc); err != nil { - return fmt.Errorf("could not update %s service to match desired state: %v", role, err) - } - c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) - return nil } -func (c *Cluster) syncEndpoint() error { - if c.Endpoint == nil { - c.logger.Infof("could not find the cluster's endpoint") - ep, err := c.createEndpoint() - if err != nil { - return fmt.Errorf("could not create missing endpoint: %v", err) +func (c *Cluster) syncService(role PostgresRole) error { + c.setProcessName("syncing %s service", role) + + svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}) + if err == nil { + if role == Replica && !c.Spec.ReplicaLoadBalancer { + if err := c.deleteService(role); err != nil { + return fmt.Errorf("could not delete %s service", role) + } } - c.logger.Infof("created missing endpoint %q", util.NameFromMeta(ep.ObjectMeta)) + + desiredSvc := c.generateService(role, &c.Spec) + match, reason := k8sutil.SameService(svc, desiredSvc) + if match { + c.Services[role] = svc + return nil + } + c.logServiceChanges(role, svc, desiredSvc, false, reason) + + if err := c.updateService(role, desiredSvc); err != nil { + return fmt.Errorf("could not update %s service to match desired state: %v", role, err) + } + c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) + return nil + } else if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get %s service: %v", role, err) + } + c.Services[role] = nil + + // Service does not exist + if role == Replica && !c.Spec.ReplicaLoadBalancer { + return nil + } + + c.logger.Infof("could not find the cluster's %s service", role) + + if svc, err := c.createService(role); err != nil { + if k8sutil.ResourceAlreadyExists(err) { + c.logger.Infof("%s service %q already exists", role, util.NameFromMeta(svc.ObjectMeta)) + svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}) + if err == nil { + c.Services[role] = svc + } else { + c.logger.Infof("could not fetch existing %s service: %v", role, err) + } + } else { + return fmt.Errorf("could not create missing %s service: %v", role, err) + } + } else { + c.logger.Infof("created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta)) + c.Services[role] = svc + } + + return nil +} + +func (c *Cluster) syncEndpoint(role PostgresRole) error { + c.setProcessName("syncing %s endpoint", role) + + ep, err := c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}) + if err == nil { + if role == Replica && !c.Spec.ReplicaLoadBalancer { + if err := c.deleteEndpoint(role); err != nil { + return fmt.Errorf("could not delete %s endpoint", role) + } + } + + c.Endpoints[role] = ep + return nil + } else if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get %s endpoint: %v", role, err) + } + c.Endpoints[role] = nil + + if role == Replica && !c.Spec.ReplicaLoadBalancer { + return nil + } + + c.logger.Infof("could not find the cluster's %s endpoint", role) + + if ep, err := c.createEndpoint(role); err != nil { + if k8sutil.ResourceAlreadyExists(err) { + c.logger.Infof("%s endpoint %q already exists", role, util.NameFromMeta(ep.ObjectMeta)) + ep, err := c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}) + if err == nil { + c.Endpoints[role] = ep + } else { + c.logger.Infof("could not fetch existing %s endpoint: %v", role, err) + } + } else { + return fmt.Errorf("could not create missing %s endpoint: %v", role, err) + } + } else { + c.logger.Infof("created missing %s endpoint %q", role, util.NameFromMeta(ep.ObjectMeta)) + c.Endpoints[role] = ep } return nil } func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { - if c.PodDisruptionBudget == nil { - c.logger.Infof("could not find the cluster's pod disruption budget") - pdb, err := c.createPodDisruptionBudget() - if err != nil { - return fmt.Errorf("could not create pod disruption budget: %v", err) - } - c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) - return nil - } else { + pdb, err := c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}) + if err == nil { + c.PodDisruptionBudget = pdb newPDB := c.generatePodDisruptionBudget() - if match, reason := c.samePDBWith(newPDB); !match { - c.logPDBChanges(c.PodDisruptionBudget, newPDB, isUpdate, reason) + if match, reason := k8sutil.SamePDB(pdb, newPDB); !match { + c.logPDBChanges(pdb, newPDB, isUpdate, reason) if err := c.updatePodDisruptionBudget(newPDB); err != nil { return err } + } else { + c.PodDisruptionBudget = pdb } + + return nil + } else if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get pod disruption budget: %v", err) } + c.PodDisruptionBudget = nil + + c.logger.Infof("could not find the cluster's pod disruption budget") + if pdb, err = c.createPodDisruptionBudget(); err != nil { + if k8sutil.ResourceAlreadyExists(err) { + c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) + } else { + return fmt.Errorf("could not create pod disruption budget: %v", err) + } + } else { + c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) + c.PodDisruptionBudget = pdb + } + return nil } func (c *Cluster) syncStatefulSet() error { - cSpec := c.Spec - var rollUpdate bool - if c.Statefulset == nil { + var ( + err error + rollUpdate bool + ) + c.Statefulset, err = c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{}) + + if err != nil && !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get statefulset: %v", err) + } + + if err != nil && k8sutil.ResourceNotFound(err) { c.logger.Infof("could not find the cluster's statefulset") pods, err := c.listPods() if err != nil { @@ -189,22 +259,25 @@ func (c *Cluster) syncStatefulSet() error { c.logger.Infof("found pods without the statefulset: trigger rolling update") rollUpdate = true } + ss, err := c.createStatefulSet() if err != nil { return fmt.Errorf("could not create missing statefulset: %v", err) } - err = c.waitStatefulsetPodsReady() - if err != nil { + + if err = c.waitStatefulsetPodsReady(); err != nil { return fmt.Errorf("cluster is not ready: %v", err) } + c.logger.Infof("created missing statefulset %q", util.NameFromMeta(ss.ObjectMeta)) if !rollUpdate { return nil } } + /* TODO: should check that we need to replace the statefulset */ if !rollUpdate { - desiredSS, err := c.generateStatefulSet(cSpec) + desiredSS, err := c.generateStatefulSet(&c.Spec) if err != nil { return fmt.Errorf("could not generate statefulset: %v", err) } @@ -239,6 +312,45 @@ func (c *Cluster) syncStatefulSet() error { return nil } +func (c *Cluster) syncSecrets() error { + c.setProcessName("syncing secrets") + secrets := c.generateUserSecrets() + + for secretUsername, secretSpec := range secrets { + secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec) + if k8sutil.ResourceAlreadyExists(err) { + var userMap map[string]spec.PgUser + curSecret, err2 := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{}) + if err2 != nil { + return fmt.Errorf("could not get current secret: %v", err2) + } + c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta)) + if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name { + secretUsername = constants.SuperuserKeyName + userMap = c.systemUsers + } else if secretUsername == c.systemUsers[constants.ReplicationUserKeyName].Name { + secretUsername = constants.ReplicationUserKeyName + userMap = c.systemUsers + } else { + userMap = c.pgUsers + } + pwdUser := userMap[secretUsername] + pwdUser.Password = string(curSecret.Data["password"]) + userMap[secretUsername] = pwdUser + + continue + } else { + if err != nil { + return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err) + } + c.Secrets[secret.UID] = secret + c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID) + } + } + + return nil +} + func (c *Cluster) syncRoles(readFromDatabase bool) error { c.setProcessName("syncing roles") @@ -268,11 +380,14 @@ func (c *Cluster) syncRoles(readFromDatabase bool) error { if err = c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb); err != nil { return fmt.Errorf("error executing sync statements: %v", err) } + return nil } // syncVolumes reads all persistent volumes and checks that their size matches the one declared in the statefulset. func (c *Cluster) syncVolumes() error { + c.setProcessName("syncing volumes") + act, err := c.volumesNeedResizing(c.Spec.Volume) if err != nil { return fmt.Errorf("could not compare size of the volumes: %v", err) diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index a479e3211..3f2f9fee8 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -144,12 +144,9 @@ func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isU } } -func (c *Cluster) logVolumeChanges(old, new spec.Volume, reason string) { +func (c *Cluster) logVolumeChanges(old, new spec.Volume) { c.logger.Infof("volume specification has been changed") c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old, new)) - if reason != "" { - c.logger.Infof("reason: %s", reason) - } } func (c *Cluster) getOAuthToken() (string, error) { diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index 1037fb3f7..976649e7a 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -146,11 +146,11 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume } func (c *Cluster) volumesNeedResizing(newVolume spec.Volume) (bool, error) { - volumes, manifestSize, err := c.listVolumesWithManifestSize(newVolume) + vols, manifestSize, err := c.listVolumesWithManifestSize(newVolume) if err != nil { return false, err } - for _, pv := range volumes { + for _, pv := range vols { currentSize := quantityToGigabyte(pv.Spec.Capacity[v1.ResourceStorage]) if currentSize != manifestSize { return true, nil @@ -165,11 +165,11 @@ func (c *Cluster) listVolumesWithManifestSize(newVolume spec.Volume) ([]*v1.Pers return nil, 0, fmt.Errorf("could not parse volume size from the manifest: %v", err) } manifestSize := quantityToGigabyte(newSize) - volumes, err := c.listPersistentVolumes() + vols, err := c.listPersistentVolumes() if err != nil { return nil, 0, fmt.Errorf("could not list persistent volumes: %v", err) } - return volumes, manifestSize, nil + return vols, manifestSize, nil } // getPodNameFromPersistentVolume returns a pod name that it extracts from the volume claim ref. diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 0b6114f8b..96b2052b5 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -193,7 +193,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { return } c.curWorkerCluster.Store(event.WorkerID, cl) - if err := cl.Update(event.NewSpec); err != nil { + if err := cl.Update(event.OldSpec, event.NewSpec); err != nil { cl.Error = fmt.Errorf("could not update cluster: %v", err) lg.Error(cl.Error) @@ -374,9 +374,6 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { if !ok { c.logger.Errorf("could not cast to postgresql spec") } - if pgOld.ResourceVersion == pgNew.ResourceVersion { - return - } if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { return } diff --git a/pkg/spec/types.go b/pkg/spec/types.go index d49ca7b97..d12bc084f 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -99,7 +99,8 @@ type ClusterStatus struct { Cluster string MasterService *v1.Service ReplicaService *v1.Service - Endpoint *v1.Endpoints + MasterEndpoint *v1.Endpoints + ReplicaEndpoint *v1.Endpoints StatefulSet *v1beta1.StatefulSet PodDisruptionBudget *policyv1beta1.PodDisruptionBudget diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index fbfeb0098..1b4dc9a00 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -2,6 +2,7 @@ package k8sutil import ( "fmt" + "reflect" apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1" @@ -9,10 +10,12 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/client-go/kubernetes" - v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1" + "k8s.io/client-go/kubernetes/typed/apps/v1beta1" v1core "k8s.io/client-go/kubernetes/typed/core/v1" policyv1beta1 "k8s.io/client-go/kubernetes/typed/policy/v1beta1" "k8s.io/client-go/pkg/api" + "k8s.io/client-go/pkg/api/v1" + policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -99,5 +102,43 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { kubeClient.CustomResourceDefinitionsGetter = apiextClient.ApiextensionsV1beta1() return kubeClient, nil - +} + +// SameService compares the Services +func SameService(cur, new *v1.Service) (match bool, reason string) { + //TODO: improve comparison + if cur.Spec.Type != new.Spec.Type { + return false, fmt.Sprintf("new service's type %q doesn't match the current one %q", + new.Spec.Type, cur.Spec.Type) + } + + oldSourceRanges := cur.Spec.LoadBalancerSourceRanges + newSourceRanges := new.Spec.LoadBalancerSourceRanges + + /* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */ + if (len(oldSourceRanges) == 0) && (len(newSourceRanges) == 0) { + return true, "" + } + if !reflect.DeepEqual(oldSourceRanges, newSourceRanges) { + return false, "new service's LoadBalancerSourceRange doesn't match the current one" + } + + oldDNSAnnotation := cur.Annotations[constants.ZalandoDNSNameAnnotation] + newDNSAnnotation := new.Annotations[constants.ZalandoDNSNameAnnotation] + if oldDNSAnnotation != newDNSAnnotation { + return false, fmt.Sprintf("new service's %q annotation doesn't match the current one", constants.ZalandoDNSNameAnnotation) + } + + return true, "" +} + +// SamePDB compares the PodDisruptionBudgets +func SamePDB(cur, new *policybeta1.PodDisruptionBudget) (match bool, reason string) { + //TODO: improve comparison + match = reflect.DeepEqual(new.Spec, cur.Spec) + if !match { + reason = "new service spec doesn't match the current one" + } + + return }