skip creation later to improve visibility of errors (#1013)
* try to emit error for missing team name in cluster name
* skip creation after new cluster object
* move SetStatus to k8sclient and emit event when skipping creation and rename to SetPostgresCRDStatus

Co-authored-by: Felix Kunde <felix.kunde@zalando.de>
parent 0e3fb9ec43
commit 0c6655a22d

@@ -102,7 +102,7 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error {
 		}
 
 		tmp.Error = err.Error()
-		tmp.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid}
+		tmp.Status.PostgresClusterStatus = ClusterStatusInvalid
 
 		*p = Postgresql(tmp)
 

@@ -112,10 +112,10 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error {
 
 	if clusterName, err := extractClusterName(tmp2.ObjectMeta.Name, tmp2.Spec.TeamID); err != nil {
 		tmp2.Error = err.Error()
-		tmp2.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid}
+		tmp2.Status.PostgresClusterStatus = ClusterStatusInvalid
 	} else if err := validateCloneClusterDescription(&tmp2.Spec.Clone); err != nil {
 		tmp2.Error = err.Error()
-		tmp2.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid}
+		tmp2.Status.PostgresClusterStatus = ClusterStatusInvalid
 	} else {
 		tmp2.Spec.ClusterName = clusterName
 	}
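
Note: the validation above hinges on extractClusterName, which is not part of this diff. A minimal sketch of the idea — a hypothetical simplification, not the operator's exact code: the manifest name must be the team name followed by a dash and the cluster name (e.g. "acid-minimal-cluster" for TeamID "acid"), otherwise UnmarshalJSON records the error and flags the manifest invalid.

package main

import (
	"fmt"
	"strings"
)

// extractClusterName (sketch): returns the cluster name with the team
// prefix stripped, or an error that UnmarshalJSON stores in tmp2.Error.
func extractClusterName(clusterName, teamName string) (string, error) {
	if teamName == "" {
		return "", fmt.Errorf("no team name defined for the cluster")
	}
	prefix := strings.ToLower(teamName) + "-"
	if !strings.HasPrefix(clusterName, prefix) {
		return "", fmt.Errorf("name must match {TEAM}-{NAME} format")
	}
	return strings.TrimPrefix(clusterName, prefix), nil
}

func main() {
	name, err := extractClusterName("acid-minimal-cluster", "acid")
	fmt.Println(name, err) // minimal-cluster <nil>
}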

@@ -5,7 +5,6 @@ package cluster
 import (
 	"context"
 	"database/sql"
-	"encoding/json"
 	"fmt"
 	"reflect"
 	"regexp"

@@ -181,34 +180,6 @@ func (c *Cluster) GetReference() *v1.ObjectReference {
 	return ref
 }
 
-// SetStatus of Postgres cluster
-// TODO: eventually switch to updateStatus() for kubernetes 1.11 and above
-func (c *Cluster) setStatus(status string) {
-	var pgStatus acidv1.PostgresStatus
-	pgStatus.PostgresClusterStatus = status
-
-	patch, err := json.Marshal(struct {
-		PgStatus interface{} `json:"status"`
-	}{&pgStatus})
-
-	if err != nil {
-		c.logger.Errorf("could not marshal status: %v", err)
-	}
-
-	// we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ),
-	// however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11)
-	// we should take advantage of it.
-	newspec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(
-		context.TODO(), c.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
-	if err != nil {
-		c.logger.Errorf("could not update status: %v", err)
-		// return as newspec is empty, see PR654
-		return
-	}
-	// update the spec, maintaining the new resourceVersion.
-	c.setSpec(newspec)
-}
-
 func (c *Cluster) isNewCluster() bool {
 	return c.Status.Creating()
 }

@@ -257,13 +228,13 @@ func (c *Cluster) Create() error {
 
 	defer func() {
 		if err == nil {
-			c.setStatus(acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
 		} else {
-			c.setStatus(acidv1.ClusterStatusAddFailed)
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
 		}
 	}()
 
-	c.setStatus(acidv1.ClusterStatusCreating)
+	c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
 	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
 
 	if err = c.enforceMinResourceLimits(&c.Spec); err != nil {
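
Note: the Create hunk leans on a Go idiom worth spelling out: err is a named return value, so the deferred closure sees whatever err holds when Create exits and records the terminal status (Running or AddFailed) exactly once. A standalone sketch of the pattern, with a plain setStatus standing in for c.KubeClient.SetPostgresCRDStatus:

package main

import "fmt"

// setStatus stands in for c.KubeClient.SetPostgresCRDStatus in this sketch.
func setStatus(status string) { fmt.Println("status:", status) }

// create mirrors the shape of Cluster.Create: the deferred closure reads
// the final value of the named return err and picks the terminal status.
func create() (err error) {
	defer func() {
		if err == nil {
			setStatus("Running")
		} else {
			setStatus("AddFailed")
		}
	}()

	setStatus("Creating")
	// ... create cluster resources here; assign err on failure ...
	return nil
}

func main() {
	_ = create() // prints "status: Creating" then "status: Running"
}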

@@ -630,14 +601,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	c.setStatus(acidv1.ClusterStatusUpdating)
+	c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)
 	c.setSpec(newSpec)
 
 	defer func() {
 		if updateFailed {
-			c.setStatus(acidv1.ClusterStatusUpdateFailed)
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdateFailed)
 		} else {
-			c.setStatus(acidv1.ClusterStatusRunning)
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
 		}
 	}()
 

@@ -32,9 +32,9 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 	defer func() {
 		if err != nil {
 			c.logger.Warningf("error while syncing cluster state: %v", err)
-			c.setStatus(acidv1.ClusterStatusSyncFailed)
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusSyncFailed)
 		} else if !c.Status.Running() {
-			c.setStatus(acidv1.ClusterStatusRunning)
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
 		}
 	}()
 
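
Note: Sync's !c.Status.Running() guard (and isNewCluster's Status.Creating() above) use small predicates on PostgresStatus whose definitions are outside this commit. Their assumed shape, mirroring the status constants used throughout the diff — an assumption, not code from this change:

// Assumed shape of the status predicates; not part of this commit.
type PostgresStatus struct {
	PostgresClusterStatus string
}

const (
	ClusterStatusRunning  = "Running"
	ClusterStatusCreating = "Creating"
)

func (s PostgresStatus) Running() bool {
	return s.PostgresClusterStatus == ClusterStatusRunning
}

func (s PostgresStatus) Creating() bool {
	return s.PostgresClusterStatus == ClusterStatusCreating
}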

@@ -25,6 +25,7 @@ import (
 	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/tools/reference"
 )
 
 // Controller represents operator controller

@@ -442,6 +443,16 @@ func (c *Controller) getEffectiveNamespace(namespaceFromEnvironment, namespaceFr
 	return namespace
 }
 
+// GetReference of Postgres CR object
+// i.e. required to emit events to this resource
+func (c *Controller) GetReference(postgresql *acidv1.Postgresql) *v1.ObjectReference {
+	ref, err := reference.GetReference(scheme.Scheme, postgresql)
+	if err != nil {
+		c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", postgresql.Namespace, postgresql.Name, err)
+	}
+	return ref
+}
+
 // hasOwnership returns true if the controller is the "owner" of the postgresql.
 // Whether it's owner is determined by the value of 'acid.zalan.do/controller'
 // annotation. If the value matches the controllerID then it owns it, or if the

@@ -421,14 +421,25 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
 	}
 
 	if clusterError != "" && eventType != EventDelete {
-		c.logger.
-			WithField("cluster-name", clusterName).
-			Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
+		c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
+
+		switch eventType {
+		case EventAdd:
+			c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed)
+			c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
+		case EventUpdate:
+			c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed)
+			c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
+		default:
+			c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed)
+			c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
+		}
+
 		return
 	}
 
 	// Don't pass the spec directly from the informer, since subsequent modifications of it would be reflected
-	// in the informer internal state, making it incohherent with the actual Kubernetes object (and, as a side
+	// in the informer internal state, making it incoherent with the actual Kubernetes object (and, as a side
 	// effect, the modified state will be returned together with subsequent events).
 
 	workerID := c.clusterWorkerID(clusterName)

@@ -6,10 +6,13 @@ import (
 	"reflect"
 
 	b64 "encoding/base64"
+	"encoding/json"
 
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1"
 
+	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
+	"github.com/zalando/postgres-operator/pkg/spec"
 	apiappsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	policybeta1 "k8s.io/api/policy/v1beta1"

@@ -156,6 +159,33 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
 	return kubeClient, nil
 }
 
+// SetPostgresCRDStatus of Postgres cluster
+func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, status string) (*acidv1.Postgresql, error) {
+	var pg *acidv1.Postgresql
+	var pgStatus acidv1.PostgresStatus
+	pgStatus.PostgresClusterStatus = status
+
+	patch, err := json.Marshal(struct {
+		PgStatus interface{} `json:"status"`
+	}{&pgStatus})
+
+	if err != nil {
+		return pg, fmt.Errorf("could not marshal status: %v", err)
+	}
+
+	// we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ),
+	// however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11)
+	// we should take advantage of it.
+	pg, err = client.AcidV1ClientSet.AcidV1().Postgresqls(clusterName.Namespace).Patch(
+		context.TODO(), clusterName.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
+	if err != nil {
+		return pg, fmt.Errorf("could not update status: %v", err)
+	}
+
+	// update the spec, maintaining the new resourceVersion.
+	return pg, nil
+}
+
 // SameService compares the Services
 func SameService(cur, new *v1.Service) (match bool, reason string) {
 	//TODO: improve comparison
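
Note: unlike the removed Cluster.setStatus, SetPostgresCRDStatus returns the patched object, so a caller can keep the new resourceVersion even though the call sites in this commit discard it. A usage sketch — a hypothetical caller, assuming a populated KubernetesClient and a logger, neither of which is defined here:

// Hypothetical caller of the new helper.
clusterName := spec.NamespacedName{Namespace: "default", Name: "acid-minimal-cluster"}
pg, err := client.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusRunning)
if err != nil {
	logger.Errorf("could not set status: %v", err)
	return
}
// The merge patch sent to the "status" subresource looks roughly like:
//   {"status": {"PostgresClusterStatus": "Running"}}
logger.Infof("patched %q, new resourceVersion %s", pg.Name, pg.ResourceVersion)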