Merge a2088a28b4 into 3ca1884876
This commit is contained in:
commit
cd55c474f4
|
|
@ -3,6 +3,7 @@ package cluster
|
|||
// Postgres CustomResourceDefinition object i.e. Spilo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
|
@ -71,7 +72,7 @@ type kubeResources struct {
|
|||
CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
|
||||
LogicalBackupJob *batchv1.CronJob
|
||||
Streams map[string]*zalandov1.FabricEventStream
|
||||
//Pods are treated separately
|
||||
// Pods are treated separately
|
||||
}
|
||||
|
||||
// Cluster describes postgresql cluster
|
||||
|
|
@ -96,7 +97,7 @@ type Cluster struct {
|
|||
|
||||
teamsAPIClient teams.Interface
|
||||
oauthTokenGetter OAuthTokenGetter
|
||||
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
|
||||
KubeClient k8sutil.KubernetesClient // TODO: move clients to the better place?
|
||||
currentProcess Process
|
||||
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
|
||||
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
|
||||
|
|
@ -150,7 +151,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
|
|||
PatroniEndpoints: make(map[string]*v1.Endpoints),
|
||||
PatroniConfigMaps: make(map[string]*v1.ConfigMap),
|
||||
VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim),
|
||||
Streams: make(map[string]*zalandov1.FabricEventStream)},
|
||||
Streams: make(map[string]*zalandov1.FabricEventStream),
|
||||
},
|
||||
userSyncStrategy: users.DefaultUserSyncStrategy{
|
||||
PasswordEncryption: passwordEncryption,
|
||||
RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
|
||||
|
|
@ -445,7 +447,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
|
|||
var match, needsRollUpdate, needsReplace bool
|
||||
|
||||
match = true
|
||||
//TODO: improve me
|
||||
// TODO: improve me
|
||||
if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
|
||||
match = false
|
||||
reasons = append(reasons, "new statefulset's number of replicas does not match the current one")
|
||||
|
|
@ -677,7 +679,6 @@ func compareResourcesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resourc
|
|||
}
|
||||
}
|
||||
return true
|
||||
|
||||
}
|
||||
|
||||
func compareEnv(a, b []v1.EnvVar) bool {
|
||||
|
|
@ -712,9 +713,7 @@ func compareEnv(a, b []v1.EnvVar) bool {
|
|||
}
|
||||
|
||||
func compareSpiloConfiguration(configa, configb string) bool {
|
||||
var (
|
||||
oa, ob spiloConfiguration
|
||||
)
|
||||
var oa, ob spiloConfiguration
|
||||
|
||||
var err error
|
||||
err = json.Unmarshal([]byte(configa), &oa)
|
||||
|
|
@ -823,7 +822,6 @@ func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]
|
|||
}
|
||||
|
||||
return reason != "", reason
|
||||
|
||||
}
|
||||
|
||||
func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
|
||||
|
|
@ -900,7 +898,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLog
|
|||
}
|
||||
|
||||
func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
|
||||
//TODO: improve comparison
|
||||
// TODO: improve comparison
|
||||
if !reflect.DeepEqual(new.Spec, cur.Spec) {
|
||||
return false, "new PDB's spec does not match the current one"
|
||||
}
|
||||
|
|
@ -949,8 +947,17 @@ func (c *Cluster) removeFinalizer() error {
|
|||
}
|
||||
|
||||
c.logger.Infof("removing finalizer %s", finalizerName)
|
||||
finalizers := util.RemoveString(c.ObjectMeta.Finalizers, finalizerName)
|
||||
newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers)
|
||||
|
||||
// Fetch the latest version of the object to avoid resourceVersion conflicts
|
||||
clusterName := c.clusterName()
|
||||
latestPg, err := c.KubeClient.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get(
|
||||
context.TODO(), clusterName.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error fetching latest postgresql for finalizer removal: %v", err)
|
||||
}
|
||||
|
||||
finalizers := util.RemoveString(latestPg.ObjectMeta.Finalizers, finalizerName)
|
||||
newSpec, err := c.KubeClient.SetFinalizer(clusterName, latestPg, finalizers)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error removing finalizer: %v", err)
|
||||
}
|
||||
|
|
@ -1073,7 +1080,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
|||
}
|
||||
|
||||
c.logger.Debug("syncing secrets")
|
||||
//TODO: mind the secrets of the deleted/new users
|
||||
// TODO: mind the secrets of the deleted/new users
|
||||
if err := c.syncSecrets(); err != nil {
|
||||
c.logger.Errorf("could not sync secrets: %v", err)
|
||||
updateFailed = true
|
||||
|
|
@ -1111,7 +1118,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
|||
|
||||
// logical backup job
|
||||
func() {
|
||||
|
||||
// create if it did not exist
|
||||
if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup {
|
||||
c.logger.Debug("creating backup cron job")
|
||||
|
|
@ -1139,7 +1145,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
|
|||
updateFailed = true
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
// Roles and Databases
|
||||
|
|
@ -1216,7 +1221,7 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
|
|||
// before the pods, it will be re-created by the current master pod and will remain, obstructing the
|
||||
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
|
||||
func (c *Cluster) Delete() error {
|
||||
var anyErrors = false
|
||||
anyErrors := false
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
|
||||
|
|
@ -1307,7 +1312,6 @@ func (c *Cluster) NeedsRepair() (bool, acidv1.PostgresStatus) {
|
|||
c.specMu.RLock()
|
||||
defer c.specMu.RUnlock()
|
||||
return !c.Status.Success(), c.Status
|
||||
|
||||
}
|
||||
|
||||
// ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue.
|
||||
|
|
@ -1416,7 +1420,6 @@ func (c *Cluster) initSystemUsers() {
|
|||
}
|
||||
|
||||
func (c *Cluster) initPreparedDatabaseRoles() error {
|
||||
|
||||
if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB
|
||||
c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}}
|
||||
}
|
||||
|
|
@ -1482,10 +1485,9 @@ func (c *Cluster) initPreparedDatabaseRoles() error {
|
|||
}
|
||||
|
||||
func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix, searchPath, secretNamespace string) error {
|
||||
|
||||
for defaultRole, inherits := range defaultRoles {
|
||||
namespace := c.Namespace
|
||||
//if namespaced secrets are allowed
|
||||
// if namespaced secrets are allowed
|
||||
if secretNamespace != "" {
|
||||
if c.Config.OpConfig.EnableCrossNamespaceSecret {
|
||||
namespace = secretNamespace
|
||||
|
|
@ -1553,7 +1555,7 @@ func (c *Cluster) initRobotUsers() error {
|
|||
}
|
||||
}
|
||||
|
||||
//if namespaced secrets are allowed
|
||||
// if namespaced secrets are allowed
|
||||
if c.Config.OpConfig.EnableCrossNamespaceSecret {
|
||||
if strings.Contains(username, ".") {
|
||||
splits := strings.Split(username, ".")
|
||||
|
|
@ -1604,7 +1606,6 @@ func (c *Cluster) initAdditionalOwnerRoles() {
|
|||
|
||||
func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) error {
|
||||
teamMembers, err := c.getTeamMembers(teamID)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get list of team members for team %q: %v", teamID, err)
|
||||
}
|
||||
|
|
@ -1643,7 +1644,6 @@ func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) e
|
|||
}
|
||||
|
||||
func (c *Cluster) initHumanUsers() error {
|
||||
|
||||
var clusterIsOwnedBySuperuserTeam bool
|
||||
superuserTeams := []string{}
|
||||
|
||||
|
|
|
|||
|
|
@ -277,7 +277,7 @@ func (c *Controller) initRoleBinding() {
|
|||
}`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name)
|
||||
c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue)
|
||||
}
|
||||
c.logger.Info("Parse role bindings")
|
||||
|
||||
// re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
|
||||
decode := scheme.Codecs.UniversalDeserializer().Decode
|
||||
obj, groupVersionKind, err := decode([]byte(c.opConfig.PodServiceAccountRoleBindingDefinition), nil, nil)
|
||||
|
|
|
|||
|
|
@ -259,13 +259,26 @@ func (c *Controller) processEvent(event ClusterEvent) {
|
|||
|
||||
lg.Infoln("cluster has been created")
|
||||
case EventUpdate:
|
||||
lg.Infoln("update of the cluster started")
|
||||
|
||||
if !clusterFound {
|
||||
lg.Warningln("cluster does not exist")
|
||||
return
|
||||
}
|
||||
c.curWorkerCluster.Store(event.WorkerID, cl)
|
||||
|
||||
// Check if this cluster has been marked for deletion
|
||||
if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() {
|
||||
lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
|
||||
if err = cl.Delete(); err != nil {
|
||||
cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
|
||||
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
|
||||
lg.Error(cl.Error)
|
||||
return
|
||||
}
|
||||
lg.Infoln("cluster has been deleted via update event")
|
||||
return
|
||||
}
|
||||
|
||||
lg.Infoln("update of the cluster started")
|
||||
err = cl.Update(event.OldSpec, event.NewSpec)
|
||||
if err != nil {
|
||||
cl.Error = fmt.Sprintf("could not update cluster: %v", err)
|
||||
|
|
@ -380,7 +393,6 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
|
|||
}
|
||||
|
||||
func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {
|
||||
|
||||
deprecate := func(deprecated, replacement string) {
|
||||
c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
|
||||
}
|
||||
|
|
@ -426,7 +438,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
|
|||
clusterError string
|
||||
)
|
||||
|
||||
if informerOldSpec != nil { //update, delete
|
||||
if informerOldSpec != nil { // update, delete
|
||||
uid = informerOldSpec.GetUID()
|
||||
clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta)
|
||||
|
||||
|
|
@ -441,7 +453,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
|
|||
} else {
|
||||
clusterError = informerOldSpec.Error
|
||||
}
|
||||
} else { //add, sync
|
||||
} else { // add, sync
|
||||
uid = informerNewSpec.GetUID()
|
||||
clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta)
|
||||
clusterError = informerNewSpec.Error
|
||||
|
|
@ -552,7 +564,19 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
|
|||
pgOld := c.postgresqlCheck(prev)
|
||||
pgNew := c.postgresqlCheck(cur)
|
||||
if pgOld != nil && pgNew != nil {
|
||||
// Avoid the inifinite recursion for status updates
|
||||
clusterName := util.NameFromMeta(pgNew.ObjectMeta)
|
||||
|
||||
// Check if DeletionTimestamp was set (resource marked for deletion)
|
||||
deletionTimestampChanged := pgOld.ObjectMeta.DeletionTimestamp.IsZero() && !pgNew.ObjectMeta.DeletionTimestamp.IsZero()
|
||||
if deletionTimestampChanged {
|
||||
c.logger.WithField("cluster-name", clusterName).Infof(
|
||||
"UPDATE event: DeletionTimestamp set to %s, queueing event",
|
||||
pgNew.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
|
||||
c.queueClusterEvent(pgOld, pgNew, EventUpdate)
|
||||
return
|
||||
}
|
||||
|
||||
// Avoid the infinite recursion for status updates
|
||||
if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
|
||||
if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
|
||||
return
|
||||
|
|
@ -591,7 +615,6 @@ or config maps.
|
|||
The operator does not sync accounts/role bindings after creation.
|
||||
*/
|
||||
func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
|
||||
|
||||
namespace := event.NewSpec.GetNamespace()
|
||||
|
||||
if err := c.createPodServiceAccount(namespace); err != nil {
|
||||
|
|
@ -605,7 +628,6 @@ func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
|
|||
}
|
||||
|
||||
func (c *Controller) createPodServiceAccount(namespace string) error {
|
||||
|
||||
podServiceAccountName := c.opConfig.PodServiceAccountName
|
||||
_, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
|
||||
if k8sutil.ResourceNotFound(err) {
|
||||
|
|
@ -628,7 +650,6 @@ func (c *Controller) createPodServiceAccount(namespace string) error {
|
|||
}
|
||||
|
||||
func (c *Controller) createRoleBindings(namespace string) error {
|
||||
|
||||
podServiceAccountName := c.opConfig.PodServiceAccountName
|
||||
podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue