commit e800d0b0b4
thoro 2026-01-19 14:56:49 +00:00 (committed by GitHub)
3 changed files with 55 additions and 34 deletions

View File

@@ -3,6 +3,7 @@ package cluster
 // Postgres CustomResourceDefinition object i.e. Spilo

 import (
 	"context"
+	"database/sql"
 	"encoding/json"
 	"fmt"
@@ -70,7 +71,7 @@ type kubeResources struct
 	CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
 	LogicalBackupJob *batchv1.CronJob
 	Streams map[string]*zalandov1.FabricEventStream
-	//Pods are treated separately
+	// Pods are treated separately
 }

 // Cluster describes postgresql cluster
@@ -95,7 +96,7 @@ type Cluster struct
 	teamsAPIClient teams.Interface
 	oauthTokenGetter OAuthTokenGetter
-	KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
+	KubeClient k8sutil.KubernetesClient // TODO: move clients to the better place?
 	currentProcess Process
 	processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
 	specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
@@ -149,7 +150,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
 			PatroniEndpoints: make(map[string]*v1.Endpoints),
 			PatroniConfigMaps: make(map[string]*v1.ConfigMap),
 			VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim),
-			Streams: make(map[string]*zalandov1.FabricEventStream)},
+			Streams: make(map[string]*zalandov1.FabricEventStream),
+		},
 		userSyncStrategy: users.DefaultUserSyncStrategy{
 			PasswordEncryption: passwordEncryption,
 			RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
@@ -276,7 +278,7 @@ func (c *Cluster) Create() (err error) {
 		errStatus error
 	)
 	if err == nil {
-		pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
+		pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) // TODO: are you sure it's running?
 	} else {
 		c.logger.Warningf("cluster created failed: %v", err)
 		pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
@@ -441,7 +443,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
 	var match, needsRollUpdate, needsReplace bool

 	match = true
-	//TODO: improve me
+	// TODO: improve me
 	if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
 		match = false
 		reasons = append(reasons, "new statefulset's number of replicas does not match the current one")
@@ -673,7 +675,6 @@ func compareResourcesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resourc
 		}
 	}
 	return true
-
 }

 func compareEnv(a, b []v1.EnvVar) bool {
@@ -708,9 +709,7 @@ func compareEnv(a, b []v1.EnvVar) bool {
 }

 func compareSpiloConfiguration(configa, configb string) bool {
-	var (
-		oa, ob spiloConfiguration
-	)
+	var oa, ob spiloConfiguration
 	var err error

 	err = json.Unmarshal([]byte(configa), &oa)
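
The hunk above touches compareSpiloConfiguration, which decides equality by decoding both configuration strings rather than comparing raw text. A minimal, self-contained sketch of that idea, assuming a generic map target instead of the operator's own spiloConfiguration type:

```go
package sketch

import (
	"encoding/json"
	"reflect"
)

// jsonEqual reports whether two strings decode to the same JSON structure,
// so key order and whitespace differences never register as config changes.
func jsonEqual(a, b string) bool {
	var oa, ob map[string]interface{}
	if err := json.Unmarshal([]byte(a), &oa); err != nil {
		return false
	}
	if err := json.Unmarshal([]byte(b), &ob); err != nil {
		return false
	}
	return reflect.DeepEqual(oa, ob)
}
```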
@@ -819,7 +818,6 @@ func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]
 	}
 	return reason != "", reason
-
 }

 func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
@@ -896,7 +894,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLog
 }

 func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
-	//TODO: improve comparison
+	// TODO: improve comparison
 	if !reflect.DeepEqual(new.Spec, cur.Spec) {
 		return false, "new PDB's spec does not match the current one"
 	}
@@ -945,8 +943,17 @@ func (c *Cluster) removeFinalizer() error {
 	}
 	c.logger.Infof("removing finalizer %s", finalizerName)
-	finalizers := util.RemoveString(c.ObjectMeta.Finalizers, finalizerName)
-	newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers)
+
+	// Fetch the latest version of the object to avoid resourceVersion conflicts
+	clusterName := c.clusterName()
+	latestPg, err := c.KubeClient.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get(
+		context.TODO(), clusterName.Name, metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("error fetching latest postgresql for finalizer removal: %v", err)
+	}
+
+	finalizers := util.RemoveString(latestPg.ObjectMeta.Finalizers, finalizerName)
+	newSpec, err := c.KubeClient.SetFinalizer(clusterName, latestPg, finalizers)
 	if err != nil {
 		return fmt.Errorf("error removing finalizer: %v", err)
 	}
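
The refetch above exists because Kubernetes updates use optimistic concurrency: every write must carry the resourceVersion of the copy it was based on, so updating a stale cached object fails with a 409 Conflict. A minimal sketch of the same get-mutate-update pattern, assuming client-go's stock retry helper; the ConfigMap target and the local removeString helper are illustrative stand-ins for the operator's postgresql CRD and util.RemoveString:

```go
package sketch

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
)

// removeString drops s from list (stand-in for the operator's util.RemoveString).
func removeString(list []string, s string) []string {
	out := make([]string, 0, len(list))
	for _, v := range list {
		if v != s {
			out = append(out, v)
		}
	}
	return out
}

func removeFinalizer(ctx context.Context, client kubernetes.Interface, ns, name, finalizer string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Re-read on every attempt so the Update always carries the
		// server's current resourceVersion, not a stale cached one.
		latest, err := client.CoreV1().ConfigMaps(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		latest.Finalizers = removeString(latest.Finalizers, finalizer)
		_, err = client.CoreV1().ConfigMaps(ns).Update(ctx, latest, metav1.UpdateOptions{})
		return err
	})
}
```

The commit performs a single fetch rather than a retry loop, which narrows the conflict window considerably without eliminating it entirely.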
@@ -1065,7 +1072,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	}

 	c.logger.Debug("syncing secrets")
-	//TODO: mind the secrets of the deleted/new users
+	// TODO: mind the secrets of the deleted/new users
 	if err := c.syncSecrets(); err != nil {
 		c.logger.Errorf("could not sync secrets: %v", err)
 		updateFailed = true
@@ -1103,7 +1110,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	// logical backup job
 	func() {
 		// create if it did not exist
-
 		if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup {
 			c.logger.Debug("creating backup cron job")
@@ -1131,7 +1137,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 				updateFailed = true
 			}
 		}
-
 	}()

 	// Roles and Databases
@@ -1208,7 +1213,7 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
 // before the pods, it will be re-created by the current master pod and will remain, obstructing the
 // creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
 func (c *Cluster) Delete() error {
-	var anyErrors = false
+	anyErrors := false
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
@@ -1299,7 +1304,6 @@ func (c *Cluster) NeedsRepair() (bool, acidv1.PostgresStatus) {
 	c.specMu.RLock()
 	defer c.specMu.RUnlock()
 	return !c.Status.Success(), c.Status
-
 }

 // ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue.
@@ -1408,7 +1412,6 @@ func (c *Cluster) initSystemUsers() {
 }

 func (c *Cluster) initPreparedDatabaseRoles() error {
-
 	if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB
 		c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}}
 	}
@@ -1474,10 +1477,9 @@
 }

 func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix, searchPath, secretNamespace string) error {
-
 	for defaultRole, inherits := range defaultRoles {
 		namespace := c.Namespace
-		//if namespaced secrets are allowed
+		// if namespaced secrets are allowed
 		if secretNamespace != "" {
 			if c.Config.OpConfig.EnableCrossNamespaceSecret {
 				namespace = secretNamespace
@@ -1545,7 +1547,7 @@ func (c *Cluster) initRobotUsers() error {
 			}
 		}

-		//if namespaced secrets are allowed
+		// if namespaced secrets are allowed
 		if c.Config.OpConfig.EnableCrossNamespaceSecret {
 			if strings.Contains(username, ".") {
 				splits := strings.Split(username, ".")
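
For context on the cross-namespace branch above: when EnableCrossNamespaceSecret is set, a dotted manifest key is read as namespace.username. A sketch of that split, assuming a first-dot-wins rule; the operator's exact validation may differ:

```go
package sketch

import "strings"

// splitSecretKey turns "teamns.flyway" into ("teamns", "flyway") and leaves
// undotted usernames in the default namespace.
func splitSecretKey(username, defaultNamespace string) (namespace, role string) {
	if strings.Contains(username, ".") {
		parts := strings.SplitN(username, ".", 2)
		return parts[0], parts[1]
	}
	return defaultNamespace, username
}
```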
@@ -1596,7 +1598,6 @@ func (c *Cluster) initAdditionalOwnerRoles() {
 func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) error {
 	teamMembers, err := c.getTeamMembers(teamID)
-
 	if err != nil {
 		return fmt.Errorf("could not get list of team members for team %q: %v", teamID, err)
 	}
@@ -1635,7 +1636,6 @@ func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) e
 }

 func (c *Cluster) initHumanUsers() error {
-
 	var clusterIsOwnedBySuperuserTeam bool
 	superuserTeams := []string{}

View File

@@ -277,7 +277,7 @@ func (c *Controller) initRoleBinding() {
 	}`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name)
 		c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue)
 	}
 	c.logger.Info("Parse role bindings")
 	// re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
 	decode := scheme.Codecs.UniversalDeserializer().Decode
 	obj, groupVersionKind, err := decode([]byte(c.opConfig.PodServiceAccountRoleBindingDefinition), nil, nil)
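
The decode call above reuses client-go's own deserializer instead of hand-rolling manifest parsing. A small sketch of the same trick, with an illustrative RoleBinding target rather than the operator's actual default definition:

```go
package sketch

import (
	"fmt"

	rbacv1 "k8s.io/api/rbac/v1"
	"k8s.io/client-go/kubernetes/scheme"
)

// parseRoleBinding decodes a serialized manifest into a typed object and
// rejects anything that is not a RoleBinding.
func parseRoleBinding(manifest string) (*rbacv1.RoleBinding, error) {
	decode := scheme.Codecs.UniversalDeserializer().Decode
	obj, gvk, err := decode([]byte(manifest), nil, nil)
	if err != nil {
		return nil, fmt.Errorf("could not decode manifest: %v", err)
	}
	rb, ok := obj.(*rbacv1.RoleBinding)
	if !ok {
		return nil, fmt.Errorf("expected RoleBinding, got %s", gvk)
	}
	return rb, nil
}
```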

View File

@@ -258,13 +258,26 @@ func (c *Controller) processEvent(event ClusterEvent) {
 		lg.Infoln("cluster has been created")
 	case EventUpdate:
-		lg.Infoln("update of the cluster started")
 		if !clusterFound {
 			lg.Warningln("cluster does not exist")
 			return
 		}
 		c.curWorkerCluster.Store(event.WorkerID, cl)
+
+		// Check if this cluster has been marked for deletion
+		if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() {
+			lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
+			if err = cl.Delete(); err != nil {
+				cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
+				c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
+				lg.Error(cl.Error)
+				return
+			}
+			lg.Infoln("cluster has been deleted via update event")
+			return
+		}
+
+		lg.Infoln("update of the cluster started")
 		err = cl.Update(event.OldSpec, event.NewSpec)
 		if err != nil {
 			cl.Error = fmt.Sprintf("could not update cluster: %v", err)
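
The new branch above is the key behavioral change in this file: once an object carries a finalizer, a delete request does not remove it; the API server only sets metadata.deletionTimestamp, so the informer surfaces the deletion as an update event and the controller must run its own cleanup. A compact sketch of that dispatch, where cleanup() is a hypothetical stand-in for cl.Delete() plus finalizer removal:

```go
package sketch

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// handleUpdate mirrors the shape of the EventUpdate case: deletion first,
// normal sync only for live objects.
func handleUpdate(meta metav1.ObjectMeta, cleanup, sync func() error) error {
	// DeletionTimestamp is nil until the object is marked for deletion;
	// metav1.Time's IsZero is nil-safe, matching the check in processEvent.
	if !meta.DeletionTimestamp.IsZero() {
		return cleanup() // tear down resources, then drop the finalizer
	}
	return sync()
}
```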
@@ -379,7 +392,6 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
 }

 func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {
-
 	deprecate := func(deprecated, replacement string) {
 		c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
 	}
@@ -425,7 +437,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
 		clusterError string
 	)

-	if informerOldSpec != nil { //update, delete
+	if informerOldSpec != nil { // update, delete
 		uid = informerOldSpec.GetUID()
 		clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta)
@@ -440,7 +452,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
 		} else {
 			clusterError = informerOldSpec.Error
 		}
-	} else { //add, sync
+	} else { // add, sync
 		uid = informerNewSpec.GetUID()
 		clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta)
 		clusterError = informerNewSpec.Error
@@ -539,7 +551,19 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
 	pgOld := c.postgresqlCheck(prev)
 	pgNew := c.postgresqlCheck(cur)
 	if pgOld != nil && pgNew != nil {
-		// Avoid the inifinite recursion for status updates
+		clusterName := util.NameFromMeta(pgNew.ObjectMeta)
+
+		// Check if DeletionTimestamp was set (resource marked for deletion)
+		deletionTimestampChanged := pgOld.ObjectMeta.DeletionTimestamp.IsZero() && !pgNew.ObjectMeta.DeletionTimestamp.IsZero()
+		if deletionTimestampChanged {
+			c.logger.WithField("cluster-name", clusterName).Infof(
+				"UPDATE event: DeletionTimestamp set to %s, queueing event",
+				pgNew.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
+			c.queueClusterEvent(pgOld, pgNew, EventUpdate)
+			return
+		}
+
+		// Avoid the infinite recursion for status updates
 		if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
 			if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
 				return
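
This hunk closes the gap on the informer side: postgresqlUpdate previously dropped any event whose Spec and annotations were unchanged (to keep status writes from re-triggering the handler), which also swallowed pure deletion marks. The added predicate fires exactly on the unset-to-set transition of DeletionTimestamp. An isolated sketch, with specOld/specNew standing in for the typed Spec structs compared via reflect.DeepEqual:

```go
package sketch

import (
	"reflect"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// deletionJustRequested reports the unset-to-set transition of DeletionTimestamp.
func deletionJustRequested(oldMeta, newMeta metav1.ObjectMeta) bool {
	return oldMeta.DeletionTimestamp.IsZero() && !newMeta.DeletionTimestamp.IsZero()
}

// shouldQueue is illustrative: queue deletions unconditionally, then keep
// suppressing status-only updates to avoid the recursion noted above.
func shouldQueue(oldMeta, newMeta metav1.ObjectMeta, specOld, specNew interface{}) bool {
	if deletionJustRequested(oldMeta, newMeta) {
		return true
	}
	if reflect.DeepEqual(specOld, specNew) &&
		reflect.DeepEqual(oldMeta.Annotations, newMeta.Annotations) {
		return false // status-only change
	}
	return true
}
```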
@@ -578,7 +602,6 @@ or config maps.
 The operator does not sync accounts/role bindings after creation.
 */
 func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
-
 	namespace := event.NewSpec.GetNamespace()

 	if err := c.createPodServiceAccount(namespace); err != nil {
@@ -592,7 +615,6 @@ func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
 }

 func (c *Controller) createPodServiceAccount(namespace string) error {
-
 	podServiceAccountName := c.opConfig.PodServiceAccountName
 	_, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
 	if k8sutil.ResourceNotFound(err) {
@@ -615,7 +637,6 @@ func (c *Controller) createPodServiceAccount(namespace string) error {
 }

 func (c *Controller) createRoleBindings(namespace string) error {
-
 	podServiceAccountName := c.opConfig.PodServiceAccountName
 	podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name