diff --git a/pkg/actions.go b/pkg/actions.go new file mode 100644 index 000000000..a2f16d221 --- /dev/null +++ b/pkg/actions.go @@ -0,0 +1,38 @@ +package actions + +import ( + "github.com/zalando-incubator/postgres-operator/pkg/cluster/types" + "github.com/zalando-incubator/postgres-operator/pkg/spec" + "k8s.io/client-go/pkg/api/v1" +) + +type ActionType int + +const ( + UpdateService ActionType = iota + RecreateService + CreateService + DeleteService +) + +var NoActions []Action = []Action{} + +type SyncSecretsData struct { + secrets map[string]*v1.Secret +} + +type ServiceData struct { + name string + role Role + spec *v1.Service +} + +type SyncVolumesData struct { + volumeSpec spec.Volume +} + +type Action struct { + actionType actionType + namespace NamespacedName + data interface{} +} diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index cc3c4e6f9..895988af4 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -11,6 +11,7 @@ import ( "k8s.io/client-go/pkg/apis/apps/v1beta1" policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1" + "github.com/zalando-incubator/postgres-operator/pkg/cluster/sync" "github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" @@ -334,118 +335,75 @@ func (c *Cluster) deleteStatefulSet() error { return nil } -func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { +func (c *Cluster) createService(role PostgresRole) ([]Action, error) { c.setProcessName("creating %v service", role) serviceSpec := c.generateService(role, &c.Spec) - service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec) - if err != nil { - return nil, err - } - - c.Services[role] = service - return service, nil + return []Action{ + Action{ + CreateService, + serviceName.Namespace, + ServiceData{ + spec: serviceSpec, + }, + }, + }, nil } -func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error { - c.setProcessName("updating %v service", role) - +func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) ([]Action, error) { if c.Services[role] == nil { - return fmt.Errorf("there is no service in the cluster") + return NoActions, fmt.Errorf("there is no service in the cluster") } serviceName := util.NameFromMeta(c.Services[role].ObjectMeta) endpointName := util.NameFromMeta(c.Endpoints[role].ObjectMeta) + // TODO: check if it possible to change the service type with a patch in future versions of Kubernetes if newService.Spec.Type != c.Services[role].Spec.Type { // service type has changed, need to replace the service completely. // we cannot use just pach the current service, since it may contain attributes incompatible with the new type. - var ( - currentEndpoint *v1.Endpoints - err error - ) - - if role == Master { - // for the master service we need to re-create the endpoint as well. Get the up-to-date version of - // the addresses stored in it before the service is deleted (deletion of the service removes the endpooint) - currentEndpoint, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("could not get current cluster %s endpoints: %v", role, err) - } - } - err = c.KubeClient.Services(serviceName.Namespace).Delete(serviceName.Name, c.deleteOptions) - if err != nil { - return fmt.Errorf("could not delete service %q: %v", serviceName, err) - } - - c.Endpoints[role] = nil - svc, err := c.KubeClient.Services(serviceName.Namespace).Create(newService) - if err != nil { - return fmt.Errorf("could not create service %q: %v", serviceName, err) - } - - c.Services[role] = svc - if role == Master { - // create the new endpoint using the addresses obtained from the previous one - endpointSpec := c.generateEndpoint(role, currentEndpoint.Subsets) - ep, err := c.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec) - if err != nil { - return fmt.Errorf("could not create endpoint %q: %v", endpointName, err) - } - - c.Endpoints[role] = ep - } - - return nil + return []Action{ + Action{ + RecreateService, + ServiceData{ + name: serviceName.Name, + role: role, + spec: newService, + }, + }, + }, nil } // update the service annotation in order to propagate ELB notation. - if len(newService.ObjectMeta.Annotations) > 0 { - if annotationsPatchData, err := metaAnnotationsPatch(newService.ObjectMeta.Annotations); err == nil { - _, err = c.KubeClient.Services(serviceName.Namespace).Patch( - serviceName.Name, - types.MergePatchType, - []byte(annotationsPatchData), "") - - if err != nil { - return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) - } - } else { - return fmt.Errorf("could not form patch for the service metadata: %v", err) - } - } - patchData, err := specPatch(newService.Spec) if err != nil { - return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) + return NoActions, fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) } - // update the service spec - svc, err := c.KubeClient.Services(serviceName.Namespace).Patch( - serviceName.Name, - types.MergePatchType, - patchData, "") - if err != nil { - return fmt.Errorf("could not patch service %q: %v", serviceName, err) - } - c.Services[role] = svc - - return nil + return []Action{ + Action{ + CreateService, + serviceName.Namespace, + ServiceData{ + name: serviceName.Name, + spec: newService, + }, + }, + }, nil } func (c *Cluster) deleteService(role PostgresRole) error { - c.logger.Debugf("deleting service %s", role) - service := c.Services[role] - if err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions); err != nil { - return err - } - - c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta)) - c.Services[role] = nil - - return nil + return []Action{ + Action{ + DeleteService, + serviceName.Namespace, + ServiceData{ + name: service.Name, + }, + }, + }, nil } func (c *Cluster) createEndpoint(role PostgresRole) (*v1.Endpoints, error) { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index cf38a6d4f..ef61c0405 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -14,9 +14,50 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/volumes" ) +var syncChannel chan Action + +func (c *Cluster) SyncActions(newSpec *spec.Postgresql) (err error) { + syncSecrets = Action{ + SyncSecrets, + SyncSecretsData{ + c.generateUserSecrets(), + }, + } + + syncServiceMaster = Action{ + SyncService, + SyncServiceData{ + Master, + }, + } + + syncServiceReplica = Action{ + SyncService, + SyncServiceData{ + Replica, + }, + } + + syncVolumes = Action{ + SyncVolumes, + SyncVolumesData{ + c.Spec.Volume, + }, + } + + return []Action{ + syncSecrets, + syncServiceMaster, + syncServiceReplica, + syncVolumes, + } +} + // Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest. // Unlike the update, sync does not error out if some objects do not exist and takes care of creating them. func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { + syncChannel = make(chan Action) + c.mu.Lock() defer c.mu.Unlock() @@ -109,7 +150,7 @@ func (c *Cluster) syncServices() error { return nil } -func (c *Cluster) syncService(role PostgresRole) error { +func (c *Cluster) syncService(role PostgresRole) ([]Action, error) { c.setProcessName("syncing %s service", role) svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}) @@ -118,18 +159,16 @@ func (c *Cluster) syncService(role PostgresRole) error { desiredSvc := c.generateService(role, &c.Spec) match, reason := k8sutil.SameService(svc, desiredSvc) if match { - return nil + return NoActions, nil } - c.logServiceChanges(role, svc, desiredSvc, false, reason) - if err := c.updateService(role, desiredSvc); err != nil { - return fmt.Errorf("could not update %s service to match desired state: %v", role, err) + if actions, err := c.updateService(role, desiredSvc); err != nil { + return NoActions, fmt.Errorf("could not update %s service to match desired state: %v", role, err) } - c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) - return nil + return actions, nil } else if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("could not get %s service: %v", role, err) + return NoActions, fmt.Errorf("could not get %s service: %v", role, err) } c.Services[role] = nil diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index cb689b70a..5073ea49b 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -17,6 +17,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/zalando-incubator/postgres-operator/pkg/cluster" + "github.com/zalando-incubator/postgres-operator/pkg/events" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" @@ -263,6 +264,12 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { } c.curWorkerCluster.Store(event.WorkerID, cl) + actions := cl.SyncActions(event.NewSpec) + + for i, processor := range c.changeProcessors { + processor(changes) + } + if err := cl.Sync(event.NewSpec); err != nil { cl.Error = fmt.Errorf("could not sync cluster: %v", err) lg.Error(cl.Error)