Use interfaces

This commit is contained in:
erthalion 2018-06-06 15:56:22 +02:00
parent 2081fc47f1
commit 627bb51a0c
4 changed files with 137 additions and 112 deletions

View File

@ -1,22 +1,16 @@
package actions package actions
import ( import (
"crypto/md5"
"github.com/zalando-incubator/postgres-operator/pkg/cluster/types" "github.com/zalando-incubator/postgres-operator/pkg/cluster/types"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
) )
type ActionType int
const (
UpdateService ActionType = iota
RecreateService
CreateService
DeleteService
)
var NoActions []Action = []Action{} var NoActions []Action = []Action{}
type ActionHash = [16]byte
type SyncSecretsData struct { type SyncSecretsData struct {
secrets map[string]*v1.Secret secrets map[string]*v1.Secret
} }
@ -31,8 +25,79 @@ type SyncVolumesData struct {
volumeSpec spec.Volume volumeSpec spec.Volume
} }
type Action struct { type ActionData struct {
actionType actionType namespace NamespacedName
namespace NamespacedName }
data interface{}
type CreateService struct {
common ActionData
service ServiceData
}
type UpdateService struct {
common ActionData
service ServiceData
}
type RecreateService struct {
common ActionData
service ServiceData
}
type DeleteService struct {
common ActionData
service ServiceData
}
type Action interface {
process() (bool, error)
name() string
hash() ActionHash
}
func (action UpdateService) process() (bool, error) {
}
func (action RecreateService) process() (bool, error) {
}
func (action CreateService) process() (bool, error) {
}
func (action DeleteService) process() (bool, error) {
}
func (action UpdateService) hash() ActionHash {
return md5.Sum([]byte("update" + action.data.name))
}
func (action RecreateService) hash() ActionHash {
return md5.Sum([]byte("recreate" + action.data.name))
}
func (action CreateService) hash() ActionHash {
return md5.Sum([]byte("create" + action.data.name))
}
func (action DeleteService) hash() ActionHash {
return md5.Sum([]byte("delete" + action.data.name))
}
func (action UpdateService) name() string {
return fmt.Sprintf("Update service %s", action.service.name)
}
func (action RecreateService) name() string {
return fmt.Sprintf("Recreate service %s", action.service.name)
}
func (action CreateService) name() string {
return fmt.Sprintf("Create a new service")
}
func (action DeleteService) name() string {
return fmt.Sprintf("Delete service %s", action.service.name)
} }

View File

@ -339,13 +339,12 @@ func (c *Cluster) createService(role PostgresRole) ([]Action, error) {
c.setProcessName("creating %v service", role) c.setProcessName("creating %v service", role)
serviceSpec := c.generateService(role, &c.Spec) serviceSpec := c.generateService(role, &c.Spec)
return []Action{ return []CreateService{
Action{ ActionData{
CreateService, namespace: serviceName.Namespace,
serviceName.Namespace, },
ServiceData{ ServiceData{
spec: serviceSpec, spec: serviceSpec,
},
}, },
}, nil }, nil
} }
@ -362,14 +361,14 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Ac
if newService.Spec.Type != c.Services[role].Spec.Type { if newService.Spec.Type != c.Services[role].Spec.Type {
// service type has changed, need to replace the service completely. // service type has changed, need to replace the service completely.
// we cannot use just pach the current service, since it may contain attributes incompatible with the new type. // we cannot use just pach the current service, since it may contain attributes incompatible with the new type.
return []Action{ return []RecreateService{
Action{ ActionData{
RecreateService, namespace: serviceName.Namespace,
ServiceData{ },
name: serviceName.Name, ServiceData{
role: role, name: serviceName.Name,
spec: newService, role: role,
}, spec: newService,
}, },
}, nil }, nil
} }
@ -380,14 +379,13 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Ac
return NoActions, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) return NoActions, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
} }
return []Action{ return []CreateService{
Action{ ActionData{
CreateService, namespace: serviceName.Namespace,
serviceName.Namespace, },
ServiceData{ ServiceData{
name: serviceName.Name, name: serviceName.Name,
spec: newService, spec: newService,
},
}, },
}, nil }, nil
} }
@ -395,13 +393,12 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Ac
func (c *Cluster) deleteService(role PostgresRole) error { func (c *Cluster) deleteService(role PostgresRole) error {
service := c.Services[role] service := c.Services[role]
return []Action{ return []DeleteService{
Action{ ActionData{
DeleteService, namespace: serviceName.Namespace,
serviceName.Namespace, },
ServiceData{ ServiceData{
name: service.Name, name: service.Name,
},
}, },
}, nil }, nil
} }

View File

@ -14,50 +14,9 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util/volumes" "github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
) )
var syncChannel chan Action
func (c *Cluster) SyncActions(newSpec *spec.Postgresql) (err error) {
syncSecrets = Action{
SyncSecrets,
SyncSecretsData{
c.generateUserSecrets(),
},
}
syncServiceMaster = Action{
SyncService,
SyncServiceData{
Master,
},
}
syncServiceReplica = Action{
SyncService,
SyncServiceData{
Replica,
},
}
syncVolumes = Action{
SyncVolumes,
SyncVolumesData{
c.Spec.Volume,
},
}
return []Action{
syncSecrets,
syncServiceMaster,
syncServiceReplica,
syncVolumes,
}
}
// Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest. // Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest.
// Unlike the update, sync does not error out if some objects do not exist and takes care of creating them. // Unlike the update, sync does not error out if some objects do not exist and takes care of creating them.
func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
syncChannel = make(chan Action)
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -86,8 +45,13 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
} }
c.logger.Debugf("syncing services") c.logger.Debugf("syncing services")
if err = c.syncServices(); err != nil { if actions, err = c.syncServices(); err != nil {
err = fmt.Errorf("could not sync services: %v", err) err = fmt.Errorf("could not resolve actions to sync services: %v", err)
return
}
if err = c.applyActions(actions); err != nil {
err = fmt.Errorf("could not apply actions to sync services: %v", err)
return return
} }
@ -150,6 +114,23 @@ func (c *Cluster) syncServices() error {
return nil return nil
} }
func (c *Cluster) applyActions(actions []Action) (err error) {
uniqueActions = []Actions{}
hashMap = map[ActionHash]bool{}
for idx, action := range actions {
if _, present := uniqueActions[action.hash()]; !present {
uniqueActions[action.hash()] = true
append(uniqueActions, action)
}
}
for action := range uniqueActions {
if dontStop, err := action.process(); err != nil && !dontStop {
c.logger.Errorf("Can't apply action %s: %v", action.name(), err)
}
}
}
func (c *Cluster) syncService(role PostgresRole) ([]Action, error) { func (c *Cluster) syncService(role PostgresRole) ([]Action, error) {
c.setProcessName("syncing %s service", role) c.setProcessName("syncing %s service", role)
@ -170,28 +151,16 @@ func (c *Cluster) syncService(role PostgresRole) ([]Action, error) {
} else if !k8sutil.ResourceNotFound(err) { } else if !k8sutil.ResourceNotFound(err) {
return NoActions, fmt.Errorf("could not get %s service: %v", role, err) return NoActions, fmt.Errorf("could not get %s service: %v", role, err)
} }
c.Services[role] = nil
c.logger.Infof("could not find the cluster's %s service", role) c.logger.Infof("could not find the cluster's %s service", role)
if svc, err := c.createService(role); err != nil { return []CreateService{
if k8sutil.ResourceAlreadyExists(err) { ActionData{
c.logger.Infof("%s service %q already exists", role, util.NameFromMeta(svc.ObjectMeta)) namespace: serviceName.Namespace,
svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}) },
if err == nil { ServiceData{
c.Services[role] = svc role: role,
} else { },
c.logger.Infof("could not fetch existing %s service: %v", role, err) }, nil
}
} else {
return fmt.Errorf("could not create missing %s service: %v", role, err)
}
} else {
c.logger.Infof("created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta))
c.Services[role] = svc
}
return nil
} }
func (c *Cluster) syncEndpoint(role PostgresRole) error { func (c *Cluster) syncEndpoint(role PostgresRole) error {

View File

@ -264,12 +264,6 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
} }
c.curWorkerCluster.Store(event.WorkerID, cl) c.curWorkerCluster.Store(event.WorkerID, cl)
actions := cl.SyncActions(event.NewSpec)
for i, processor := range c.changeProcessors {
processor(changes)
}
if err := cl.Sync(event.NewSpec); err != nil { if err := cl.Sync(event.NewSpec); err != nil {
cl.Error = fmt.Errorf("could not sync cluster: %v", err) cl.Error = fmt.Errorf("could not sync cluster: %v", err)
lg.Error(cl.Error) lg.Error(cl.Error)