From 63bca66187d4e759bccfc671a981b5c5e037e825 Mon Sep 17 00:00:00 2001 From: erthalion <9erthalion6@gmail.com> Date: Wed, 13 Jun 2018 11:55:48 +0200 Subject: [PATCH] Adjust actions usage over the sync code --- pkg/cluster/cluster.go | 29 ++++++++++++++++++----------- pkg/cluster/resources.go | 27 +++++++++++++++++++++++++++ pkg/cluster/sync.go | 8 ++++++-- 3 files changed, 51 insertions(+), 13 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 5a5c3f5b3..7532b13b9 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -239,9 +239,9 @@ func (c *Cluster) Create() error { var ( err error - //service *v1.Service - ep *v1.Endpoints - ss *v1beta1.StatefulSet + ep *v1.Endpoints + ss *v1beta1.StatefulSet + actions []Action ) defer func() { @@ -272,12 +272,13 @@ func (c *Cluster) Create() error { if c.Services[role] != nil { return fmt.Errorf("service already exists in the cluster") } - _, err = c.createService(role) + actions, err = c.createService(role) if err != nil { - return fmt.Errorf("could not calculate actions to create %s service: %v", - role, err) + msg := "could not prepare actions to create %s service: %v" + return fmt.Errorf(msg, role, err) } - //c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) + + return c.applyActions(actions) } if err = c.initUsers(); err != nil { @@ -546,9 +547,13 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error { if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) || !reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) { c.logger.Debugf("syncing services") - if _, err := c.syncServices(); err != nil { - c.logger.Errorf("could not sync services: %v", err) + if actions, err := c.syncServices(); err != nil { + c.logger.Errorf("could prepare actions to sync services: %v", err) updateFailed = true + } else { + if err := c.applyActions(actions); err != nil { + updateFailed = true + } } } @@ -657,8 +662,10 @@ func (c *Cluster) Delete() { c.logger.Warningf("could not delete %s endpoint: %v", role, err) } - if _, err := c.deleteService(role); err != nil { - c.logger.Warningf("could not delete %s service: %v", role, err) + if actions, err := c.deleteService(role); err != nil { + c.logger.Warningf("could prepare actions to delete %s service: %v", role, err) + } else { + c.applyActions(actions) } } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 73edb53b2..9c814f4a8 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -356,11 +356,38 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Ac } serviceName := util.NameFromMeta(c.Services[role].ObjectMeta) + endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta) // TODO: check if it possible to change the service type with a patch in future versions of Kubernetes if newService.Spec.Type != c.Services[role].Spec.Type { // service type has changed, need to replace the service completely. // we cannot use just pach the current service, since it may contain attributes incompatible with the new type. + var ( + currentEndpoint *v1.Endpoints + err error + ) + + if role == Master { + // for the master service we need to re-create the endpoint as well. Get the up-to-date version of + // the addresses stored in it before the service is deleted (deletion of the service removes the endpooint) + currentEndpoint, err = c.KubeClient. + Endpoints(c.Namespace). + Get(c.endpointName(role), metav1.GetOptions{}) + + if err != nil { + return NoActions, fmt.Errorf("could not get current cluster %s endpoints: %v", role, err) + } + + // create the new endpoint using the addresses obtained from the previous one + endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets) + ep, err := c.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec) + if err != nil { + return NoActions, fmt.Errorf("could not create endpoint %q: %v", endpointName, err) + } + + c.Endpoints[role] = ep + } + return []Action{ DeleteService{ ActionData{ diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 404092fb5..327c6d0e6 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -108,7 +108,8 @@ func (c *Cluster) syncServices() (actions []Action, err error) { } if actions, err = c.syncService(role); err != nil { - return NoActions, fmt.Errorf("could not sync %s service: %v", role, err) + msg := "could prepare actions to sync %s service: %v" + return NoActions, fmt.Errorf(msg, role, err) } } @@ -142,10 +143,11 @@ func (c *Cluster) syncService(role PostgresRole) ([]Action, error) { if err == nil { c.Services[role] = svc desiredSvc := c.generateService(role, &c.Spec) - match, _ := k8sutil.SameService(svc, desiredSvc) + match, reason := k8sutil.SameService(svc, desiredSvc) if match { return NoActions, nil } + c.logServiceChanges(role, svc, desiredSvc, false, reason) actions, err := c.updateService(role, desiredSvc) if err != nil { @@ -156,6 +158,8 @@ func (c *Cluster) syncService(role PostgresRole) ([]Action, error) { } else if !k8sutil.ResourceNotFound(err) { return NoActions, fmt.Errorf("could not get %s service: %v", role, err) } + c.Services[role] = nil + c.logger.Infof("could not find the cluster's %s service", role) actions, err := c.createService(role)