From 627bb51a0cc5583a9b901a2b4834a2236635ddac Mon Sep 17 00:00:00 2001 From: erthalion <9erthalion6@gmail.com> Date: Wed, 6 Jun 2018 15:56:22 +0200 Subject: [PATCH] Use interfaces --- pkg/actions.go | 91 +++++++++++++++++++++++++++++----- pkg/cluster/resources.go | 57 ++++++++++------------ pkg/cluster/sync.go | 95 ++++++++++++------------------------ pkg/controller/postgresql.go | 6 --- 4 files changed, 137 insertions(+), 112 deletions(-) diff --git a/pkg/actions.go b/pkg/actions.go index a2f16d221..56ec495aa 100644 --- a/pkg/actions.go +++ b/pkg/actions.go @@ -1,22 +1,16 @@ package actions import ( + "crypto/md5" "github.com/zalando-incubator/postgres-operator/pkg/cluster/types" "github.com/zalando-incubator/postgres-operator/pkg/spec" "k8s.io/client-go/pkg/api/v1" ) -type ActionType int - -const ( - UpdateService ActionType = iota - RecreateService - CreateService - DeleteService -) - var NoActions []Action = []Action{} +type ActionHash = [16]byte + type SyncSecretsData struct { secrets map[string]*v1.Secret } @@ -31,8 +25,79 @@ type SyncVolumesData struct { volumeSpec spec.Volume } -type Action struct { - actionType actionType - namespace NamespacedName - data interface{} +type ActionData struct { + namespace NamespacedName +} + +type CreateService struct { + common ActionData + service ServiceData +} + +type UpdateService struct { + common ActionData + service ServiceData +} + +type RecreateService struct { + common ActionData + service ServiceData +} + +type DeleteService struct { + common ActionData + service ServiceData +} + +type Action interface { + process() (bool, error) + name() string + hash() ActionHash +} + +func (action UpdateService) process() (bool, error) { + +} + +func (action RecreateService) process() (bool, error) { +} + +func (action CreateService) process() (bool, error) { + +} + +func (action DeleteService) process() (bool, error) { + +} + +func (action UpdateService) hash() ActionHash { + return md5.Sum([]byte("update" + action.data.name)) +} + +func (action RecreateService) hash() ActionHash { + return md5.Sum([]byte("recreate" + action.data.name)) +} + +func (action CreateService) hash() ActionHash { + return md5.Sum([]byte("create" + action.data.name)) +} + +func (action DeleteService) hash() ActionHash { + return md5.Sum([]byte("delete" + action.data.name)) +} + +func (action UpdateService) name() string { + return fmt.Sprintf("Update service %s", action.service.name) +} + +func (action RecreateService) name() string { + return fmt.Sprintf("Recreate service %s", action.service.name) +} + +func (action CreateService) name() string { + return fmt.Sprintf("Create a new service") +} + +func (action DeleteService) name() string { + return fmt.Sprintf("Delete service %s", action.service.name) } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 895988af4..4817a0639 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -339,13 +339,12 @@ func (c *Cluster) createService(role PostgresRole) ([]Action, error) { c.setProcessName("creating %v service", role) serviceSpec := c.generateService(role, &c.Spec) - return []Action{ - Action{ - CreateService, - serviceName.Namespace, - ServiceData{ - spec: serviceSpec, - }, + return []CreateService{ + ActionData{ + namespace: serviceName.Namespace, + }, + ServiceData{ + spec: serviceSpec, }, }, nil } @@ -362,14 +361,14 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Ac if newService.Spec.Type != c.Services[role].Spec.Type { // service type has changed, need to replace the service completely. // we cannot use just pach the current service, since it may contain attributes incompatible with the new type. - return []Action{ - Action{ - RecreateService, - ServiceData{ - name: serviceName.Name, - role: role, - spec: newService, - }, + return []RecreateService{ + ActionData{ + namespace: serviceName.Namespace, + }, + ServiceData{ + name: serviceName.Name, + role: role, + spec: newService, }, }, nil } @@ -380,14 +379,13 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Ac return NoActions, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) } - return []Action{ - Action{ - CreateService, - serviceName.Namespace, - ServiceData{ - name: serviceName.Name, - spec: newService, - }, + return []CreateService{ + ActionData{ + namespace: serviceName.Namespace, + }, + ServiceData{ + name: serviceName.Name, + spec: newService, }, }, nil } @@ -395,13 +393,12 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Ac func (c *Cluster) deleteService(role PostgresRole) error { service := c.Services[role] - return []Action{ - Action{ - DeleteService, - serviceName.Namespace, - ServiceData{ - name: service.Name, - }, + return []DeleteService{ + ActionData{ + namespace: serviceName.Namespace, + }, + ServiceData{ + name: service.Name, }, }, nil } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index ef61c0405..84b9d7698 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -14,50 +14,9 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/volumes" ) -var syncChannel chan Action - -func (c *Cluster) SyncActions(newSpec *spec.Postgresql) (err error) { - syncSecrets = Action{ - SyncSecrets, - SyncSecretsData{ - c.generateUserSecrets(), - }, - } - - syncServiceMaster = Action{ - SyncService, - SyncServiceData{ - Master, - }, - } - - syncServiceReplica = Action{ - SyncService, - SyncServiceData{ - Replica, - }, - } - - syncVolumes = Action{ - SyncVolumes, - SyncVolumesData{ - c.Spec.Volume, - }, - } - - return []Action{ - syncSecrets, - syncServiceMaster, - syncServiceReplica, - syncVolumes, - } -} - // Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest. // Unlike the update, sync does not error out if some objects do not exist and takes care of creating them. func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { - syncChannel = make(chan Action) - c.mu.Lock() defer c.mu.Unlock() @@ -86,8 +45,13 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { } c.logger.Debugf("syncing services") - if err = c.syncServices(); err != nil { - err = fmt.Errorf("could not sync services: %v", err) + if actions, err = c.syncServices(); err != nil { + err = fmt.Errorf("could not resolve actions to sync services: %v", err) + return + } + + if err = c.applyActions(actions); err != nil { + err = fmt.Errorf("could not apply actions to sync services: %v", err) return } @@ -150,6 +114,23 @@ func (c *Cluster) syncServices() error { return nil } +func (c *Cluster) applyActions(actions []Action) (err error) { + uniqueActions = []Actions{} + hashMap = map[ActionHash]bool{} + for idx, action := range actions { + if _, present := uniqueActions[action.hash()]; !present { + uniqueActions[action.hash()] = true + append(uniqueActions, action) + } + } + + for action := range uniqueActions { + if dontStop, err := action.process(); err != nil && !dontStop { + c.logger.Errorf("Can't apply action %s: %v", action.name(), err) + } + } +} + func (c *Cluster) syncService(role PostgresRole) ([]Action, error) { c.setProcessName("syncing %s service", role) @@ -170,28 +151,16 @@ func (c *Cluster) syncService(role PostgresRole) ([]Action, error) { } else if !k8sutil.ResourceNotFound(err) { return NoActions, fmt.Errorf("could not get %s service: %v", role, err) } - c.Services[role] = nil - c.logger.Infof("could not find the cluster's %s service", role) - if svc, err := c.createService(role); err != nil { - if k8sutil.ResourceAlreadyExists(err) { - c.logger.Infof("%s service %q already exists", role, util.NameFromMeta(svc.ObjectMeta)) - svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}) - if err == nil { - c.Services[role] = svc - } else { - c.logger.Infof("could not fetch existing %s service: %v", role, err) - } - } else { - return fmt.Errorf("could not create missing %s service: %v", role, err) - } - } else { - c.logger.Infof("created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta)) - c.Services[role] = svc - } - - return nil + return []CreateService{ + ActionData{ + namespace: serviceName.Namespace, + }, + ServiceData{ + role: role, + }, + }, nil } func (c *Cluster) syncEndpoint(role PostgresRole) error { diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 5073ea49b..1a3817ca7 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -264,12 +264,6 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { } c.curWorkerCluster.Store(event.WorkerID, cl) - actions := cl.SyncActions(event.NewSpec) - - for i, processor := range c.changeProcessors { - processor(changes) - } - if err := cl.Sync(event.NewSpec); err != nil { cl.Error = fmt.Errorf("could not sync cluster: %v", err) lg.Error(cl.Error)