Merge branch 'master' into pg-18
This commit is contained in:
commit
f47d3f7738
|
|
@ -32,6 +32,7 @@ import (
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
policyv1 "k8s.io/api/policy/v1"
|
policyv1 "k8s.io/api/policy/v1"
|
||||||
rbacv1 "k8s.io/api/rbac/v1"
|
rbacv1 "k8s.io/api/rbac/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/equality"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
|
|
@ -271,26 +272,29 @@ func (c *Cluster) Create() (err error) {
|
||||||
)
|
)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
var (
|
currentStatus := c.Status.DeepCopy()
|
||||||
pgUpdatedStatus *acidv1.Postgresql
|
pg := c.Postgresql.DeepCopy()
|
||||||
errStatus error
|
pg.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
|
||||||
)
|
|
||||||
if err == nil {
|
if err != nil {
|
||||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
|
|
||||||
} else {
|
|
||||||
c.logger.Warningf("cluster created failed: %v", err)
|
c.logger.Warningf("cluster created failed: %v", err)
|
||||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
|
pg.Status.PostgresClusterStatus = acidv1.ClusterStatusAddFailed
|
||||||
}
|
}
|
||||||
if errStatus != nil {
|
|
||||||
c.logger.Warningf("could not set cluster status: %v", errStatus)
|
if !equality.Semantic.DeepEqual(currentStatus, pg.Status) {
|
||||||
return
|
pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pg)
|
||||||
}
|
if err != nil {
|
||||||
if pgUpdatedStatus != nil {
|
c.logger.Warningf("could not set cluster status: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
c.setSpec(pgUpdatedStatus)
|
c.setSpec(pgUpdatedStatus)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
|
pg := c.Postgresql.DeepCopy()
|
||||||
|
pg.Status.PostgresClusterStatus = acidv1.ClusterStatusCreating
|
||||||
|
|
||||||
|
pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not set cluster status: %v", err)
|
return fmt.Errorf("could not set cluster status: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -978,7 +982,12 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)
|
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdating
|
||||||
|
|
||||||
|
newSpec, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not set cluster status to updating: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
|
if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
|
||||||
// do not apply any major version related changes yet
|
// do not apply any major version related changes yet
|
||||||
|
|
@ -987,20 +996,19 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||||
c.setSpec(newSpec)
|
c.setSpec(newSpec)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
var (
|
currentStatus := newSpec.Status.DeepCopy()
|
||||||
pgUpdatedStatus *acidv1.Postgresql
|
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
|
||||||
err error
|
|
||||||
)
|
|
||||||
if updateFailed {
|
if updateFailed {
|
||||||
pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdateFailed)
|
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdateFailed
|
||||||
} else {
|
|
||||||
pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
|
||||||
c.logger.Warningf("could not set cluster status: %v", err)
|
if !equality.Semantic.DeepEqual(currentStatus, newSpec.Status) {
|
||||||
return
|
pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
|
||||||
}
|
if err != nil {
|
||||||
if pgUpdatedStatus != nil {
|
c.logger.Warningf("could not set cluster status: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
c.setSpec(pgUpdatedStatus)
|
c.setSpec(pgUpdatedStatus)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import (
|
||||||
batchv1 "k8s.io/api/batch/v1"
|
batchv1 "k8s.io/api/batch/v1"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
policyv1 "k8s.io/api/policy/v1"
|
policyv1 "k8s.io/api/policy/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/equality"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
@ -43,21 +44,19 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
|
||||||
c.setSpec(newSpec)
|
c.setSpec(newSpec)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
var (
|
|
||||||
pgUpdatedStatus *acidv1.Postgresql
|
|
||||||
errStatus error
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warningf("error while syncing cluster state: %v", err)
|
c.logger.Warningf("error while syncing cluster state: %v", err)
|
||||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusSyncFailed)
|
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed
|
||||||
} else if !c.Status.Running() {
|
} else if !c.Status.Running() {
|
||||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
|
newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning
|
||||||
}
|
}
|
||||||
if errStatus != nil {
|
|
||||||
c.logger.Warningf("could not set cluster status: %v", errStatus)
|
if !equality.Semantic.DeepEqual(oldSpec.Status, newSpec.Status) {
|
||||||
return
|
pgUpdatedStatus, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec)
|
||||||
}
|
if err != nil {
|
||||||
if pgUpdatedStatus != nil {
|
c.logger.Warningf("could not set cluster status: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
c.setSpec(pgUpdatedStatus)
|
c.setSpec(pgUpdatedStatus)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
||||||
|
|
@ -288,6 +288,12 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add postgresql cluster to fake client
|
||||||
|
_, err := client.PostgresqlsGetter.Postgresqls(namespace).Create(context.TODO(), &pg, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
cluster := New(
|
cluster := New(
|
||||||
Config{
|
Config{
|
||||||
OpConfig: config.Config{
|
OpConfig: config.Config{
|
||||||
|
|
@ -321,7 +327,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
|
||||||
}, client, pg, logger, eventRecorder)
|
}, client, pg, logger, eventRecorder)
|
||||||
cluster.Name = clusterName
|
cluster.Name = clusterName
|
||||||
cluster.Namespace = namespace
|
cluster.Namespace = namespace
|
||||||
_, err := cluster.createStatefulSet()
|
_, err = cluster.createStatefulSet()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -161,7 +161,8 @@ func (c *Controller) acquireInitialListOfClusters() error {
|
||||||
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
|
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
|
||||||
if c.opConfig.EnableTeamIdClusternamePrefix {
|
if c.opConfig.EnableTeamIdClusternamePrefix {
|
||||||
if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
|
if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
|
||||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusInvalid)
|
pgSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusInvalid
|
||||||
|
c.KubeClient.SetPostgresCRDStatus(clusterName, pgSpec)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -470,13 +471,25 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
|
||||||
|
|
||||||
switch eventType {
|
switch eventType {
|
||||||
case EventAdd:
|
case EventAdd:
|
||||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed)
|
informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusAddFailed
|
||||||
|
_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
|
||||||
|
}
|
||||||
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
|
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
|
||||||
case EventUpdate:
|
case EventUpdate:
|
||||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed)
|
informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdateFailed
|
||||||
|
_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
|
||||||
|
}
|
||||||
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
|
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
|
||||||
default:
|
default:
|
||||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed)
|
informerNewSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed
|
||||||
|
_, err := c.KubeClient.SetPostgresCRDStatus(clusterName, informerNewSpec)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.WithField("cluster-name", clusterName).Errorf("could not set PostgresCRD status: %v", err)
|
||||||
|
}
|
||||||
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
|
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -191,24 +191,8 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetPostgresCRDStatus of Postgres cluster
|
// SetPostgresCRDStatus of Postgres cluster
|
||||||
func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, status string) (*apiacidv1.Postgresql, error) {
|
func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, pg *apiacidv1.Postgresql) (*apiacidv1.Postgresql, error) {
|
||||||
var pg *apiacidv1.Postgresql
|
pg, err := client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).UpdateStatus(context.TODO(), pg, metav1.UpdateOptions{})
|
||||||
var pgStatus apiacidv1.PostgresStatus
|
|
||||||
pgStatus.PostgresClusterStatus = status
|
|
||||||
|
|
||||||
patch, err := json.Marshal(struct {
|
|
||||||
PgStatus interface{} `json:"status"`
|
|
||||||
}{&pgStatus})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return pg, fmt.Errorf("could not marshal status: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ),
|
|
||||||
// however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11)
|
|
||||||
// we should take advantage of it.
|
|
||||||
pg, err = client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Patch(
|
|
||||||
context.TODO(), clusterName.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pg, fmt.Errorf("could not update status: %v", err)
|
return pg, fmt.Errorf("could not update status: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue