Merge 24724dc4ed into 1af4c50ed0
This commit is contained in:
commit
faf21a5674
|
|
@ -2,6 +2,8 @@ apiVersion: "acid.zalan.do/v1"
|
||||||
kind: postgresql
|
kind: postgresql
|
||||||
metadata:
|
metadata:
|
||||||
name: acid-minimal-cluster
|
name: acid-minimal-cluster
|
||||||
|
labels:
|
||||||
|
cluster-name: acid-minimal-cluster
|
||||||
spec:
|
spec:
|
||||||
teamId: "acid"
|
teamId: "acid"
|
||||||
volume:
|
volume:
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,10 @@ spec:
|
||||||
storage: true
|
storage: true
|
||||||
subresources:
|
subresources:
|
||||||
status: {}
|
status: {}
|
||||||
|
scale:
|
||||||
|
specReplicasPath: .spec.numberOfInstances
|
||||||
|
statusReplicasPath: .status.numberOfInstances
|
||||||
|
labelSelectorPath: .status.labelSelector
|
||||||
additionalPrinterColumns:
|
additionalPrinterColumns:
|
||||||
- name: Team
|
- name: Team
|
||||||
type: string
|
type: string
|
||||||
|
|
@ -51,7 +55,7 @@ spec:
|
||||||
- name: Status
|
- name: Status
|
||||||
type: string
|
type: string
|
||||||
description: Current sync status of postgresql resource
|
description: Current sync status of postgresql resource
|
||||||
jsonPath: .status.PostgresClusterStatus
|
jsonPath: .status.postgresClusterStatus
|
||||||
schema:
|
schema:
|
||||||
openAPIV3Schema:
|
openAPIV3Schema:
|
||||||
type: object
|
type: object
|
||||||
|
|
@ -684,5 +688,30 @@ spec:
|
||||||
type: integer
|
type: integer
|
||||||
status:
|
status:
|
||||||
type: object
|
type: object
|
||||||
additionalProperties:
|
properties:
|
||||||
|
postgresClusterStatus:
|
||||||
|
type: string
|
||||||
|
numberOfInstances:
|
||||||
|
format: int32
|
||||||
|
type: integer
|
||||||
|
labelSelector:
|
||||||
|
type: string
|
||||||
|
observedGeneration:
|
||||||
|
format: int64
|
||||||
|
type: integer
|
||||||
|
conditions:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
type:
|
||||||
|
type: string
|
||||||
|
status:
|
||||||
|
type: string
|
||||||
|
lastTransitionTime:
|
||||||
|
type: string
|
||||||
|
format: date-time
|
||||||
|
reason:
|
||||||
|
type: string
|
||||||
|
message:
|
||||||
type: string
|
type: string
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,10 @@ const (
|
||||||
OperatorConfigCRDResourceList = OperatorConfigCRDResouceKind + "List"
|
OperatorConfigCRDResourceList = OperatorConfigCRDResouceKind + "List"
|
||||||
OperatorConfigCRDResourceName = OperatorConfigCRDResourcePlural + "." + acidzalando.GroupName
|
OperatorConfigCRDResourceName = OperatorConfigCRDResourcePlural + "." + acidzalando.GroupName
|
||||||
OperatorConfigCRDResourceShort = "opconfig"
|
OperatorConfigCRDResourceShort = "opconfig"
|
||||||
|
|
||||||
|
specReplicasPath = ".spec.numberOfInstances"
|
||||||
|
statusReplicasPath = ".status.numberOfInstances"
|
||||||
|
labelSelectorPath = ".status.labelSelector"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PostgresCRDResourceColumns definition of AdditionalPrinterColumns for postgresql CRD
|
// PostgresCRDResourceColumns definition of AdditionalPrinterColumns for postgresql CRD
|
||||||
|
|
@ -72,7 +76,7 @@ var PostgresCRDResourceColumns = []apiextv1.CustomResourceColumnDefinition{
|
||||||
Name: "Status",
|
Name: "Status",
|
||||||
Type: "string",
|
Type: "string",
|
||||||
Description: "Current sync status of postgresql resource",
|
Description: "Current sync status of postgresql resource",
|
||||||
JSONPath: ".status.PostgresClusterStatus",
|
JSONPath: ".status.postgresClusterStatus",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1103,10 +1107,47 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
|
||||||
},
|
},
|
||||||
"status": {
|
"status": {
|
||||||
Type: "object",
|
Type: "object",
|
||||||
AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{
|
Properties: map[string]apiextv1.JSONSchemaProps{
|
||||||
Schema: &apiextv1.JSONSchemaProps{
|
"postgresClusterStatus": {
|
||||||
Type: "string",
|
Type: "string",
|
||||||
},
|
},
|
||||||
|
"numberOfInstances": {
|
||||||
|
Type: "integer",
|
||||||
|
Format: "int32",
|
||||||
|
},
|
||||||
|
"labelSelector": {
|
||||||
|
Type: "string",
|
||||||
|
},
|
||||||
|
"observedGeneration": {
|
||||||
|
Type: "integer",
|
||||||
|
Format: "int64",
|
||||||
|
},
|
||||||
|
"conditions": {
|
||||||
|
Type: "array",
|
||||||
|
Items: &apiextv1.JSONSchemaPropsOrArray{
|
||||||
|
Schema: &apiextv1.JSONSchemaProps{
|
||||||
|
Type: "object",
|
||||||
|
Properties: map[string]apiextv1.JSONSchemaProps{
|
||||||
|
"type": {
|
||||||
|
Type: "string",
|
||||||
|
},
|
||||||
|
"status": {
|
||||||
|
Type: "string",
|
||||||
|
},
|
||||||
|
"lastTransitionTime": {
|
||||||
|
Type: "string",
|
||||||
|
Format: "date-time",
|
||||||
|
},
|
||||||
|
"reason": {
|
||||||
|
Type: "string",
|
||||||
|
},
|
||||||
|
"message": {
|
||||||
|
Type: "string",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
@ -1984,7 +2025,7 @@ var OperatorConfigCRDResourceValidation = apiextv1.CustomResourceValidation{
|
||||||
func buildCRD(name, kind, plural, list, short string,
|
func buildCRD(name, kind, plural, list, short string,
|
||||||
categories []string,
|
categories []string,
|
||||||
columns []apiextv1.CustomResourceColumnDefinition,
|
columns []apiextv1.CustomResourceColumnDefinition,
|
||||||
validation apiextv1.CustomResourceValidation) *apiextv1.CustomResourceDefinition {
|
validation apiextv1.CustomResourceValidation, specReplicasPath string, statusReplicasPath string, labelSelectorPath string) *apiextv1.CustomResourceDefinition {
|
||||||
return &apiextv1.CustomResourceDefinition{
|
return &apiextv1.CustomResourceDefinition{
|
||||||
TypeMeta: metav1.TypeMeta{
|
TypeMeta: metav1.TypeMeta{
|
||||||
APIVersion: fmt.Sprintf("%s/%s", apiextv1.GroupName, apiextv1.SchemeGroupVersion.Version),
|
APIVersion: fmt.Sprintf("%s/%s", apiextv1.GroupName, apiextv1.SchemeGroupVersion.Version),
|
||||||
|
|
@ -2011,6 +2052,11 @@ func buildCRD(name, kind, plural, list, short string,
|
||||||
Storage: true,
|
Storage: true,
|
||||||
Subresources: &apiextv1.CustomResourceSubresources{
|
Subresources: &apiextv1.CustomResourceSubresources{
|
||||||
Status: &apiextv1.CustomResourceSubresourceStatus{},
|
Status: &apiextv1.CustomResourceSubresourceStatus{},
|
||||||
|
Scale: &apiextv1.CustomResourceSubresourceScale{
|
||||||
|
SpecReplicasPath: specReplicasPath,
|
||||||
|
StatusReplicasPath: statusReplicasPath,
|
||||||
|
LabelSelectorPath: &labelSelectorPath,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
AdditionalPrinterColumns: columns,
|
AdditionalPrinterColumns: columns,
|
||||||
Schema: &validation,
|
Schema: &validation,
|
||||||
|
|
@ -2029,7 +2075,10 @@ func PostgresCRD(crdCategories []string) *apiextv1.CustomResourceDefinition {
|
||||||
PostgresCRDResourceShort,
|
PostgresCRDResourceShort,
|
||||||
crdCategories,
|
crdCategories,
|
||||||
PostgresCRDResourceColumns,
|
PostgresCRDResourceColumns,
|
||||||
PostgresCRDResourceValidation)
|
PostgresCRDResourceValidation,
|
||||||
|
specReplicasPath,
|
||||||
|
statusReplicasPath,
|
||||||
|
labelSelectorPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConfigurationCRD returns CustomResourceDefinition built from OperatorConfigCRDResource
|
// ConfigurationCRD returns CustomResourceDefinition built from OperatorConfigCRDResource
|
||||||
|
|
@ -2041,5 +2090,8 @@ func ConfigurationCRD(crdCategories []string) *apiextv1.CustomResourceDefinition
|
||||||
OperatorConfigCRDResourceShort,
|
OperatorConfigCRDResourceShort,
|
||||||
crdCategories,
|
crdCategories,
|
||||||
OperatorConfigCRDResourceColumns,
|
OperatorConfigCRDResourceColumns,
|
||||||
OperatorConfigCRDResourceValidation)
|
OperatorConfigCRDResourceValidation,
|
||||||
|
specReplicasPath,
|
||||||
|
statusReplicasPath,
|
||||||
|
labelSelectorPath)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package v1
|
||||||
// Postgres CRD definition, please use CamelCase for field names.
|
// Postgres CRD definition, please use CamelCase for field names.
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"k8s.io/apimachinery/pkg/api/equality"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
|
|
@ -226,9 +227,48 @@ type Sidecar struct {
|
||||||
// UserFlags defines flags (such as superuser, nologin) that could be assigned to individual users
|
// UserFlags defines flags (such as superuser, nologin) that could be assigned to individual users
|
||||||
type UserFlags []string
|
type UserFlags []string
|
||||||
|
|
||||||
|
type Conditions []Condition
|
||||||
|
|
||||||
|
type PostgresqlConditionType string
|
||||||
|
type VolatileTime struct {
|
||||||
|
Inner metav1.Time `json:",inline"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalJSON implements the json.Marshaler interface.
|
||||||
|
func (t VolatileTime) MarshalJSON() ([]byte, error) {
|
||||||
|
return t.Inner.MarshalJSON()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON implements the json.Unmarshaller interface.
|
||||||
|
func (t *VolatileTime) UnmarshalJSON(b []byte) error {
|
||||||
|
return t.Inner.UnmarshalJSON(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
equality.Semantic.AddFunc(
|
||||||
|
// Always treat VolatileTime fields as equivalent.
|
||||||
|
func(VolatileTime, VolatileTime) bool {
|
||||||
|
return true
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Condition contains the conditions of the PostgreSQL cluster
|
||||||
|
type Condition struct {
|
||||||
|
Type PostgresqlConditionType `json:"type" description:"type of status condition"`
|
||||||
|
Status v1.ConditionStatus `json:"status" description:"status of the condition, one of True, False, Unknown"`
|
||||||
|
LastTransitionTime VolatileTime `json:"lastTransitionTime,omitempty" description:"last time the condition transit from one status to another"`
|
||||||
|
Reason string `json:"reason,omitempty" description:"one-word CamelCase reason for the condition's last transition"`
|
||||||
|
Message string `json:"message,omitempty" description:"human-readable message indicating details about last transition"`
|
||||||
|
}
|
||||||
|
|
||||||
// PostgresStatus contains status of the PostgreSQL cluster (running, creation failed etc.)
|
// PostgresStatus contains status of the PostgreSQL cluster (running, creation failed etc.)
|
||||||
type PostgresStatus struct {
|
type PostgresStatus struct {
|
||||||
PostgresClusterStatus string `json:"PostgresClusterStatus"`
|
PostgresClusterStatus string `json:"postgresClusterStatus"`
|
||||||
|
NumberOfInstances int32 `json:"numberOfInstances"`
|
||||||
|
LabelSelector string `json:"labelSelector"`
|
||||||
|
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
|
||||||
|
Conditions Conditions `json:"conditions,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConnectionPooler Options for connection pooler
|
// ConnectionPooler Options for connection pooler
|
||||||
|
|
|
||||||
|
|
@ -100,7 +100,3 @@ func (postgresStatus PostgresStatus) Running() bool {
|
||||||
func (postgresStatus PostgresStatus) Creating() bool {
|
func (postgresStatus PostgresStatus) Creating() bool {
|
||||||
return postgresStatus.PostgresClusterStatus == ClusterStatusCreating
|
return postgresStatus.PostgresClusterStatus == ClusterStatusCreating
|
||||||
}
|
}
|
||||||
|
|
||||||
func (postgresStatus PostgresStatus) String() string {
|
|
||||||
return postgresStatus.PostgresClusterStatus
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -98,6 +98,45 @@ func (in *CloneDescription) DeepCopy() *CloneDescription {
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||||
|
func (in *Condition) DeepCopyInto(out *Condition) {
|
||||||
|
*out = *in
|
||||||
|
in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Condition.
|
||||||
|
func (in *Condition) DeepCopy() *Condition {
|
||||||
|
if in == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
out := new(Condition)
|
||||||
|
in.DeepCopyInto(out)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||||
|
func (in Conditions) DeepCopyInto(out *Conditions) {
|
||||||
|
{
|
||||||
|
in := &in
|
||||||
|
*out = make(Conditions, len(*in))
|
||||||
|
for i := range *in {
|
||||||
|
(*in)[i].DeepCopyInto(&(*out)[i])
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Conditions.
|
||||||
|
func (in Conditions) DeepCopy() Conditions {
|
||||||
|
if in == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
out := new(Conditions)
|
||||||
|
in.DeepCopyInto(out)
|
||||||
|
return *out
|
||||||
|
}
|
||||||
|
|
||||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||||
func (in *ConnectionPooler) DeepCopyInto(out *ConnectionPooler) {
|
func (in *ConnectionPooler) DeepCopyInto(out *ConnectionPooler) {
|
||||||
*out = *in
|
*out = *in
|
||||||
|
|
@ -897,6 +936,13 @@ func (in *PostgresSpec) DeepCopy() *PostgresSpec {
|
||||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||||
func (in *PostgresStatus) DeepCopyInto(out *PostgresStatus) {
|
func (in *PostgresStatus) DeepCopyInto(out *PostgresStatus) {
|
||||||
*out = *in
|
*out = *in
|
||||||
|
if in.Conditions != nil {
|
||||||
|
in, out := &in.Conditions, &out.Conditions
|
||||||
|
*out = make(Conditions, len(*in))
|
||||||
|
for i := range *in {
|
||||||
|
(*in)[i].DeepCopyInto(&(*out)[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1058,7 +1104,7 @@ func (in *Postgresql) DeepCopyInto(out *Postgresql) {
|
||||||
out.TypeMeta = in.TypeMeta
|
out.TypeMeta = in.TypeMeta
|
||||||
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
|
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
|
||||||
in.Spec.DeepCopyInto(&out.Spec)
|
in.Spec.DeepCopyInto(&out.Spec)
|
||||||
out.Status = in.Status
|
in.Status.DeepCopyInto(&out.Status)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1469,6 +1515,23 @@ func (in UserFlags) DeepCopy() UserFlags {
|
||||||
return *out
|
return *out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||||
|
func (in *VolatileTime) DeepCopyInto(out *VolatileTime) {
|
||||||
|
*out = *in
|
||||||
|
in.Inner.DeepCopyInto(&out.Inner)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VolatileTime.
|
||||||
|
func (in *VolatileTime) DeepCopy() *VolatileTime {
|
||||||
|
if in == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
out := new(VolatileTime)
|
||||||
|
in.DeepCopyInto(out)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||||
func (in *Volume) DeepCopyInto(out *Volume) {
|
func (in *Volume) DeepCopyInto(out *Volume) {
|
||||||
*out = *in
|
*out = *in
|
||||||
|
|
|
||||||
|
|
@ -276,10 +276,10 @@ func (c *Cluster) Create() (err error) {
|
||||||
errStatus error
|
errStatus error
|
||||||
)
|
)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
|
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning, c.OpConfig.ClusterNameLabel,"") //TODO: are you sure it's running?
|
||||||
} else {
|
} else {
|
||||||
c.logger.Warningf("cluster created failed: %v", err)
|
c.logger.Warningf("cluster created failed: %v", err)
|
||||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
|
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed, c.OpConfig.ClusterNameLabel, err.Error())
|
||||||
}
|
}
|
||||||
if errStatus != nil {
|
if errStatus != nil {
|
||||||
c.logger.Warningf("could not set cluster status: %v", errStatus)
|
c.logger.Warningf("could not set cluster status: %v", errStatus)
|
||||||
|
|
@ -289,7 +289,7 @@ func (c *Cluster) Create() (err error) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
|
pgCreateStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating, c.OpConfig.ClusterNameLabel, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not set cluster status: %v", err)
|
return fmt.Errorf("could not set cluster status: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -977,12 +977,13 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)
|
c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating, c.OpConfig.ClusterNameLabel, "")
|
||||||
|
|
||||||
if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
|
if !isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) {
|
||||||
// do not apply any major version related changes yet
|
// do not apply any major version related changes yet
|
||||||
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
|
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
c.setSpec(newSpec)
|
c.setSpec(newSpec)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
@ -991,9 +992,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
if updateFailed {
|
if updateFailed {
|
||||||
pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdateFailed)
|
pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdateFailed, c.OpConfig.ClusterNameLabel, err.Error())
|
||||||
} else {
|
} else {
|
||||||
pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
|
pgUpdatedStatus, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning, c.OpConfig.ClusterNameLabel, "")
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warningf("could not set cluster status: %v", err)
|
c.logger.Warningf("could not set cluster status: %v", err)
|
||||||
|
|
|
||||||
|
|
@ -49,9 +49,9 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warningf("error while syncing cluster state: %v", err)
|
c.logger.Warningf("error while syncing cluster state: %v", err)
|
||||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusSyncFailed)
|
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusSyncFailed, c.OpConfig.ClusterNameLabel, err.Error())
|
||||||
} else if !c.Status.Running() {
|
} else if !c.Status.Running() {
|
||||||
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
|
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning, c.OpConfig.ClusterNameLabel, "")
|
||||||
}
|
}
|
||||||
if errStatus != nil {
|
if errStatus != nil {
|
||||||
c.logger.Warningf("could not set cluster status: %v", errStatus)
|
c.logger.Warningf("could not set cluster status: %v", errStatus)
|
||||||
|
|
|
||||||
|
|
@ -161,7 +161,7 @@ func (c *Controller) acquireInitialListOfClusters() error {
|
||||||
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
|
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) (*cluster.Cluster, error) {
|
||||||
if c.opConfig.EnableTeamIdClusternamePrefix {
|
if c.opConfig.EnableTeamIdClusternamePrefix {
|
||||||
if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
|
if _, err := acidv1.ExtractClusterName(clusterName.Name, pgSpec.Spec.TeamID); err != nil {
|
||||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusInvalid)
|
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusInvalid, c.opConfig.ClusterNameLabel, err.Error())
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -207,10 +207,10 @@ func (c *Controller) processEvent(event ClusterEvent) {
|
||||||
if event.EventType == EventRepair {
|
if event.EventType == EventRepair {
|
||||||
runRepair, lastOperationStatus := cl.NeedsRepair()
|
runRepair, lastOperationStatus := cl.NeedsRepair()
|
||||||
if !runRepair {
|
if !runRepair {
|
||||||
lg.Debugf("observed cluster status %s, repair is not required", lastOperationStatus)
|
lg.Debugf("observed cluster status %#v, repair is not required", lastOperationStatus)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
lg.Debugf("observed cluster status %s, running sync scan to repair the cluster", lastOperationStatus)
|
lg.Debugf("observed cluster status %#v, running sync scan to repair the cluster", lastOperationStatus)
|
||||||
event.EventType = EventSync
|
event.EventType = EventSync
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -467,16 +467,15 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
|
||||||
|
|
||||||
if clusterError != "" && eventType != EventDelete {
|
if clusterError != "" && eventType != EventDelete {
|
||||||
c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
|
c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
|
||||||
|
|
||||||
switch eventType {
|
switch eventType {
|
||||||
case EventAdd:
|
case EventAdd:
|
||||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed)
|
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed, c.opConfig.ClusterNameLabel, clusterError)
|
||||||
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
|
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
|
||||||
case EventUpdate:
|
case EventUpdate:
|
||||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed)
|
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed, c.opConfig.ClusterNameLabel, clusterError)
|
||||||
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
|
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
|
||||||
default:
|
default:
|
||||||
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed)
|
c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed, c.opConfig.ClusterNameLabel, clusterError)
|
||||||
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
|
c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package k8sutil
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
b64 "encoding/base64"
|
b64 "encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
@ -191,9 +192,20 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetPostgresCRDStatus of Postgres cluster
|
// SetPostgresCRDStatus of Postgres cluster
|
||||||
func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, status string) (*apiacidv1.Postgresql, error) {
|
func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, status string, clusterNameLabel string, message string) (*apiacidv1.Postgresql, error) {
|
||||||
var pg *apiacidv1.Postgresql
|
var pg *apiacidv1.Postgresql
|
||||||
var pgStatus apiacidv1.PostgresStatus
|
var pgStatus apiacidv1.PostgresStatus
|
||||||
|
|
||||||
|
pg, err := client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get(context.TODO(), clusterName.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not fetch Postgres CR %s/%s: %v", clusterName.Namespace, clusterName.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pgStatus = updateConditions(pg, status, message)
|
||||||
|
if pgStatus.LabelSelector == "" {
|
||||||
|
pgStatus.LabelSelector = fmt.Sprintf("%s=%s", clusterNameLabel, pg.Name)
|
||||||
|
}
|
||||||
|
|
||||||
pgStatus.PostgresClusterStatus = status
|
pgStatus.PostgresClusterStatus = status
|
||||||
|
|
||||||
patch, err := json.Marshal(struct {
|
patch, err := json.Marshal(struct {
|
||||||
|
|
@ -216,6 +228,95 @@ func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.Namespaced
|
||||||
return pg, nil
|
return pg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func updateConditions(existingPg *apiacidv1.Postgresql, currentStatus string, message string) apiacidv1.PostgresStatus {
|
||||||
|
now := apiacidv1.VolatileTime{Inner: metav1.NewTime(time.Now())}
|
||||||
|
existingStatus := existingPg.Status
|
||||||
|
existingConditions := existingStatus.Conditions
|
||||||
|
var readyCondition, reconciliationCondition *apiacidv1.Condition
|
||||||
|
|
||||||
|
// Find existing conditions
|
||||||
|
for i := range existingConditions {
|
||||||
|
if existingConditions[i].Type == "Ready" {
|
||||||
|
readyCondition = &existingConditions[i]
|
||||||
|
} else if existingConditions[i].Type == "ReconciliationSuccessful" {
|
||||||
|
reconciliationCondition = &existingConditions[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Safety checks to avoid nil pointer dereference
|
||||||
|
if readyCondition == nil {
|
||||||
|
readyCondition = &apiacidv1.Condition{Type: "Ready"}
|
||||||
|
existingConditions = append(existingConditions, *readyCondition)
|
||||||
|
}
|
||||||
|
|
||||||
|
if reconciliationCondition == nil {
|
||||||
|
reconciliationCondition = &apiacidv1.Condition{Type: "ReconciliationSuccessful"}
|
||||||
|
existingConditions = append(existingConditions, *reconciliationCondition)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update Ready condition
|
||||||
|
switch currentStatus {
|
||||||
|
case "Running":
|
||||||
|
readyCondition.Status = v1.ConditionTrue
|
||||||
|
readyCondition.LastTransitionTime = now
|
||||||
|
existingPg.Status.NumberOfInstances = existingPg.Spec.NumberOfInstances
|
||||||
|
existingPg.Status.ObservedGeneration = existingPg.Generation
|
||||||
|
case "CreateFailed":
|
||||||
|
readyCondition.Status = v1.ConditionFalse
|
||||||
|
readyCondition.LastTransitionTime = now
|
||||||
|
existingPg.Status.NumberOfInstances = 0
|
||||||
|
existingPg.Status.ObservedGeneration = 0
|
||||||
|
case "UpdateFailed", "SyncFailed", "Invalid":
|
||||||
|
if readyCondition.Status == v1.ConditionFalse {
|
||||||
|
readyCondition.LastTransitionTime = now
|
||||||
|
existingPg.Status.NumberOfInstances = existingStatus.NumberOfInstances
|
||||||
|
existingPg.Status.ObservedGeneration = existingStatus.ObservedGeneration
|
||||||
|
}
|
||||||
|
case "Updating":
|
||||||
|
existingPg.Status.NumberOfInstances = existingStatus.NumberOfInstances
|
||||||
|
existingPg.Status.ObservedGeneration = existingStatus.ObservedGeneration
|
||||||
|
// not updating time, just setting the status
|
||||||
|
if readyCondition.Status == v1.ConditionFalse {
|
||||||
|
readyCondition.Status = v1.ConditionFalse
|
||||||
|
} else {
|
||||||
|
readyCondition.Status = v1.ConditionTrue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update ReconciliationSuccessful condition
|
||||||
|
reconciliationCondition.LastTransitionTime = now
|
||||||
|
reconciliationCondition.Message = message
|
||||||
|
if currentStatus == "Running" {
|
||||||
|
reconciliationCondition.Status = v1.ConditionTrue
|
||||||
|
reconciliationCondition.Reason = ""
|
||||||
|
} else {
|
||||||
|
reconciliationCondition.Status = v1.ConditionFalse
|
||||||
|
reconciliationCondition.Reason = currentStatus
|
||||||
|
}
|
||||||
|
// Directly modify elements in the existingConditions slice
|
||||||
|
for i := range existingConditions {
|
||||||
|
if existingConditions[i].Type == "Ready" && readyCondition != nil {
|
||||||
|
existingConditions[i] = *readyCondition
|
||||||
|
} else if existingConditions[i].Type == "ReconciliationSuccessful" && reconciliationCondition != nil {
|
||||||
|
existingConditions[i] = *reconciliationCondition
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if currentStatus == "Creating" {
|
||||||
|
existingPg.Status.NumberOfInstances = 0
|
||||||
|
existingPg.Status.ObservedGeneration = 0
|
||||||
|
for i := range existingConditions {
|
||||||
|
if existingConditions[i].Type == "Ready" {
|
||||||
|
existingConditions = append(existingConditions[:i], existingConditions[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
existingPg.Status.Conditions = existingConditions
|
||||||
|
|
||||||
|
return existingPg.Status
|
||||||
|
}
|
||||||
|
|
||||||
// SetFinalizer of Postgres cluster
|
// SetFinalizer of Postgres cluster
|
||||||
func (client *KubernetesClient) SetFinalizer(clusterName spec.NamespacedName, pg *apiacidv1.Postgresql, finalizers []string) (*apiacidv1.Postgresql, error) {
|
func (client *KubernetesClient) SetFinalizer(clusterName spec.NamespacedName, pg *apiacidv1.Postgresql, finalizers []string) (*apiacidv1.Postgresql, error) {
|
||||||
var (
|
var (
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue