Experiments with actions

This commit is contained in:
erthalion 2018-06-01 16:04:25 +02:00
parent d9d2c5cbe5
commit 2081fc47f1
4 changed files with 136 additions and 94 deletions

38
pkg/actions.go Normal file
View File

@ -0,0 +1,38 @@
package actions
import (
"github.com/zalando-incubator/postgres-operator/pkg/cluster/types"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"k8s.io/client-go/pkg/api/v1"
)
type ActionType int
const (
UpdateService ActionType = iota
RecreateService
CreateService
DeleteService
)
var NoActions []Action = []Action{}
type SyncSecretsData struct {
secrets map[string]*v1.Secret
}
type ServiceData struct {
name string
role Role
spec *v1.Service
}
type SyncVolumesData struct {
volumeSpec spec.Volume
}
type Action struct {
actionType actionType
namespace NamespacedName
data interface{}
}

View File

@ -11,6 +11,7 @@ import (
"k8s.io/client-go/pkg/apis/apps/v1beta1" "k8s.io/client-go/pkg/apis/apps/v1beta1"
policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1" policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
"github.com/zalando-incubator/postgres-operator/pkg/cluster/sync"
"github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
@ -334,118 +335,75 @@ func (c *Cluster) deleteStatefulSet() error {
return nil return nil
} }
func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { func (c *Cluster) createService(role PostgresRole) ([]Action, error) {
c.setProcessName("creating %v service", role) c.setProcessName("creating %v service", role)
serviceSpec := c.generateService(role, &c.Spec) serviceSpec := c.generateService(role, &c.Spec)
service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec) return []Action{
if err != nil { Action{
return nil, err CreateService,
serviceName.Namespace,
ServiceData{
spec: serviceSpec,
},
},
}, nil
} }
c.Services[role] = service func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Action, error) {
return service, nil
}
func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error {
c.setProcessName("updating %v service", role)
if c.Services[role] == nil { if c.Services[role] == nil {
return fmt.Errorf("there is no service in the cluster") return NoActions, fmt.Errorf("there is no service in the cluster")
} }
serviceName := util.NameFromMeta(c.Services[role].ObjectMeta) serviceName := util.NameFromMeta(c.Services[role].ObjectMeta)
endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta) endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta)
// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes // TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
if newService.Spec.Type != c.Services[role].Spec.Type { if newService.Spec.Type != c.Services[role].Spec.Type {
// service type has changed, need to replace the service completely. // service type has changed, need to replace the service completely.
// we cannot use just pach the current service, since it may contain attributes incompatible with the new type. // we cannot use just pach the current service, since it may contain attributes incompatible with the new type.
var ( return []Action{
currentEndpoint *v1.Endpoints Action{
err error RecreateService,
) ServiceData{
name: serviceName.Name,
if role == Master { role: role,
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of spec: newService,
// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint) },
currentEndpoint, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}) },
if err != nil { }, nil
return fmt.Errorf("could not get current cluster %s endpoints: %v", role, err)
}
}
err = c.KubeClient.Services(serviceName.Namespace).Delete(serviceName.Name, c.deleteOptions)
if err != nil {
return fmt.Errorf("could not delete service %q: %v", serviceName, err)
}
c.Endpoints[role] = nil
svc, err := c.KubeClient.Services(serviceName.Namespace).Create(newService)
if err != nil {
return fmt.Errorf("could not create service %q: %v", serviceName, err)
}
c.Services[role] = svc
if role == Master {
// create the new endpoint using the addresses obtained from the previous one
endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets)
ep, err := c.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec)
if err != nil {
return fmt.Errorf("could not create endpoint %q: %v", endpointName, err)
}
c.Endpoints[role] = ep
}
return nil
} }
// update the service annotation in order to propagate ELB notation. // update the service annotation in order to propagate ELB notation.
if len(newService.ObjectMeta.Annotations) > 0 {
if annotationsPatchData, err := metaAnnotationsPatch(newService.ObjectMeta.Annotations); err == nil {
_, err = c.KubeClient.Services(serviceName.Namespace).Patch(
serviceName.Name,
types.MergePatchType,
[]byte(annotationsPatchData), "")
if err != nil {
return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err)
}
} else {
return fmt.Errorf("could not form patch for the service metadata: %v", err)
}
}
patchData, err := specPatch(newService.Spec) patchData, err := specPatch(newService.Spec)
if err != nil { if err != nil {
return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) return NoActions, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
} }
// update the service spec return []Action{
svc, err := c.KubeClient.Services(serviceName.Namespace).Patch( Action{
serviceName.Name, CreateService,
types.MergePatchType, serviceName.Namespace,
patchData, "") ServiceData{
if err != nil { name: serviceName.Name,
return fmt.Errorf("could not patch service %q: %v", serviceName, err) spec: newService,
} },
c.Services[role] = svc },
}, nil
return nil
} }
func (c *Cluster) deleteService(role PostgresRole) error { func (c *Cluster) deleteService(role PostgresRole) error {
c.logger.Debugf("deleting service %s", role)
service := c.Services[role] service := c.Services[role]
if err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions); err != nil { return []Action{
return err Action{
} DeleteService,
serviceName.Namespace,
c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta)) ServiceData{
c.Services[role] = nil name: service.Name,
},
return nil },
}, nil
} }
func (c *Cluster) createEndpoint(role PostgresRole) (*v1.Endpoints, error) { func (c *Cluster) createEndpoint(role PostgresRole) (*v1.Endpoints, error) {

View File

@ -14,9 +14,50 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util/volumes" "github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
) )
var syncChannel chan Action
func (c *Cluster) SyncActions(newSpec *spec.Postgresql) (err error) {
syncSecrets = Action{
SyncSecrets,
SyncSecretsData{
c.generateUserSecrets(),
},
}
syncServiceMaster = Action{
SyncService,
SyncServiceData{
Master,
},
}
syncServiceReplica = Action{
SyncService,
SyncServiceData{
Replica,
},
}
syncVolumes = Action{
SyncVolumes,
SyncVolumesData{
c.Spec.Volume,
},
}
return []Action{
syncSecrets,
syncServiceMaster,
syncServiceReplica,
syncVolumes,
}
}
// Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest. // Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest.
// Unlike the update, sync does not error out if some objects do not exist and takes care of creating them. // Unlike the update, sync does not error out if some objects do not exist and takes care of creating them.
func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
syncChannel = make(chan Action)
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -109,7 +150,7 @@ func (c *Cluster) syncServices() error {
return nil return nil
} }
func (c *Cluster) syncService(role PostgresRole) error { func (c *Cluster) syncService(role PostgresRole) ([]Action, error) {
c.setProcessName("syncing %s service", role) c.setProcessName("syncing %s service", role)
svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}) svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{})
@ -118,18 +159,16 @@ func (c *Cluster) syncService(role PostgresRole) error {
desiredSvc := c.generateService(role, &c.Spec) desiredSvc := c.generateService(role, &c.Spec)
match, reason := k8sutil.SameService(svc, desiredSvc) match, reason := k8sutil.SameService(svc, desiredSvc)
if match { if match {
return nil return NoActions, nil
} }
c.logServiceChanges(role, svc, desiredSvc, false, reason)
if err := c.updateService(role, desiredSvc); err != nil { if actions, err := c.updateService(role, desiredSvc); err != nil {
return fmt.Errorf("could not update %s service to match desired state: %v", role, err) return NoActions, fmt.Errorf("could not update %s service to match desired state: %v", role, err)
} }
c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
return nil return actions, nil
} else if !k8sutil.ResourceNotFound(err) { } else if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not get %s service: %v", role, err) return NoActions, fmt.Errorf("could not get %s service: %v", role, err)
} }
c.Services[role] = nil c.Services[role] = nil

View File

@ -17,6 +17,7 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/cluster"
"github.com/zalando-incubator/postgres-operator/pkg/events"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
@ -263,6 +264,12 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
} }
c.curWorkerCluster.Store(event.WorkerID, cl) c.curWorkerCluster.Store(event.WorkerID, cl)
actions := cl.SyncActions(event.NewSpec)
for i, processor := range c.changeProcessors {
processor(changes)
}
if err := cl.Sync(event.NewSpec); err != nil { if err := cl.Sync(event.NewSpec); err != nil {
cl.Error = fmt.Errorf("could not sync cluster: %v", err) cl.Error = fmt.Errorf("could not sync cluster: %v", err)
lg.Error(cl.Error) lg.Error(cl.Error)