skip creation after new cluster object

parent 7e6e316c3b
commit 0691ce8255
@@ -183,7 +183,7 @@ func (c *Cluster) GetReference() *v1.ObjectReference {
 
 // SetStatus of Postgres cluster
 // TODO: eventually switch to updateStatus() for kubernetes 1.11 and above
-func (c *Cluster) setStatus(status string) {
+func (c *Cluster) SetStatus(status string) {
     var pgStatus acidv1.PostgresStatus
     pgStatus.PostgresClusterStatus = status
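
The rename from setStatus to SetStatus exports the method, which is what lets the controller package (see the processEvent hunk further down) mark a cluster as invalid before attempting creation. A minimal sketch of the Go visibility rule involved, with stand-in types rather than the operator's own:

```go
package main

import "fmt"

// Cluster is a stand-in for the operator's cluster type.
type Cluster struct {
	Status string
}

// SetStatus is exported (capital S), so other packages such as a
// hypothetical "controller" package may call it. A lower-case
// setStatus would only be visible inside this package.
func (c *Cluster) SetStatus(status string) {
	c.Status = status
}

func main() {
	cl := &Cluster{}
	cl.SetStatus("Invalid") // roughly what processEvent does via cl.SetStatus(acidv1.ClusterStatusInvalid)
	fmt.Println(cl.Status)
}
```
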
@@ -257,13 +257,13 @@ func (c *Cluster) Create() error {
 
     defer func() {
         if err == nil {
-            c.setStatus(acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
+            c.SetStatus(acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
         } else {
-            c.setStatus(acidv1.ClusterStatusAddFailed)
+            c.SetStatus(acidv1.ClusterStatusAddFailed)
         }
     }()
 
-    c.setStatus(acidv1.ClusterStatusCreating)
+    c.SetStatus(acidv1.ClusterStatusCreating)
     c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
 
     if err = c.enforceMinResourceLimits(&c.Spec); err != nil {
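
Create() here, and Update() and Sync() in the hunks below, all follow the same shape: a deferred closure inspects the named error return and records the terminal status, so every exit path leaves a status behind. A self-contained sketch of that idiom, with illustrative names that are not the operator's:

```go
package main

import (
	"errors"
	"fmt"
)

type demoCluster struct{ status string }

func (c *demoCluster) setFinalStatus(s string) { c.status = s }

// create mirrors the shape of Cluster.Create: the deferred closure reads
// the named return value err after the function body has finished.
func (c *demoCluster) create(fail bool) (err error) {
	defer func() {
		if err == nil {
			c.setFinalStatus("Running")
		} else {
			c.setFinalStatus("CreateFailed")
		}
	}()

	c.setFinalStatus("Creating")
	if fail {
		return errors.New("resource creation failed")
	}
	return nil
}

func main() {
	c := &demoCluster{}
	_ = c.create(true)
	fmt.Println(c.status) // CreateFailed
}
```
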
@@ -630,14 +630,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
     c.mu.Lock()
     defer c.mu.Unlock()
 
-    c.setStatus(acidv1.ClusterStatusUpdating)
+    c.SetStatus(acidv1.ClusterStatusUpdating)
     c.setSpec(newSpec)
 
     defer func() {
         if updateFailed {
-            c.setStatus(acidv1.ClusterStatusUpdateFailed)
+            c.SetStatus(acidv1.ClusterStatusUpdateFailed)
         } else {
-            c.setStatus(acidv1.ClusterStatusRunning)
+            c.SetStatus(acidv1.ClusterStatusRunning)
         }
     }()
 
@@ -32,9 +32,9 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
     defer func() {
         if err != nil {
             c.logger.Warningf("error while syncing cluster state: %v", err)
-            c.setStatus(acidv1.ClusterStatusSyncFailed)
+            c.SetStatus(acidv1.ClusterStatusSyncFailed)
         } else if !c.Status.Running() {
-            c.setStatus(acidv1.ClusterStatusRunning)
+            c.SetStatus(acidv1.ClusterStatusRunning)
         }
     }()
 
@@ -14,9 +14,7 @@ import (
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
-    "k8s.io/client-go/kubernetes/scheme"
     "k8s.io/client-go/tools/cache"
-    "k8s.io/client-go/tools/reference"
 
     acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
     "github.com/zalando/postgres-operator/pkg/cluster"
@@ -236,6 +234,15 @@ func (c *Controller) processEvent(event ClusterEvent) {
 
         c.curWorkerCluster.Store(event.WorkerID, cl)
 
+        // if there are already issues skip creation
+        if cl.Error != "" {
+            cl.SetStatus(acidv1.ClusterStatusInvalid)
+            lg.Errorf("could not create cluster: %v", cl.Error)
+            c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)
+
+            return
+        }
+
         if err := cl.Create(); err != nil {
             cl.Error = fmt.Sprintf("could not create cluster: %v", err)
             lg.Error(cl.Error)
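
This is the change the commit title refers to: if the freshly built cluster object already carries a validation error, processEvent marks it Invalid, logs the error, emits a warning event, and returns without calling Create(). A rough stand-alone sketch of that control flow, with the event recorder replaced by a plain log call and made-up types:

```go
package main

import (
	"fmt"
	"log"
)

type cluster struct {
	Error  string
	status string
}

func (c *cluster) SetStatus(s string) { c.status = s }
func (c *cluster) Create() error      { c.status = "Running"; return nil }

func processAdd(cl *cluster) {
	// if there are already issues skip creation
	if cl.Error != "" {
		cl.SetStatus("Invalid")
		log.Printf("could not create cluster: %v", cl.Error)
		return
	}
	if err := cl.Create(); err != nil {
		cl.Error = fmt.Sprintf("could not create cluster: %v", err)
	}
}

func main() {
	cl := &cluster{Error: "manifest is invalid"}
	processAdd(cl)
	fmt.Println(cl.status) // Invalid
}
```
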
@@ -422,16 +429,10 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
         clusterError = informerNewSpec.Error
     }
 
-    workerID := c.clusterWorkerID(clusterName)
-    lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName)
-
-    if clusterError != "" && eventType != EventDelete {
-        lg.Errorf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
-        ref, err := reference.GetReference(scheme.Scheme, informerNewSpec)
-        if err != nil {
-            lg.Errorf("could not get reference for Postgresql CR %v/%v: %v", informerNewSpec.Namespace, informerNewSpec.Name, err)
-        }
-        c.eventRecorder.Eventf(ref, v1.EventTypeWarning, strings.Title(strings.ToLower(string(eventType))), "%v", clusterError)
+    if clusterError != "" && eventType != EventDelete && eventType != EventAdd {
+        c.logger.
+            WithField("cluster-name", clusterName).
+            Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
         return
     }
 
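
With the warning event now emitted in processEvent, the early return in queueClusterEvent no longer applies to add events and only debug-logs the skip for the remaining event types. The chained WithField(...).Debugf(...) call suggests a logrus-style logger; assuming that, the new logging shape looks roughly like this (cluster name and message are made up):

```go
package main

import (
	log "github.com/sirupsen/logrus"
)

func main() {
	logger := log.New()
	logger.SetLevel(log.DebugLevel)

	// Same shape as the new call in queueClusterEvent: skipped events are
	// only debug-logged, no Kubernetes event is emitted at this point.
	logger.
		WithField("cluster-name", "acid/minimal-cluster").
		Debugf("skipping %q event for the invalid cluster: %s", "UPDATE", "manifest is invalid")
}
```
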
@@ -439,6 +440,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
     // in the informer internal state, making it incoherent with the actual Kubernetes object (and, as a side
     // effect, the modified state will be returned together with subsequent events).
 
+    workerID := c.clusterWorkerID(clusterName)
     clusterEvent := ClusterEvent{
         EventTime: time.Now(),
         EventType: eventType,
@@ -448,6 +450,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
         WorkerID: workerID,
     }
 
+    lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName)
     if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil {
         lg.Errorf("error while queueing cluster event: %v", clusterEvent)
     }
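
workerID and the derived log entry are now computed later, next to the ClusterEvent they describe, and each event is pushed onto the queue of the worker responsible for that cluster. As a guess at the general technique (not the operator's actual clusterWorkerID implementation), such a mapping is typically a stable hash of the cluster name modulo the worker count:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerID maps a cluster name onto one of nWorkers queues so that all
// events for the same cluster are handled by the same worker. This is a
// sketch of the common technique; the operator's clusterWorkerID may differ.
func workerID(clusterName string, nWorkers uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(clusterName))
	return h.Sum32() % nWorkers
}

func main() {
	fmt.Println(workerID("default/acid-minimal-cluster", 8))
}
```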