Base implementation of process for actions

This commit is contained in:
erthalion 2018-06-12 16:38:03 +02:00
parent bd0cd62151
commit 2b0ed1ab9f
7 changed files with 300 additions and 182 deletions

View File

@ -1,103 +0,0 @@
package actions
import (
"crypto/md5"
"github.com/zalando-incubator/postgres-operator/pkg/cluster/types"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"k8s.io/client-go/pkg/api/v1"
)
var NoActions []Action = []Action{}
type ActionHash = [16]byte
type SyncSecretsData struct {
secrets map[string]*v1.Secret
}
type ServiceData struct {
name string
role Role
spec *v1.Service
}
type SyncVolumesData struct {
volumeSpec spec.Volume
}
type ActionData struct {
namespace NamespacedName
}
type CreateService struct {
common ActionData
service ServiceData
}
type UpdateService struct {
common ActionData
service ServiceData
}
type RecreateService struct {
common ActionData
service ServiceData
}
type DeleteService struct {
common ActionData
service ServiceData
}
type Action interface {
process() (bool, error)
name() string
hash() ActionHash
}
func (action UpdateService) process() (bool, error) {
}
func (action RecreateService) process() (bool, error) {
}
func (action CreateService) process() (bool, error) {
}
func (action DeleteService) process() (bool, error) {
}
func (action UpdateService) hash() ActionHash {
return md5.Sum([]byte("update" + action.data.name))
}
func (action RecreateService) hash() ActionHash {
return md5.Sum([]byte("recreate" + action.data.name))
}
func (action CreateService) hash() ActionHash {
return md5.Sum([]byte("create" + action.data.name))
}
func (action DeleteService) hash() ActionHash {
return md5.Sum([]byte("delete" + action.data.name))
}
func (action UpdateService) name() string {
return fmt.Sprintf("Update service %s", action.service.name)
}
func (action RecreateService) name() string {
return fmt.Sprintf("Recreate service %s", action.service.name)
}
func (action CreateService) name() string {
return fmt.Sprintf("Create a new service")
}
func (action DeleteService) name() string {
return fmt.Sprintf("Delete service %s", action.service.name)
}

196
pkg/cluster/actions.go Normal file
View File

@ -0,0 +1,196 @@
package cluster
import (
"crypto/md5"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/pkg/api/v1"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
)
var NoActions []Action = []Action{}
type ActionHash = [16]byte
var orphanDependents bool = true
var deleteOptions *metav1.DeleteOptions = &metav1.DeleteOptions{
OrphanDependents: &orphanDependents,
}
type SyncSecretsData struct {
secrets map[string]*v1.Secret
}
type ServiceData struct {
name string
role PostgresRole
service *v1.Service
}
type SyncVolumesData struct {
volumeSpec spec.Volume
}
type ActionData struct {
cluster *Cluster
namespace string
}
type CreateService struct {
common ActionData
service ServiceData
}
type UpdateService struct {
common ActionData
service ServiceData
}
type DeleteService struct {
common ActionData
service ServiceData
}
type Action interface {
Process() error
Name() string
Hash() ActionHash
GetCommon() ActionData
SetCluster(*Cluster)
}
func CheckAction(action Action) error {
if action.GetCommon().cluster == nil {
return fmt.Errorf("no valid cluster for %v", action)
}
return nil
}
func (action UpdateService) Process() error {
var (
err error
patchData []byte
updatedService *v1.Service
)
if err := CheckAction(action); err != nil {
return err
}
common := action.GetCommon()
service := action.service.service
if len(service.ObjectMeta.Annotations) > 0 {
patchData, err = servicePatchData(service.Spec, service.ObjectMeta.Annotations)
if err != nil {
msg := "could not prepare patch data with annotations for service %q: %v"
return fmt.Errorf(msg, action.service.name, err)
}
} else {
patchData, err = specPatch(service.Spec)
if err != nil {
msg := "could not prepare patch data for service %q: %v"
return fmt.Errorf(msg, action.service.name, err)
}
}
if updatedService, err = common.cluster.KubeClient.
Services(common.namespace).
Patch(
action.service.name,
types.MergePatchType,
patchData,
""); err != nil {
return err
}
common.cluster.Services[action.service.role] = updatedService
return nil
}
func (action CreateService) Process() error {
var (
err error
newService *v1.Service
)
if err := CheckAction(action); err != nil {
return err
}
common := action.GetCommon()
if newService, err = common.cluster.KubeClient.
Services(common.namespace).
Create(action.service.service); err != nil {
return err
}
common.cluster.Services[action.service.role] = newService
return nil
}
func (action DeleteService) Process() error {
if err := CheckAction(action); err != nil {
return err
}
common := action.GetCommon()
if err := common.cluster.KubeClient.
Services(common.namespace).
Delete(action.service.name, deleteOptions); err != nil {
return err
}
common.cluster.Services[action.service.role] = nil
return nil
}
func (action UpdateService) SetCluster(client *Cluster) {
action.common.cluster = client
}
func (action CreateService) SetCluster(client *Cluster) {
action.common.cluster = client
}
func (action DeleteService) SetCluster(client *Cluster) {
action.common.cluster = client
}
func (action UpdateService) GetCommon() ActionData {
return action.common
}
func (action CreateService) GetCommon() ActionData {
return action.common
}
func (action DeleteService) GetCommon() ActionData {
return action.common
}
func (action UpdateService) Hash() ActionHash {
return md5.Sum([]byte("update" + action.service.name))
}
func (action CreateService) Hash() ActionHash {
return md5.Sum([]byte("create" + action.service.name))
}
func (action DeleteService) Hash() ActionHash {
return md5.Sum([]byte("delete" + action.service.name))
}
func (action UpdateService) Name() string {
return fmt.Sprintf("Update service %s", action.service.name)
}
func (action CreateService) Name() string {
return fmt.Sprintf("Create a new service")
}
func (action DeleteService) Name() string {
return fmt.Sprintf("Delete service %s", action.service.name)
}

View File

@ -239,9 +239,9 @@ func (c *Cluster) Create() error {
var ( var (
err error err error
service *v1.Service //service *v1.Service
ep *v1.Endpoints ep *v1.Endpoints
ss *v1beta1.StatefulSet ss *v1beta1.StatefulSet
) )
defer func() { defer func() {
@ -272,11 +272,12 @@ func (c *Cluster) Create() error {
if c.Services[role] != nil { if c.Services[role] != nil {
return fmt.Errorf("service already exists in the cluster") return fmt.Errorf("service already exists in the cluster")
} }
service, err = c.createService(role) _, err = c.createService(role)
if err != nil { if err != nil {
return fmt.Errorf("could not create %s service: %v", role, err) return fmt.Errorf("could not calculate actions to create %s service: %v",
role, err)
} }
c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) //c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
} }
if err = c.initUsers(); err != nil { if err = c.initUsers(); err != nil {
@ -545,7 +546,7 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) || if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) { !reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) {
c.logger.Debugf("syncing services") c.logger.Debugf("syncing services")
if err := c.syncServices(); err != nil { if _, err := c.syncServices(); err != nil {
c.logger.Errorf("could not sync services: %v", err) c.logger.Errorf("could not sync services: %v", err)
updateFailed = true updateFailed = true
} }
@ -656,7 +657,7 @@ func (c *Cluster) Delete() {
c.logger.Warningf("could not delete %s endpoint: %v", role, err) c.logger.Warningf("could not delete %s endpoint: %v", role, err)
} }
if err := c.deleteService(role); err != nil { if _, err := c.deleteService(role); err != nil {
c.logger.Warningf("could not delete %s service: %v", role, err) c.logger.Warningf("could not delete %s service: %v", role, err)
} }
} }

View File

@ -11,7 +11,6 @@ import (
"k8s.io/client-go/pkg/apis/apps/v1beta1" "k8s.io/client-go/pkg/apis/apps/v1beta1"
policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1" policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
"github.com/zalando-incubator/postgres-operator/pkg/cluster/sync"
"github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
@ -339,12 +338,14 @@ func (c *Cluster) createService(role PostgresRole) ([]Action, error) {
c.setProcessName("creating %v service", role) c.setProcessName("creating %v service", role)
serviceSpec := c.generateService(role, &c.Spec) serviceSpec := c.generateService(role, &c.Spec)
return []CreateService{ return []Action{
ActionData{ CreateService{
namespace: serviceName.Namespace, ActionData{
}, namespace: serviceSpec.Namespace,
ServiceData{ },
spec: serviceSpec, ServiceData{
service: serviceSpec,
},
}, },
}, nil }, nil
} }
@ -355,50 +356,60 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Ac
} }
serviceName := util.NameFromMeta(c.Services[role].ObjectMeta) serviceName := util.NameFromMeta(c.Services[role].ObjectMeta)
endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta)
// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes // TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
if newService.Spec.Type != c.Services[role].Spec.Type { if newService.Spec.Type != c.Services[role].Spec.Type {
// service type has changed, need to replace the service completely. // service type has changed, need to replace the service completely.
// we cannot use just pach the current service, since it may contain attributes incompatible with the new type. // we cannot use just pach the current service, since it may contain attributes incompatible with the new type.
return []RecreateService{ return []Action{
ActionData{ DeleteService{
namespace: serviceName.Namespace, ActionData{
namespace: serviceName.Namespace,
},
ServiceData{
name: serviceName.Name,
role: role,
service: newService,
},
}, },
ServiceData{ CreateService{
name: serviceName.Name, ActionData{
role: role, namespace: serviceName.Namespace,
spec: newService, },
ServiceData{
name: serviceName.Name,
role: role,
service: newService,
},
}, },
}, nil }, nil
} }
// update the service annotation in order to propagate ELB notation. // update the service annotation in order to propagate ELB notation.
patchData, err := specPatch(newService.Spec) return []Action{
if err != nil { CreateService{
return NoActions, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) ActionData{
} namespace: serviceName.Namespace,
},
return []CreateService{ ServiceData{
ActionData{ name: serviceName.Name,
namespace: serviceName.Namespace, service: newService,
}, },
ServiceData{
name: serviceName.Name,
spec: newService,
}, },
}, nil }, nil
} }
func (c *Cluster) deleteService(role PostgresRole) error { func (c *Cluster) deleteService(role PostgresRole) ([]Action, error) {
service := c.Services[role] service := c.Services[role]
return []DeleteService{ return []Action{
ActionData{ DeleteService{
namespace: serviceName.Namespace, ActionData{
}, namespace: service.Namespace,
ServiceData{ },
name: service.Name, ServiceData{
name: service.Name,
},
}, },
}, nil }, nil
} }

View File

@ -19,6 +19,7 @@ import (
func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
var actions []Action = NoActions
c.setSpec(newSpec) c.setSpec(newSpec)
@ -98,43 +99,42 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
return return
} }
func (c *Cluster) syncServices() error { func (c *Cluster) syncServices() (actions []Action, err error) {
for _, role := range []PostgresRole{Master, Replica} { for _, role := range []PostgresRole{Master, Replica} {
c.logger.Debugf("syncing %s service", role) c.logger.Debugf("syncing %s service", role)
if err := c.syncEndpoint(role); err != nil { if err = c.syncEndpoint(role); err != nil {
return fmt.Errorf("could not sync %s endpont: %v", role, err) return NoActions, fmt.Errorf("could not sync %s endpont: %v", role, err)
} }
if err := c.syncService(role); err != nil { if actions, err = c.syncService(role); err != nil {
return fmt.Errorf("could not sync %s service: %v", role, err) return NoActions, fmt.Errorf("could not sync %s service: %v", role, err)
}
}
return actions, nil
}
func (c *Cluster) applyActions(actions []Action) (err error) {
uniqueActions := NoActions
hashMap := map[ActionHash]bool{}
for _, action := range actions {
if _, present := hashMap[action.Hash()]; !present {
hashMap[action.Hash()] = true
action.SetCluster(c)
uniqueActions = append(uniqueActions, action)
}
}
for _, action := range uniqueActions {
if err := action.Process(); err != nil {
c.logger.Errorf("Can't apply action %s: %v", action.Name(), err)
} }
} }
return nil return nil
} }
func (c *Cluster) applyActions(actions []Action) (err error) {
uniqueActions = []Actions{}
hashMap = map[ActionHash]bool{}
for idx, action := range actions {
if _, present := uniqueActions[action.hash()]; !present {
uniqueActions[action.hash()] = true
append(uniqueActions, action)
}
}
for action := range uniqueActions {
if critical, err := action.process(); err != nil {
c.logger.Errorf("Can't apply action %s: %v", action.name(), err)
}
if critical == true {
return err
}
}
}
func (c *Cluster) syncService(role PostgresRole) ([]Action, error) { func (c *Cluster) syncService(role PostgresRole) ([]Action, error) {
c.setProcessName("syncing %s service", role) c.setProcessName("syncing %s service", role)
@ -142,12 +142,13 @@ func (c *Cluster) syncService(role PostgresRole) ([]Action, error) {
if err == nil { if err == nil {
c.Services[role] = svc c.Services[role] = svc
desiredSvc := c.generateService(role, &c.Spec) desiredSvc := c.generateService(role, &c.Spec)
match, reason := k8sutil.SameService(svc, desiredSvc) match, _ := k8sutil.SameService(svc, desiredSvc)
if match { if match {
return NoActions, nil return NoActions, nil
} }
if actions, err := c.updateService(role, desiredSvc); err != nil { actions, err := c.updateService(role, desiredSvc)
if err != nil {
return NoActions, fmt.Errorf("could not update %s service to match desired state: %v", role, err) return NoActions, fmt.Errorf("could not update %s service to match desired state: %v", role, err)
} }
@ -157,14 +158,14 @@ func (c *Cluster) syncService(role PostgresRole) ([]Action, error) {
} }
c.logger.Infof("could not find the cluster's %s service", role) c.logger.Infof("could not find the cluster's %s service", role)
return []CreateService{ actions, err := c.createService(role)
ActionData{ if err != nil {
namespace: serviceName.Namespace, return NoActions, fmt.Errorf(
}, "could not calculate actions to create %s service: %v",
ServiceData{ role, err)
role: role, }
},
}, nil return actions, nil
} }
func (c *Cluster) syncEndpoint(role PostgresRole) error { func (c *Cluster) syncEndpoint(role PostgresRole) error {

View File

@ -24,6 +24,8 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
) )
type Annotations = map[string]string
// OAuthTokenGetter provides the method for fetching OAuth tokens // OAuthTokenGetter provides the method for fetching OAuth tokens
type OAuthTokenGetter interface { type OAuthTokenGetter interface {
getOAuthToken() (string, error) getOAuthToken() (string, error)
@ -134,6 +136,17 @@ func normalizeUserFlags(userFlags []string) ([]string, error) {
return flags, nil return flags, nil
} }
func servicePatchData(spec interface{}, annotations Annotations) ([]byte, error) {
var meta metav1.ObjectMeta = metav1.ObjectMeta{
Annotations: annotations,
}
return json.Marshal(struct {
SpecField interface{} `json:"spec"`
ObjMetaField interface{} `json:"metadata"`
}{spec, &meta})
}
// specPatch produces a JSON of the Kubernetes object specification passed (typically service or // specPatch produces a JSON of the Kubernetes object specification passed (typically service or
// statefulset) to use it in a MergePatch. // statefulset) to use it in a MergePatch.
func specPatch(spec interface{}) ([]byte, error) { func specPatch(spec interface{}) ([]byte, error) {

View File

@ -17,7 +17,6 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/cluster"
"github.com/zalando-incubator/postgres-operator/pkg/events"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"