Set status of the cluster on sync fail/success

This commit is contained in:
Murat Kabilov 2017-10-12 15:10:42 +02:00 committed by GitHub
parent cec695d48e
commit 3b32265258
3 changed files with 38 additions and 20 deletions

View File

@ -11,32 +11,45 @@ import (
// Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest. // Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest.
// Unlike the update, sync does not error out if some objects do not exist and takes care of creating them. // Unlike the update, sync does not error out if some objects do not exist and takes care of creating them.
func (c *Cluster) Sync() error { func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
err := c.loadResources() c.Postgresql = *newSpec
err = c.loadResources()
if err != nil { if err != nil {
c.logger.Errorf("could not load resources: %v", err) c.logger.Errorf("could not load resources: %v", err)
} }
defer func() {
if err != nil {
c.setStatus(spec.ClusterStatusSyncFailed)
} else if c.Status != spec.ClusterStatusRunning {
c.setStatus(spec.ClusterStatusRunning)
}
}()
if err = c.initUsers(); err != nil { if err = c.initUsers(); err != nil {
return err err = fmt.Errorf("could not init users: %v", err)
return
} }
c.logger.Debugf("syncing secrets") c.logger.Debugf("syncing secrets")
//TODO: mind the secrets of the deleted/new users //TODO: mind the secrets of the deleted/new users
if err := c.applySecrets(); err != nil { if err = c.applySecrets(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) { if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not sync secrets: %v", err) err = fmt.Errorf("could not sync secrets: %v", err)
return
} }
} }
c.logger.Debugf("syncing endpoints") c.logger.Debugf("syncing endpoints")
if err := c.syncEndpoint(); err != nil { if err = c.syncEndpoint(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) { if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not sync endpoints: %v", err) err = fmt.Errorf("could not sync endpoints: %v", err)
return
} }
} }
@ -45,39 +58,44 @@ func (c *Cluster) Sync() error {
if role == Replica && !c.Spec.ReplicaLoadBalancer { if role == Replica && !c.Spec.ReplicaLoadBalancer {
if c.Services[role] != nil { if c.Services[role] != nil {
// delete the left over replica service // delete the left over replica service
if err := c.deleteService(role); err != nil { if err = c.deleteService(role); err != nil {
return fmt.Errorf("could not delete obsolete %s service: %v", role, err) err = fmt.Errorf("could not delete obsolete %s service: %v", role, err)
return
} }
} }
continue continue
} }
if err := c.syncService(role); err != nil { if err = c.syncService(role); err != nil {
if !k8sutil.ResourceAlreadyExists(err) { if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("coud not sync %s service: %v", role, err) err = fmt.Errorf("coud not sync %s service: %v", role, err)
return
} }
} }
} }
c.logger.Debugf("syncing statefulsets") c.logger.Debugf("syncing statefulsets")
if err := c.syncStatefulSet(); err != nil { if err = c.syncStatefulSet(); err != nil {
if !k8sutil.ResourceAlreadyExists(err) { if !k8sutil.ResourceAlreadyExists(err) {
return fmt.Errorf("could not sync statefulsets: %v", err) err = fmt.Errorf("could not sync statefulsets: %v", err)
return
} }
} }
if !c.databaseAccessDisabled() { if !c.databaseAccessDisabled() {
c.logger.Debugf("syncing roles") c.logger.Debugf("syncing roles")
if err := c.syncRoles(true); err != nil { if err = c.syncRoles(true); err != nil {
return fmt.Errorf("could not sync roles: %v", err) err = fmt.Errorf("could not sync roles: %v", err)
return
} }
} }
c.logger.Debugf("syncing persistent volumes") c.logger.Debugf("syncing persistent volumes")
if err := c.syncVolumes(); err != nil { if err = c.syncVolumes(); err != nil {
return fmt.Errorf("could not sync persistent volumes: %v", err) err = fmt.Errorf("could not sync persistent volumes: %v", err)
return
} }
return nil return
} }
func (c *Cluster) syncService(role PostgresRole) error { func (c *Cluster) syncService(role PostgresRole) error {

View File

@ -57,7 +57,6 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object
err = json.Unmarshal(b, &list) err = json.Unmarshal(b, &list)
if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) {
c.logger.Debugln("skipping resync of clusters")
return &list, err return &list, err
} }
@ -249,7 +248,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
} }
c.curWorkerCluster.Store(event.WorkerID, cl) c.curWorkerCluster.Store(event.WorkerID, cl)
if err := cl.Sync(); err != nil { if err := cl.Sync(event.NewSpec); err != nil {
cl.Error = fmt.Errorf("could not sync cluster: %v", err) cl.Error = fmt.Errorf("could not sync cluster: %v", err)
lg.Error(cl.Error) lg.Error(cl.Error)
return return

View File

@ -68,6 +68,7 @@ const (
ClusterStatusCreating PostgresStatus = "Creating" ClusterStatusCreating PostgresStatus = "Creating"
ClusterStatusUpdating PostgresStatus = "Updating" ClusterStatusUpdating PostgresStatus = "Updating"
ClusterStatusUpdateFailed PostgresStatus = "UpdateFailed" ClusterStatusUpdateFailed PostgresStatus = "UpdateFailed"
ClusterStatusSyncFailed PostgresStatus = "SyncFailed"
ClusterStatusAddFailed PostgresStatus = "CreateFailed" ClusterStatusAddFailed PostgresStatus = "CreateFailed"
ClusterStatusRunning PostgresStatus = "Running" ClusterStatusRunning PostgresStatus = "Running"
ClusterStatusInvalid PostgresStatus = "Invalid" ClusterStatusInvalid PostgresStatus = "Invalid"