move SetStatus to k8sclient and emit event when skipping creation and rename to SetPostgresCRDStatus

This commit is contained in:
Felix Kunde 2020-06-15 17:40:53 +02:00
parent 0691ce8255
commit 09c7e7a843
5 changed files with 64 additions and 50 deletions

View File

@ -5,7 +5,6 @@ package cluster
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/json"
"fmt" "fmt"
"reflect" "reflect"
"regexp" "regexp"
@ -181,34 +180,6 @@ func (c *Cluster) GetReference() *v1.ObjectReference {
return ref return ref
} }
// SetStatus of Postgres cluster
// TODO: eventually switch to updateStatus() for kubernetes 1.11 and above
func (c *Cluster) SetStatus(status string) {
var pgStatus acidv1.PostgresStatus
pgStatus.PostgresClusterStatus = status
patch, err := json.Marshal(struct {
PgStatus interface{} `json:"status"`
}{&pgStatus})
if err != nil {
c.logger.Errorf("could not marshal status: %v", err)
}
// we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ),
// however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11)
// we should take advantage of it.
newspec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(
context.TODO(), c.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
if err != nil {
c.logger.Errorf("could not update status: %v", err)
// return as newspec is empty, see PR654
return
}
// update the spec, maintaining the new resourceVersion.
c.setSpec(newspec)
}
func (c *Cluster) isNewCluster() bool { func (c *Cluster) isNewCluster() bool {
return c.Status.Creating() return c.Status.Creating()
} }
@ -257,13 +228,13 @@ func (c *Cluster) Create() error {
defer func() { defer func() {
if err == nil { if err == nil {
c.SetStatus(acidv1.ClusterStatusRunning) //TODO: are you sure it's running? c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
} else { } else {
c.SetStatus(acidv1.ClusterStatusAddFailed) c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
} }
}() }()
c.SetStatus(acidv1.ClusterStatusCreating) c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
if err = c.enforceMinResourceLimits(&c.Spec); err != nil { if err = c.enforceMinResourceLimits(&c.Spec); err != nil {
@ -630,14 +601,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.SetStatus(acidv1.ClusterStatusUpdating) c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)
c.setSpec(newSpec) c.setSpec(newSpec)
defer func() { defer func() {
if updateFailed { if updateFailed {
c.SetStatus(acidv1.ClusterStatusUpdateFailed) c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdateFailed)
} else { } else {
c.SetStatus(acidv1.ClusterStatusRunning) c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
} }
}() }()

View File

@ -32,9 +32,9 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
defer func() { defer func() {
if err != nil { if err != nil {
c.logger.Warningf("error while syncing cluster state: %v", err) c.logger.Warningf("error while syncing cluster state: %v", err)
c.SetStatus(acidv1.ClusterStatusSyncFailed) c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusSyncFailed)
} else if !c.Status.Running() { } else if !c.Status.Running() {
c.SetStatus(acidv1.ClusterStatusRunning) c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
} }
}() }()

View File

@ -25,6 +25,7 @@ import (
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
) )
// Controller represents operator controller // Controller represents operator controller
@ -442,6 +443,16 @@ func (c *Controller) getEffectiveNamespace(namespaceFromEnvironment, namespaceFr
return namespace return namespace
} }
// GetReference of Postgres CR object
// i.e. required to emit events to this resource
func (c *Controller) GetReference(postgresql *acidv1.Postgresql) *v1.ObjectReference {
ref, err := reference.GetReference(scheme.Scheme, postgresql)
if err != nil {
c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", postgresql.Namespace, postgresql.Name, err)
}
return ref
}
// hasOwnership returns true if the controller is the "owner" of the postgresql. // hasOwnership returns true if the controller is the "owner" of the postgresql.
// Whether it's owner is determined by the value of 'acid.zalan.do/controller' // Whether it's owner is determined by the value of 'acid.zalan.do/controller'
// annotation. If the value matches the controllerID then it owns it, or if the // annotation. If the value matches the controllerID then it owns it, or if the

View File

@ -234,15 +234,6 @@ func (c *Controller) processEvent(event ClusterEvent) {
c.curWorkerCluster.Store(event.WorkerID, cl) c.curWorkerCluster.Store(event.WorkerID, cl)
// if there are already issues skip creation
if cl.Error != "" {
cl.SetStatus(acidv1.ClusterStatusInvalid)
lg.Errorf("could not create cluster: %v", cl.Error)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)
return
}
if err := cl.Create(); err != nil { if err := cl.Create(); err != nil {
cl.Error = fmt.Sprintf("could not create cluster: %v", err) cl.Error = fmt.Sprintf("could not create cluster: %v", err)
lg.Error(cl.Error) lg.Error(cl.Error)
@ -429,10 +420,21 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
clusterError = informerNewSpec.Error clusterError = informerNewSpec.Error
} }
if clusterError != "" && eventType != EventDelete && eventType != EventAdd { if clusterError != "" && eventType != EventDelete {
c.logger. c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
WithField("cluster-name", clusterName).
Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError) switch eventType {
case EventAdd:
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed)
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
case EventUpdate:
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed)
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
default:
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed)
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
}
return return
} }

View File

@ -6,10 +6,13 @@ import (
"reflect" "reflect"
b64 "encoding/base64" b64 "encoding/base64"
"encoding/json"
batchv1beta1 "k8s.io/api/batch/v1beta1" batchv1beta1 "k8s.io/api/batch/v1beta1"
clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1" clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/spec"
apiappsv1 "k8s.io/api/apps/v1" apiappsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
policybeta1 "k8s.io/api/policy/v1beta1" policybeta1 "k8s.io/api/policy/v1beta1"
@ -156,6 +159,33 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
return kubeClient, nil return kubeClient, nil
} }
// SetPostgresCRDStatus of Postgres cluster
func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, status string) (*acidv1.Postgresql, error) {
var pg *acidv1.Postgresql
var pgStatus acidv1.PostgresStatus
pgStatus.PostgresClusterStatus = status
patch, err := json.Marshal(struct {
PgStatus interface{} `json:"status"`
}{&pgStatus})
if err != nil {
return pg, fmt.Errorf("could not marshal status: %v", err)
}
// we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ),
// however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11)
// we should take advantage of it.
pg, err = client.AcidV1ClientSet.AcidV1().Postgresqls(clusterName.Namespace).Patch(
context.TODO(), clusterName.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
if err != nil {
return pg, fmt.Errorf("could not update status: %v", err)
}
// update the spec, maintaining the new resourceVersion.
return pg, nil
}
// SameService compares the Services // SameService compares the Services
func SameService(cur, new *v1.Service) (match bool, reason string) { func SameService(cur, new *v1.Service) (match bool, reason string) {
//TODO: improve comparison //TODO: improve comparison