Use UpdateStatus instead of patch (#3005)
Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de> Co-authored-by: Felix Kunde <felix-kunde@gmx.de>
This commit is contained in:
parent
b97de5d7f1
commit
f05150a81e
|
|
@ -32,6 +32,7 @@ import (
|
|||
v1 "k8s.io/api/core/v1"
|
||||
policyv1 "k8s.io/api/policy/v1"
|
||||
rbacv1 "k8s.io/api/rbac/v1"
|
||||
"k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/rest"
|
||||
|
|
@ -271,26 +272,29 @@ func (c *Cluster) Create() (err error) {
|
|||
)
|
||||
|
||||
defer func() {
|
||||
var (
|
||||
pgUpdatedStatus *acidv1.Postgresql
|
||||
errStatus error
|
||||
)
|
||||
if err == nil {
|
||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
|
||||
} else {
|
||||
currentStatus := c.Status.DeepCopy()
|
||||
pg := c.Postgresql.DeepCopy()
|
||||
pg.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
|
||||
|
||||
if err != nil {
|
||||
c.logger.Warningf("cluster created failed: %v", err)
|
||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
|
||||
pg.Status.PostgresClusterStatus = acidv1.ClusterStatusAddFailed
|
||||
}
|
||||
if errStatus != nil {
|
||||
c.logger.Warningf("could not set cluster status: %v", errStatus)
|
||||
return
|
||||
}
|
||||
if pgUpdatedStatus != nil {
|
||||
|
||||
if !equality.Semantic.DeepEqual(currentStatus, pg.Status) {
|
||||
pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pg)
|
||||
if err != nil {
|
||||
c.logger.Warningf("could not set cluster status: %v", err)
|
||||
return
|
||||
}
|
||||
c.setSpec(pgUpdatedStatus)
|
||||
}
|
||||
}()
|
||||
|
||||
pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
|
||||
pg := c.Postgresql.DeepCopy()
|
||||
pg.Status.PostgresClusterStatus = acidv1.ClusterStatusCreating
|
||||
|
||||
pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not set cluster status: %v", err)
|
||||
}
|
||||
|
|
@ -978,7 +982,12 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
|||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)
|
||||
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdating
|
||||
|
||||
newSpec, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not set cluster status to updating: %w", err)
|
||||
}
|
||||
|
||||
if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
|
||||
// do not apply any major version related changes yet
|
||||
|
|
@ -987,20 +996,19 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
|||
c.setSpec(newSpec)
|
||||
|
||||
defer func() {
|
||||
var (
|
||||
pgUpdatedStatus *acidv1.Postgresql
|
||||
err error
|
||||
)
|
||||
currentStatus := newSpec.Status.DeepCopy()
|
||||
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
|
||||
|
||||
if updateFailed {
|
||||
pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdateFailed)
|
||||
} else {
|
||||
pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
|
||||
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdateFailed
|
||||
}
|
||||
if err != nil {
|
||||
c.logger.Warningf("could not set cluster status: %v", err)
|
||||
return
|
||||
}
|
||||
if pgUpdatedStatus != nil {
|
||||
|
||||
if !equality.Semantic.DeepEqual(currentStatus, newSpec.Status) {
|
||||
pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
|
||||
if err != nil {
|
||||
c.logger.Warningf("could not set cluster status: %v", err)
|
||||
return
|
||||
}
|
||||
c.setSpec(pgUpdatedStatus)
|
||||
}
|
||||
}()
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import (
|
|||
batchv1 "k8s.io/api/batch/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
policyv1 "k8s.io/api/policy/v1"
|
||||
"k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
|
@ -43,21 +44,19 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
|
|||
c.setSpec(newSpec)
|
||||
|
||||
defer func() {
|
||||
var (
|
||||
pgUpdatedStatus *acidv1.Postgresql
|
||||
errStatus error
|
||||
)
|
||||
if err != nil {
|
||||
c.logger.Warningf("error while syncing cluster state: %v", err)
|
||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusSyncFailed)
|
||||
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed
|
||||
} else if !c.Status.Running() {
|
||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
|
||||
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
|
||||
}
|
||||
if errStatus != nil {
|
||||
c.logger.Warningf("could not set cluster status: %v", errStatus)
|
||||
return
|
||||
}
|
||||
if pgUpdatedStatus != nil {
|
||||
|
||||
if !equality.Semantic.DeepEqual(oldSpec.Status, newSpec.Status) {
|
||||
pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
|
||||
if err != nil {
|
||||
c.logger.Warningf("could not set cluster status: %v", err)
|
||||
return
|
||||
}
|
||||
c.setSpec(pgUpdatedStatus)
|
||||
}
|
||||
}()
|
||||
|
|
|
|||
|
|
@ -288,6 +288,12 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
|
|||
},
|
||||
}
|
||||
|
||||
// add postgresql cluster to fake client
|
||||
_, err := client.PostgresqlsGetter.Postgresqls(namespace).Create(context.TODO(), &pg, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cluster := New(
|
||||
Config{
|
||||
OpConfig: config.Config{
|
||||
|
|
@ -321,7 +327,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
|
|||
}, client, pg, logger, eventRecorder)
|
||||
cluster.Name = clusterName
|
||||
cluster.Namespace = namespace
|
||||
_, err := cluster.createStatefulSet()
|
||||
_, err = cluster.createStatefulSet()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -161,7 +161,8 @@ func (c *Controller) acquireInitialListOfClusters() error {
|
|||
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
|
||||
if c.opConfig.EnableTeamIdClusternamePrefix {
|
||||
if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
|
||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusInvalid)
|
||||
pgSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusInvalid
|
||||
c.KubeClient.SetPostgresCRDStatus(clusterName, pgSpec)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
|
@ -470,13 +471,25 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
|
|||
|
||||
switch eventType {
|
||||
case EventAdd:
|
||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed)
|
||||
informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusAddFailed
|
||||
_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
|
||||
if err != nil {
|
||||
c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
|
||||
}
|
||||
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
|
||||
case EventUpdate:
|
||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed)
|
||||
informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdateFailed
|
||||
_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
|
||||
if err != nil {
|
||||
c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
|
||||
}
|
||||
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
|
||||
default:
|
||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed)
|
||||
informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed
|
||||
_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
|
||||
if err != nil {
|
||||
c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
|
||||
}
|
||||
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -191,24 +191,8 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
|
|||
}
|
||||
|
||||
// SetPostgresCRDStatus of Postgres cluster
|
||||
func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, status string) (*apiacidv1.Postgresql, error) {
|
||||
var pg *apiacidv1.Postgresql
|
||||
var pgStatus apiacidv1.PostgresStatus
|
||||
pgStatus.PostgresClusterStatus = status
|
||||
|
||||
patch, err := json.Marshal(struct {
|
||||
PgStatus interface{} `json:"status"`
|
||||
}{&pgStatus})
|
||||
|
||||
if err != nil {
|
||||
return pg, fmt.Errorf("could not marshal status: %v", err)
|
||||
}
|
||||
|
||||
// we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ),
|
||||
// however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11)
|
||||
// we should take advantage of it.
|
||||
pg, err = client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Patch(
|
||||
context.TODO(), clusterName.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
|
||||
func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, pg *apiacidv1.Postgresql) (*apiacidv1.Postgresql, error) {
|
||||
pg, err := client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).UpdateStatus(context.TODO(), pg, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return pg, fmt.Errorf("could not update status: %v", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue