From 2b0ed1ab9f80835d86fd5fe176f325c74e06b30f Mon Sep 17 00:00:00 2001 From: erthalion <9erthalion6@gmail.com> Date: Tue, 12 Jun 2018 16:38:03 +0200 Subject: [PATCH] Base implementation of process for actions --- pkg/actions.go | 103 ------------------ pkg/cluster/actions.go | 196 +++++++++++++++++++++++++++++++++++ pkg/cluster/cluster.go | 17 +-- pkg/cluster/resources.go | 79 ++++++++------ pkg/cluster/sync.go | 73 ++++++------- pkg/cluster/util.go | 13 +++ pkg/controller/postgresql.go | 1 - 7 files changed, 300 insertions(+), 182 deletions(-) delete mode 100644 pkg/actions.go create mode 100644 pkg/cluster/actions.go diff --git a/pkg/actions.go b/pkg/actions.go deleted file mode 100644 index 56ec495aa..000000000 --- a/pkg/actions.go +++ /dev/null @@ -1,103 +0,0 @@ -package actions - -import ( - "crypto/md5" - "github.com/zalando-incubator/postgres-operator/pkg/cluster/types" - "github.com/zalando-incubator/postgres-operator/pkg/spec" - "k8s.io/client-go/pkg/api/v1" -) - -var NoActions []Action = []Action{} - -type ActionHash = [16]byte - -type SyncSecretsData struct { - secrets map[string]*v1.Secret -} - -type ServiceData struct { - name string - role Role - spec *v1.Service -} - -type SyncVolumesData struct { - volumeSpec spec.Volume -} - -type ActionData struct { - namespace NamespacedName -} - -type CreateService struct { - common ActionData - service ServiceData -} - -type UpdateService struct { - common ActionData - service ServiceData -} - -type RecreateService struct { - common ActionData - service ServiceData -} - -type DeleteService struct { - common ActionData - service ServiceData -} - -type Action interface { - process() (bool, error) - name() string - hash() ActionHash -} - -func (action UpdateService) process() (bool, error) { - -} - -func (action RecreateService) process() (bool, error) { -} - -func (action CreateService) process() (bool, error) { - -} - -func (action DeleteService) process() (bool, error) { - -} - -func (action UpdateService) hash() ActionHash { - return md5.Sum([]byte("update" + action.data.name)) -} - -func (action RecreateService) hash() ActionHash { - return md5.Sum([]byte("recreate" + action.data.name)) -} - -func (action CreateService) hash() ActionHash { - return md5.Sum([]byte("create" + action.data.name)) -} - -func (action DeleteService) hash() ActionHash { - return md5.Sum([]byte("delete" + action.data.name)) -} - -func (action UpdateService) name() string { - return fmt.Sprintf("Update service %s", action.service.name) -} - -func (action RecreateService) name() string { - return fmt.Sprintf("Recreate service %s", action.service.name) -} - -func (action CreateService) name() string { - return fmt.Sprintf("Create a new service") -} - -func (action DeleteService) name() string { - return fmt.Sprintf("Delete service %s", action.service.name) -} diff --git a/pkg/cluster/actions.go b/pkg/cluster/actions.go new file mode 100644 index 000000000..860b074b1 --- /dev/null +++ b/pkg/cluster/actions.go @@ -0,0 +1,196 @@ +package cluster + +import ( + "crypto/md5" + "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/pkg/api/v1" + + "github.com/zalando-incubator/postgres-operator/pkg/spec" +) + +var NoActions []Action = []Action{} + +type ActionHash = [16]byte + +var orphanDependents bool = true + +var deleteOptions *metav1.DeleteOptions = &metav1.DeleteOptions{ + OrphanDependents: &orphanDependents, +} + +type SyncSecretsData struct { + secrets map[string]*v1.Secret +} + +type ServiceData struct { + name string + role PostgresRole + service *v1.Service +} + +type SyncVolumesData struct { + volumeSpec spec.Volume +} + +type ActionData struct { + cluster *Cluster + namespace string +} + +type CreateService struct { + common ActionData + service ServiceData +} + +type UpdateService struct { + common ActionData + service ServiceData +} + +type DeleteService struct { + common ActionData + service ServiceData +} + +type Action interface { + Process() error + Name() string + Hash() ActionHash + GetCommon() ActionData + SetCluster(*Cluster) +} + +func CheckAction(action Action) error { + if action.GetCommon().cluster == nil { + return fmt.Errorf("no valid cluster for %v", action) + } + + return nil +} + +func (action UpdateService) Process() error { + var ( + err error + patchData []byte + updatedService *v1.Service + ) + if err := CheckAction(action); err != nil { + return err + } + common := action.GetCommon() + service := action.service.service + + if len(service.ObjectMeta.Annotations) > 0 { + patchData, err = servicePatchData(service.Spec, service.ObjectMeta.Annotations) + if err != nil { + msg := "could not prepare patch data with annotations for service %q: %v" + return fmt.Errorf(msg, action.service.name, err) + } + } else { + patchData, err = specPatch(service.Spec) + if err != nil { + msg := "could not prepare patch data for service %q: %v" + return fmt.Errorf(msg, action.service.name, err) + } + } + + if updatedService, err = common.cluster.KubeClient. + Services(common.namespace). + Patch( + action.service.name, + types.MergePatchType, + patchData, + ""); err != nil { + return err + } + + common.cluster.Services[action.service.role] = updatedService + return nil +} + +func (action CreateService) Process() error { + var ( + err error + newService *v1.Service + ) + + if err := CheckAction(action); err != nil { + return err + } + common := action.GetCommon() + + if newService, err = common.cluster.KubeClient. + Services(common.namespace). + Create(action.service.service); err != nil { + return err + } + + common.cluster.Services[action.service.role] = newService + return nil +} + +func (action DeleteService) Process() error { + if err := CheckAction(action); err != nil { + return err + } + common := action.GetCommon() + + if err := common.cluster.KubeClient. + Services(common.namespace). + Delete(action.service.name, deleteOptions); err != nil { + return err + } + + common.cluster.Services[action.service.role] = nil + return nil +} + +func (action UpdateService) SetCluster(client *Cluster) { + action.common.cluster = client +} + +func (action CreateService) SetCluster(client *Cluster) { + action.common.cluster = client +} + +func (action DeleteService) SetCluster(client *Cluster) { + action.common.cluster = client +} + +func (action UpdateService) GetCommon() ActionData { + return action.common +} + +func (action CreateService) GetCommon() ActionData { + return action.common +} + +func (action DeleteService) GetCommon() ActionData { + return action.common +} + +func (action UpdateService) Hash() ActionHash { + return md5.Sum([]byte("update" + action.service.name)) +} + +func (action CreateService) Hash() ActionHash { + return md5.Sum([]byte("create" + action.service.name)) +} + +func (action DeleteService) Hash() ActionHash { + return md5.Sum([]byte("delete" + action.service.name)) +} + +func (action UpdateService) Name() string { + return fmt.Sprintf("Update service %s", action.service.name) +} + +func (action CreateService) Name() string { + return fmt.Sprintf("Create a new service") +} + +func (action DeleteService) Name() string { + return fmt.Sprintf("Delete service %s", action.service.name) +} diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 2662cd521..5a5c3f5b3 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -239,9 +239,9 @@ func (c *Cluster) Create() error { var ( err error - service *v1.Service - ep *v1.Endpoints - ss *v1beta1.StatefulSet + //service *v1.Service + ep *v1.Endpoints + ss *v1beta1.StatefulSet ) defer func() { @@ -272,11 +272,12 @@ func (c *Cluster) Create() error { if c.Services[role] != nil { return fmt.Errorf("service already exists in the cluster") } - service, err = c.createService(role) + _, err = c.createService(role) if err != nil { - return fmt.Errorf("could not create %s service: %v", role, err) + return fmt.Errorf("could not calculate actions to create %s service: %v", + role, err) } - c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) + //c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) } if err = c.initUsers(); err != nil { @@ -545,7 +546,7 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error { if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) || !reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) { c.logger.Debugf("syncing services") - if err := c.syncServices(); err != nil { + if _, err := c.syncServices(); err != nil { c.logger.Errorf("could not sync services: %v", err) updateFailed = true } @@ -656,7 +657,7 @@ func (c *Cluster) Delete() { c.logger.Warningf("could not delete %s endpoint: %v", role, err) } - if err := c.deleteService(role); err != nil { + if _, err := c.deleteService(role); err != nil { c.logger.Warningf("could not delete %s service: %v", role, err) } } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 4817a0639..b8ea245d3 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -11,7 +11,6 @@ import ( "k8s.io/client-go/pkg/apis/apps/v1beta1" policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1" - "github.com/zalando-incubator/postgres-operator/pkg/cluster/sync" "github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" @@ -339,12 +338,14 @@ func (c *Cluster) createService(role PostgresRole) ([]Action, error) { c.setProcessName("creating %v service", role) serviceSpec := c.generateService(role, &c.Spec) - return []CreateService{ - ActionData{ - namespace: serviceName.Namespace, - }, - ServiceData{ - spec: serviceSpec, + return []Action{ + CreateService{ + ActionData{ + namespace: serviceSpec.Namespace, + }, + ServiceData{ + service: serviceSpec, + }, }, }, nil } @@ -355,50 +356,60 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Ac } serviceName := util.NameFromMeta(c.Services[role].ObjectMeta) - endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta) // TODO: check if it possible to change the service type with a patch in future versions of Kubernetes if newService.Spec.Type != c.Services[role].Spec.Type { // service type has changed, need to replace the service completely. // we cannot use just pach the current service, since it may contain attributes incompatible with the new type. - return []RecreateService{ - ActionData{ - namespace: serviceName.Namespace, + return []Action{ + DeleteService{ + ActionData{ + namespace: serviceName.Namespace, + }, + ServiceData{ + name: serviceName.Name, + role: role, + service: newService, + }, }, - ServiceData{ - name: serviceName.Name, - role: role, - spec: newService, + CreateService{ + ActionData{ + namespace: serviceName.Namespace, + }, + ServiceData{ + name: serviceName.Name, + role: role, + service: newService, + }, }, }, nil } // update the service annotation in order to propagate ELB notation. - patchData, err := specPatch(newService.Spec) - if err != nil { - return NoActions, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) - } - - return []CreateService{ - ActionData{ - namespace: serviceName.Namespace, - }, - ServiceData{ - name: serviceName.Name, - spec: newService, + return []Action{ + CreateService{ + ActionData{ + namespace: serviceName.Namespace, + }, + ServiceData{ + name: serviceName.Name, + service: newService, + }, }, }, nil } -func (c *Cluster) deleteService(role PostgresRole) error { +func (c *Cluster) deleteService(role PostgresRole) ([]Action, error) { service := c.Services[role] - return []DeleteService{ - ActionData{ - namespace: serviceName.Namespace, - }, - ServiceData{ - name: service.Name, + return []Action{ + DeleteService{ + ActionData{ + namespace: service.Namespace, + }, + ServiceData{ + name: service.Name, + }, }, }, nil } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 2d74086e7..404092fb5 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -19,6 +19,7 @@ import ( func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { c.mu.Lock() defer c.mu.Unlock() + var actions []Action = NoActions c.setSpec(newSpec) @@ -98,43 +99,42 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { return } -func (c *Cluster) syncServices() error { +func (c *Cluster) syncServices() (actions []Action, err error) { for _, role := range []PostgresRole{Master, Replica} { c.logger.Debugf("syncing %s service", role) - if err := c.syncEndpoint(role); err != nil { - return fmt.Errorf("could not sync %s endpont: %v", role, err) + if err = c.syncEndpoint(role); err != nil { + return NoActions, fmt.Errorf("could not sync %s endpont: %v", role, err) } - if err := c.syncService(role); err != nil { - return fmt.Errorf("could not sync %s service: %v", role, err) + if actions, err = c.syncService(role); err != nil { + return NoActions, fmt.Errorf("could not sync %s service: %v", role, err) + } + } + + return actions, nil +} + +func (c *Cluster) applyActions(actions []Action) (err error) { + uniqueActions := NoActions + hashMap := map[ActionHash]bool{} + for _, action := range actions { + if _, present := hashMap[action.Hash()]; !present { + hashMap[action.Hash()] = true + action.SetCluster(c) + uniqueActions = append(uniqueActions, action) + } + } + + for _, action := range uniqueActions { + if err := action.Process(); err != nil { + c.logger.Errorf("Can't apply action %s: %v", action.Name(), err) } } return nil } -func (c *Cluster) applyActions(actions []Action) (err error) { - uniqueActions = []Actions{} - hashMap = map[ActionHash]bool{} - for idx, action := range actions { - if _, present := uniqueActions[action.hash()]; !present { - uniqueActions[action.hash()] = true - append(uniqueActions, action) - } - } - - for action := range uniqueActions { - if critical, err := action.process(); err != nil { - c.logger.Errorf("Can't apply action %s: %v", action.name(), err) - } - - if critical == true { - return err - } - } -} - func (c *Cluster) syncService(role PostgresRole) ([]Action, error) { c.setProcessName("syncing %s service", role) @@ -142,12 +142,13 @@ func (c *Cluster) syncService(role PostgresRole) ([]Action, error) { if err == nil { c.Services[role] = svc desiredSvc := c.generateService(role, &c.Spec) - match, reason := k8sutil.SameService(svc, desiredSvc) + match, _ := k8sutil.SameService(svc, desiredSvc) if match { return NoActions, nil } - if actions, err := c.updateService(role, desiredSvc); err != nil { + actions, err := c.updateService(role, desiredSvc) + if err != nil { return NoActions, fmt.Errorf("could not update %s service to match desired state: %v", role, err) } @@ -157,14 +158,14 @@ func (c *Cluster) syncService(role PostgresRole) ([]Action, error) { } c.logger.Infof("could not find the cluster's %s service", role) - return []CreateService{ - ActionData{ - namespace: serviceName.Namespace, - }, - ServiceData{ - role: role, - }, - }, nil + actions, err := c.createService(role) + if err != nil { + return NoActions, fmt.Errorf( + "could not calculate actions to create %s service: %v", + role, err) + } + + return actions, nil } func (c *Cluster) syncEndpoint(role PostgresRole) error { diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index e7db26d82..028e3cfdf 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -24,6 +24,8 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" ) +type Annotations = map[string]string + // OAuthTokenGetter provides the method for fetching OAuth tokens type OAuthTokenGetter interface { getOAuthToken() (string, error) @@ -134,6 +136,17 @@ func normalizeUserFlags(userFlags []string) ([]string, error) { return flags, nil } +func servicePatchData(spec interface{}, annotations Annotations) ([]byte, error) { + var meta metav1.ObjectMeta = metav1.ObjectMeta{ + Annotations: annotations, + } + + return json.Marshal(struct { + SpecField interface{} `json:"spec"` + ObjMetaField interface{} `json:"metadata"` + }{spec, &meta}) +} + // specPatch produces a JSON of the Kubernetes object specification passed (typically service or // statefulset) to use it in a MergePatch. func specPatch(spec interface{}) ([]byte, error) { diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 1a3817ca7..cb689b70a 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -17,7 +17,6 @@ import ( "k8s.io/client-go/tools/cache" "github.com/zalando-incubator/postgres-operator/pkg/cluster" - "github.com/zalando-incubator/postgres-operator/pkg/events" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"