Run sync cluster when previous add failed. (#28)
This commit is contained in:
parent
afce38f6f0
commit
bc17897478
|
|
@ -49,8 +49,6 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e
|
|||
continue
|
||||
}
|
||||
c.queueClusterEvent(nil, pg, spec.EventSync)
|
||||
|
||||
c.logger.Debugf("Sync of the '%s' cluster has been queued", util.NameFromMeta(pg.Metadata))
|
||||
activeClustersCnt++
|
||||
}
|
||||
if len(objList) > 0 {
|
||||
|
|
@ -73,7 +71,6 @@ func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface,
|
|||
RequestURI(fmt.Sprintf(constants.WatchClustersURITemplate, c.opConfig.Namespace)).
|
||||
VersionedParams(&options, api.ParameterCodec).
|
||||
FieldsSelectorParam(fields.Everything())
|
||||
|
||||
return req.Watch()
|
||||
}
|
||||
|
||||
|
|
@ -115,7 +112,9 @@ func (c *Controller) processEvent(obj interface{}) error {
|
|||
c.clustersMu.Unlock()
|
||||
|
||||
if err := cl.Create(stopCh); err != nil {
|
||||
logger.Errorf("could not create cluster: %s", err)
|
||||
cl.Error = fmt.Errorf("could not create cluster: %v", err)
|
||||
logger.Errorf("%v", cl.Error)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -128,7 +127,9 @@ func (c *Controller) processEvent(obj interface{}) error {
|
|||
return nil
|
||||
}
|
||||
if err := cl.Update(event.NewSpec); err != nil {
|
||||
logger.Errorf("could not update cluster: %s", err)
|
||||
cl.Error = fmt.Errorf("could not update cluster: %s", err)
|
||||
logger.Errorf("%v", cl.Error)
|
||||
|
||||
return nil
|
||||
}
|
||||
logger.Infof("Cluster '%s' has been updated", clusterName)
|
||||
|
|
@ -166,7 +167,8 @@ func (c *Controller) processEvent(obj interface{}) error {
|
|||
}
|
||||
|
||||
if err := cl.Sync(stopCh); err != nil {
|
||||
logger.Errorf("could not sync cluster '%s': %s", clusterName, err)
|
||||
cl.Error = fmt.Errorf("could not sync cluster '%s': %s", clusterName, err)
|
||||
logger.Errorf("%v", cl)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -192,8 +194,8 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec
|
|||
if old != nil { //update, delete
|
||||
uid = old.Metadata.GetUID()
|
||||
clusterName = util.NameFromMeta(old.Metadata)
|
||||
if eventType == spec.EventUpdate && new.Error == nil && old != nil {
|
||||
eventType = spec.EventAdd
|
||||
if eventType == spec.EventUpdate && new.Error == nil && old.Error != nil {
|
||||
eventType = spec.EventSync
|
||||
clusterError = new.Error
|
||||
} else {
|
||||
clusterError = old.Error
|
||||
|
|
@ -220,7 +222,7 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec
|
|||
//TODO: if we delete cluster, discard all the previous events for the cluster
|
||||
|
||||
c.clusterEventQueues[workerID].Add(clusterEvent)
|
||||
c.logger.WithField("worker", workerID).Infof("%s of the '%s' cluster has been queued for", eventType, clusterName)
|
||||
c.logger.WithField("worker", workerID).Infof("%s of the '%s' cluster has been queued", eventType, clusterName)
|
||||
}
|
||||
|
||||
func (c *Controller) postgresqlAdd(obj interface{}) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue