From eba23279c8bfce331c85fd5ca58df204568aceb6 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Thu, 19 Oct 2017 10:49:42 +0200 Subject: [PATCH] Kube cluster upgrade --- glide.lock | 6 +- glide.yaml | 6 +- manifests/configmap.yaml | 4 +- manifests/postgres-operator.yaml | 2 +- manifests/testpostgresql.yaml | 2 +- pkg/cluster/cluster.go | 56 +++++++++-- pkg/cluster/k8sres.go | 44 +++++++++ pkg/cluster/pod.go | 146 +++++++++++++++++++++++++++- pkg/cluster/resources.go | 67 ++++++++++++- pkg/cluster/sync.go | 34 +++++++ pkg/cluster/util.go | 21 ++-- pkg/controller/controller.go | 55 ++++++++--- pkg/controller/node.go | 162 +++++++++++++++++++++++++++++++ pkg/controller/pod.go | 10 -- pkg/controller/postgresql.go | 2 +- pkg/spec/types.go | 14 +-- pkg/util/config/config.go | 27 +++--- pkg/util/constants/kubernetes.go | 5 +- pkg/util/k8sutil/k8sutil.go | 5 + 19 files changed, 595 insertions(+), 73 deletions(-) create mode 100644 pkg/controller/node.go diff --git a/glide.lock b/glide.lock index 0499d45b2..f0551f302 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,8 @@ -hash: 42ffa063321a691ec1de30532989e66e81fb7a080d6d4867bbb2c9d7f2a008ce -updated: 2017-10-06T15:06:00.742579+02:00 +hash: bb2d336f3efb57376916e47b585ad55611b5023b79044ced59c762ea35427f19 +updated: 2017-10-10T10:40:56.894070487+02:00 imports: - name: github.com/aws/aws-sdk-go - version: da415b5fa0ff3f91d4707348a8ea1be53f700c22 + version: 760741802ad40f49ae9fc4a69ef6706d2527d62e subpackages: - aws - aws/awserr diff --git a/glide.yaml b/glide.yaml index 4661c587c..19dd46670 100644 --- a/glide.yaml +++ b/glide.yaml @@ -17,10 +17,8 @@ import: version: release-1.7 subpackages: - pkg/api/errors - - pkg/api/meta - pkg/api/resource - pkg/apis/meta/v1 - - pkg/fields - pkg/labels - pkg/runtime - pkg/runtime/schema @@ -33,6 +31,10 @@ import: version: ^4.0.0 subpackages: - kubernetes + - kubernetes/scheme + - kubernetes/typed/apps/v1beta1 + - kubernetes/typed/core/v1 + - kubernetes/typed/extensions/v1beta1 - pkg/api - pkg/api/v1 - pkg/apis/apps/v1beta1 diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index a9a2a16d3..cf3eac6f3 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -12,7 +12,7 @@ data: dns_name_format: '{cluster}.{team}.staging.{hostedzone}' docker_image: registry.opensource.zalan.do/acid/spiloprivate-9.6:1.2-p4 etcd_host: etcd-client.default.svc.cluster.local:2379 - secret_name_template: '{username}.{clustername}.credentials.{tprkind}.{tprgroup}' + secret_name_template: '{username}.{cluster}.credentials.{tprkind}.{tprgroup}' infrastructure_roles_secret_name: postgresql-infrastructure-roles oauth_token_secret_name: postgresql-operator pam_configuration: | @@ -37,3 +37,5 @@ data: ring_log_lines: "100" cluster_history_entries: "1000" pod_terminate_grace_period: 5m + pdb_name_format: "postgres-{cluster}-pdb" + eol_node_label: "eol:true" diff --git a/manifests/postgres-operator.yaml b/manifests/postgres-operator.yaml index f9db500c2..54ee3480e 100644 --- a/manifests/postgres-operator.yaml +++ b/manifests/postgres-operator.yaml @@ -12,7 +12,7 @@ spec: serviceAccountName: operator containers: - name: postgres-operator - image: pierone.example.com/acid/postgres-operator:0.1 + image: pierone.stups.zalan.do/acid/postgres-operator:workerassgn imagePullPolicy: IfNotPresent env: - name: WATCH_NAMESPACE diff --git a/manifests/testpostgresql.yaml b/manifests/testpostgresql.yaml index c6a7ad6fd..d3adb5e30 100644 --- a/manifests/testpostgresql.yaml +++ b/manifests/testpostgresql.yaml @@ -2,7 +2,7 @@ 
apiVersion: "acid.zalan.do/v1" kind: postgresql metadata: - name: acid-testcluster + name: acid-testcluster17 spec: teamId: "ACID" volume: diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 065c7c336..119237976 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -16,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" + policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -44,10 +45,11 @@ type Config struct { } type kubeResources struct { - Services map[PostgresRole]*v1.Service - Endpoint *v1.Endpoints - Secrets map[types.UID]*v1.Secret - Statefulset *v1beta1.StatefulSet + Services map[PostgresRole]*v1.Service + Endpoint *v1.Endpoints + Secrets map[types.UID]*v1.Secret + Statefulset *v1beta1.StatefulSet + PodDisruptionBudget *policybeta1.PodDisruptionBudget //Pods are treated separately //PVCs are treated separately } @@ -259,6 +261,12 @@ func (c *Cluster) Create() error { } } + pdb, err := c.createPodDisruptionBudget() + if err != nil { + return fmt.Errorf("could not create pod disruption budget: %v", err) + } + c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(pdb.ObjectMeta)) + err = c.listResources() if err != nil { c.logger.Errorf("could not list resources: %v", err) @@ -334,6 +342,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp needsRollUpdate = true reasons = append(reasons, "new statefulset's terminationGracePeriodSeconds doesn't match the current one") } + if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.Affinity, statefulSet.Spec.Template.Spec.Affinity) { + needsReplace = true + needsRollUpdate = true + reasons = append(reasons, "new statefulset's pod affinity doesn't match the current one") + } + // Some generated fields like creationTimestamp make it not possible to use DeepCompare on Spec.Template.ObjectMeta if !reflect.DeepEqual(c.Statefulset.Spec.Template.Labels, statefulSet.Spec.Template.Labels) { needsReplace = true @@ -522,6 +536,15 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { c.logger.Infof("volumes have been updated successfully") } + newPDB := c.generatePodDisruptionBudget() + if match, reason := c.samePDBWith(newPDB); !match { + c.logPDBChanges(c.PodDisruptionBudget, newPDB, true, reason) + if err := c.updatePodDisruptionBudget(newPDB); err != nil { + c.setStatus(spec.ClusterStatusUpdateFailed) + return fmt.Errorf("could not update pod disruption budget: %v", err) + } + } + c.setStatus(spec.ClusterStatusRunning) return nil @@ -555,6 +578,10 @@ func (c *Cluster) Delete() error { } } + if err := c.deletePodDisruptionBudget(); err != nil { + return fmt.Errorf("could not delete pod disruption budget: %v", err) + } + return nil } @@ -690,11 +717,12 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus { Status: c.Status, Spec: c.Spec, - MasterService: c.GetServiceMaster(), - ReplicaService: c.GetServiceReplica(), - Endpoint: c.GetEndpoint(), - StatefulSet: c.GetStatefulSet(), - CurrentProcess: c.GetCurrentProcess(), + MasterService: c.GetServiceMaster(), + ReplicaService: c.GetServiceReplica(), + Endpoint: c.GetEndpoint(), + StatefulSet: c.GetStatefulSet(), + PodDisruptionBudget: c.GetPodDisruptionBudget(), + CurrentProcess: c.GetCurrentProcess(), Error: c.Error, } @@ -733,3 +761,13 @@ func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedNam return nil } + +// Lock locks the cluster +func 
(c *Cluster) Lock() {
+ c.mu.Lock()
+}
+
+// Unlock unlocks the cluster
+func (c *Cluster) Unlock() {
+ c.mu.Unlock()
+}
diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go
index 041cd2f42..dc333ea7f 100644
--- a/pkg/cluster/k8sres.go
+++ b/pkg/cluster/k8sres.go
@@ -10,6 +10,7 @@ import (
 "k8s.io/apimachinery/pkg/util/intstr"
 "k8s.io/client-go/pkg/api/v1"
 "k8s.io/client-go/pkg/apis/apps/v1beta1"
+ policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"

 "github.com/zalando-incubator/postgres-operator/pkg/spec"
 "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
@@ -67,6 +68,10 @@ func (c *Cluster) serviceName(role PostgresRole) string {
 return name
}

+func (c *Cluster) podDisruptionBudgetName() string {
+ return c.OpConfig.PDBNameFormat.Format("cluster", c.Spec.ClusterName)
+}
+
func (c *Cluster) resourceRequirements(resources spec.Resources) (*v1.ResourceRequirements, error) {
 var err error

@@ -226,6 +231,25 @@ PATRONI_INITDB_PARAMS:
 return string(result)
}

+func (c *Cluster) nodeAffinity() *v1.Affinity {
+ matchExpressions := make([]v1.NodeSelectorRequirement, 0)
+ for k, v := range c.OpConfig.EOLNodeLabel {
+ matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{
+ Key: k,
+ Operator: v1.NodeSelectorOpNotIn,
+ Values: []string{v},
+ })
+ }
+
+ return &v1.Affinity{
+ NodeAffinity: &v1.NodeAffinity{
+ RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+ NodeSelectorTerms: []v1.NodeSelectorTerm{{MatchExpressions: matchExpressions}},
+ },
+ },
+ }
+}
+
func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequirements,
 pgParameters *spec.PostgresqlParam,
 patroniParameters *spec.Patroni,
@@ -347,6 +371,7 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme
 ServiceAccountName: c.OpConfig.ServiceAccountName,
 TerminationGracePeriodSeconds: &terminateGracePeriodSeconds,
 Containers: []v1.Container{container},
+ Affinity: c.nodeAffinity(),
 }

 template := v1.PodTemplateSpec{
@@ -573,6 +598,25 @@ func (c *Cluster) generateCloneEnvironment(description *spec.CloneDescription) [
 return result
}

+func (c *Cluster) generatePodDisruptionBudget() *policybeta1.PodDisruptionBudget {
+ minAvailable := intstr.FromInt(1)
+ // copy the cluster labels: assigning c.OpConfig.ClusterLabels directly would
+ // alias the shared config map, and adding the cluster name label below would
+ // leak it into every other consumer of ClusterLabels
+ matchLabels := make(map[string]string)
+ for k, v := range c.OpConfig.ClusterLabels {
+ matchLabels[k] = v
+ }
+ matchLabels[c.OpConfig.ClusterNameLabel] = c.Name
+
+ return &policybeta1.PodDisruptionBudget{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: c.podDisruptionBudgetName(),
+ Namespace: c.Namespace,
+ },
+ Spec: policybeta1.PodDisruptionBudgetSpec{
+ MinAvailable: &minAvailable,
+ Selector: &metav1.LabelSelector{
+ MatchLabels: matchLabels,
+ },
+ },
+ }
+}
+
// getClusterServiceConnectionParameters fetches cluster host name and port
// TODO: perhaps we need to query the service (i.e. if non-standard port is used?)
// TODO: handle clusters in different namespaces
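The pod template now pins Postgres pods to live nodes: the required node affinity above excludes any node whose label value matches an eol_node_label entry, while nodes that lack the label entirely still satisfy a NotIn requirement. A standalone sketch of the predicate that the generated requiredDuringSchedulingIgnoredDuringExecution term expresses, using the default eol_node_label of eol:true (plain Go, illustrative names, not part of the patch):

    package main

    import "fmt"

    // notIn mirrors the NotIn node-selector requirement built by
    // nodeAffinity(): a node matches when its value for key is either
    // absent or not among the forbidden values.
    func notIn(nodeLabels map[string]string, key string, forbidden []string) bool {
        v, ok := nodeLabels[key]
        if !ok {
            return true // nodes without the label remain eligible
        }
        for _, f := range forbidden {
            if v == f {
                return false
            }
        }
        return true
    }

    func main() {
        eolNode := map[string]string{"eol": "true"}
        liveNode := map[string]string{"failure-domain": "eu-central-1a"}

        fmt.Println(notIn(eolNode, "eol", []string{"true"}))  // false: pods are kept off this node
        fmt.Println(notIn(liveNode, "eol", []string{"true"})) // true: pods may schedule here
    }
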
diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index d562fa60b..5f458db9c 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -2,6 +2,7 @@ package cluster

 import (
 "fmt"
+ "math/rand"

 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/client-go/pkg/api/v1"
@@ -71,7 +72,7 @@ func (c *Cluster) deletePods() error {
}

func (c *Cluster) deletePod(podName spec.NamespacedName) error {
- c.setProcessName("deleting %q pod", podName)
+ c.setProcessName("deleting pod %q", podName)
 ch := c.registerPodSubscriber(podName)
 defer c.unregisterPodSubscriber(podName)

@@ -87,6 +88,7 @@ func (c *Cluster) deletePod(podName spec.NamespacedName) error {
}

func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
+ c.logger.Debugf("unsubscribing from pod %q events", podName)
 c.podSubscribersMu.Lock()
 defer c.podSubscribersMu.Unlock()

@@ -99,6 +101,7 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
}

func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan spec.PodEvent {
+ c.logger.Debugf("subscribing to pod %q", podName)
 c.podSubscribersMu.Lock()
 defer c.podSubscribersMu.Unlock()

@@ -111,9 +114,135 @@ func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan spec.P
 return ch
}

-func (c *Cluster) recreatePod(podName spec.NamespacedName) error {
- c.setProcessName("recreating %q pod", podName)
+func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) {
+ podName := util.NameFromMeta(pod.ObjectMeta)
+ if eol, err := c.podIsEndOfLife(pod); err != nil {
+ return nil, fmt.Errorf("could not get node %q: %v", pod.Spec.NodeName, err)
+ } else if !eol {
+ c.logger.Infof("pod %q is already on a live node", podName)
+ return pod, nil
+ }
+
+ c.setProcessName("moving pod %q out of end-of-life node %q", podName, pod.Spec.NodeName)
+ c.logger.Infof("moving pod %q out of the end-of-life node %q", podName, pod.Spec.NodeName)
+
+ if err := c.recreatePod(podName); err != nil {
+ return nil, fmt.Errorf("could not move pod: %v", err)
+ }
+
+ newPod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
+ if err != nil {
+ return nil, fmt.Errorf("could not get pod: %v", err)
+ }
+
+ if newPod.Spec.NodeName == pod.Spec.NodeName {
+ return nil, fmt.Errorf("pod %q remained on the same node", podName)
+ }
+
+ if eol, err := c.podIsEndOfLife(newPod); err != nil {
+ return nil, fmt.Errorf("could not get node %q: %v", newPod.Spec.NodeName, err)
+ } else if eol {
+ c.logger.Warningf("pod %q moved to end-of-life node %q", podName, newPod.Spec.NodeName)
+ return newPod, nil
+ }
+
+ c.logger.Infof("pod %q moved from node %q to node %q", podName, pod.Spec.NodeName, newPod.Spec.NodeName)
+
+ return newPod, nil
+}
+
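movePodFromEndOfLifeNode relies on podIsEndOfLife (added at the end of this file's diff), which treats a node as end-of-life when it is cordoned (Spec.Unschedulable) or carries the configured eol_node_label. util.MapContains itself is not part of this patch; the checks here assume plain subset semantics, sketched below with a stand-in helper:

    package main

    import "fmt"

    // mapContains assumes util.MapContains implements plain subset
    // semantics: every key/value pair of expected is present in actual.
    func mapContains(actual, expected map[string]string) bool {
        for k, v := range expected {
            if actual[k] != v {
                return false
            }
        }
        return true
    }

    // isEndOfLife restates podIsEndOfLife's decision for a node:
    // cordoned (unschedulable) or labelled with every eol_node_label entry.
    func isEndOfLife(unschedulable bool, nodeLabels, eolLabel map[string]string) bool {
        return unschedulable || mapContains(nodeLabels, eolLabel)
    }

    func main() {
        eol := map[string]string{"eol": "true"}
        fmt.Println(isEndOfLife(false, map[string]string{"eol": "true", "zone": "a"}, eol)) // true
        fmt.Println(isEndOfLife(true, map[string]string{}, eol))                            // true: cordoned
        fmt.Println(isEndOfLife(false, map[string]string{"eol": "false"}, eol))             // false
    }
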
+func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) {
+ replicas, err := c.getRolePods(Replica)
+ if err != nil {
+ return nil, fmt.Errorf("could not get replica pods: %v", err)
+ }
+ if len(replicas) == 0 {
+ return nil, fmt.Errorf("no replica pods found")
+ }
+
+ for i, pod := range replicas {
+ // look for replicas running on live nodes. Ignore errors when querying the nodes.
+ if pod.Spec.NodeName != oldNodeName {
+ eol, err := c.podIsEndOfLife(&pod)
+ if err == nil && !eol {
+ return &replicas[i], nil
+ }
+ }
+ }
+ c.logger.Debug("no available master candidates on live nodes")
+ return &replicas[rand.Intn(len(replicas))], nil
+}
+
+// MigrateMasterPod migrates master pod via failover to a replica
+func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
+ oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
+ if err != nil {
+ return fmt.Errorf("could not get pod: %v", err)
+ }
+
+ c.logger.Infof("migrating master pod %q", podName)
+
+ if eol, err := c.podIsEndOfLife(oldMaster); err != nil {
+ return fmt.Errorf("could not get node %q: %v", oldMaster.Spec.NodeName, err)
+ } else if !eol {
+ c.logger.Debugf("pod is already on a live node")
+ return nil
+ }
+
+ if role := PostgresRole(oldMaster.Labels[c.OpConfig.PodRoleLabel]); role != Master {
+ c.logger.Warningf("pod %q is not a master", podName)
+ return nil
+ }
+
+ masterCandidatePod, err := c.masterCandidate(oldMaster.Spec.NodeName)
+ if err != nil {
+ return fmt.Errorf("could not get new master candidate: %v", err)
+ }
+
+ pod, err := c.movePodFromEndOfLifeNode(masterCandidatePod)
+ if err != nil {
+ return fmt.Errorf("could not move pod: %v", err)
+ }
+
+ masterCandidateName := util.NameFromMeta(pod.ObjectMeta)
+ if err := c.ManualFailover(oldMaster, masterCandidateName); err != nil {
+ return fmt.Errorf("could not failover to pod %q: %v", masterCandidateName, err)
+ }
+
+ _, err = c.movePodFromEndOfLifeNode(oldMaster)
+ if err != nil {
+ return fmt.Errorf("could not move pod: %v", err)
+ }
+
+ return nil
+}
+
+// MigrateReplicaPod recreates pod on a new node
+func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName string) error {
+ replicaPod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
+ if err != nil {
+ return fmt.Errorf("could not get pod: %v", err)
+ }
+
+ c.logger.Infof("migrating replica pod %q", podName)
+
+ if replicaPod.Spec.NodeName != fromNodeName {
+ c.logger.Infof("pod %q has already migrated to node %q", podName, replicaPod.Spec.NodeName)
+ return nil
+ }
+
+ if role := PostgresRole(replicaPod.Labels[c.OpConfig.PodRoleLabel]); role != Replica {
+ return fmt.Errorf("pod %q is not a replica", podName)
+ }
+
+ _, err = c.movePodFromEndOfLifeNode(replicaPod)
+ if err != nil {
+ return fmt.Errorf("could not move pod: %v", err)
+ }
+
+ return nil
+}
+
+func (c *Cluster) recreatePod(podName spec.NamespacedName) error {
 ch := c.registerPodSubscriber(podName)
 defer c.unregisterPodSubscriber(podName)

@@ -127,7 +256,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) error {
 if err := c.waitForPodLabel(ch, nil); err != nil {
 return err
 }
- c.logger.Infof("pod %q is ready", podName)
+ c.logger.Infof("pod %q has been recreated", podName)
 return nil
}

@@ -184,3 +313,12 @@ func (c *Cluster) recreatePods() error {

 return nil
}
+
+func (c *Cluster) podIsEndOfLife(pod *v1.Pod) (bool, error) {
+ node, err := c.KubeClient.Nodes().Get(pod.Spec.NodeName, metav1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+ return node.Spec.Unschedulable || util.MapContains(node.Labels, c.OpConfig.EOLNodeLabel), nil
+}
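The ordering in MigrateMasterPod is deliberate: the replica chosen for promotion is moved to a live node first, the Patroni failover happens second, and only then is the old master (demoted to a replica by the failover) evacuated, so the cluster never fails over onto a dying node. A condensed restatement of that sequence, in package cluster with the same vendored imports as above; migrator is an illustrative stand-in for *Cluster, not part of the patch:

    type migrator interface {
        masterCandidate(oldNodeName string) (*v1.Pod, error)
        movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error)
        ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedName) error
    }

    func migrateMaster(c migrator, oldMaster *v1.Pod) error {
        candidate, err := c.masterCandidate(oldMaster.Spec.NodeName) // prefer replicas already on live nodes
        if err != nil {
            return err
        }
        moved, err := c.movePodFromEndOfLifeNode(candidate) // secure the promotion target first
        if err != nil {
            return err
        }
        if err := c.ManualFailover(oldMaster, util.NameFromMeta(moved.ObjectMeta)); err != nil {
            return err
        }
        _, err = c.movePodFromEndOfLifeNode(oldMaster) // old master is a replica now
        return err
    }
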
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index 0fc29b2ea..ce8e80f56 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -9,6 +9,7 @@ import (
 "k8s.io/apimachinery/pkg/types"
 "k8s.io/client-go/pkg/api/v1"
 "k8s.io/client-go/pkg/apis/apps/v1beta1"
+ policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"

 "github.com/zalando-incubator/postgres-operator/pkg/spec"
 "github.com/zalando-incubator/postgres-operator/pkg/util"
@@ -61,10 +62,21 @@ func (c *Cluster) loadResources() error {
 c.logger.Errorf("could not get statefulset: %v", err)
 }

+ pdb, err := c.KubeClient.PodDisruptionBudgets(ns).Get(c.podDisruptionBudgetName(), metav1.GetOptions{})
+ if err == nil {
+ c.PodDisruptionBudget = pdb
+ } else if !k8sutil.ResourceNotFound(err) {
+ c.logger.Errorf("could not get pod disruption budget: %v", err)
+ }
+
 return nil
}

func (c *Cluster) listResources() error {
+ if c.PodDisruptionBudget != nil {
+ c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID)
+ }
+
 if c.Statefulset != nil {
 c.logger.Infof("found statefulset: %q (uid: %q)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID)
 }
@@ -400,6 +412,54 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
 return endpoints, nil
}

+func (c *Cluster) createPodDisruptionBudget() (*policybeta1.PodDisruptionBudget, error) {
+ if c.PodDisruptionBudget != nil {
+ return nil, fmt.Errorf("pod disruption budget already exists in the cluster")
+ }
+ podDisruptionBudgetSpec := c.generatePodDisruptionBudget()
+ podDisruptionBudget, err := c.KubeClient.
+ PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
+ Create(podDisruptionBudgetSpec)
+ if err != nil {
+ return nil, err
+ }
+ c.PodDisruptionBudget = podDisruptionBudget
+
+ return podDisruptionBudget, nil
+}
+
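Create is called with the object built by generatePodDisruptionBudget above. With minAvailable pinned to 1 and a selector matching every pod of the cluster, the budget keeps voluntary evictions (kubectl drain, node-pool rotations) from removing the last remaining pod of a Postgres cluster. With the defaults from manifests/configmap.yaml and the example cluster from manifests/testpostgresql.yaml, the generated object comes out roughly as follows (namespace assumed to be default):

    apiVersion: policy/v1beta1
    kind: PodDisruptionBudget
    metadata:
      name: postgres-acid-testcluster17-pdb
      namespace: default
    spec:
      minAvailable: 1
      selector:
        matchLabels:
          application: spilo
          cluster-name: acid-testcluster17
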
+func (c *Cluster) updatePodDisruptionBudget(pdb *policybeta1.PodDisruptionBudget) error {
+ if c.PodDisruptionBudget == nil {
+ return fmt.Errorf("there is no pod disruption budget in the cluster")
+ }
+
+ newPdb, err := c.KubeClient.PodDisruptionBudgets(pdb.Namespace).Update(pdb)
+ if err != nil {
+ return fmt.Errorf("could not update pod disruption budget: %v", err)
+ }
+ c.PodDisruptionBudget = newPdb
+
+ return nil
+}
+
+func (c *Cluster) deletePodDisruptionBudget() error {
+ c.logger.Debug("deleting pod disruption budget")
+ if c.PodDisruptionBudget == nil {
+ return fmt.Errorf("there is no pod disruption budget in the cluster")
+ }
+ err := c.KubeClient.
+ PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
+ Delete(c.PodDisruptionBudget.Name, c.deleteOptions)
+ if err != nil {
+ return fmt.Errorf("could not delete pod disruption budget: %v", err)
+ }
+ c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
+ c.PodDisruptionBudget = nil
+
+ return nil
+}
+
func (c *Cluster) deleteEndpoint() error {
 c.setProcessName("deleting endpoint")
 c.logger.Debugln("deleting endpoint")
@@ -408,7 +468,7 @@ func (c *Cluster) deleteEndpoint() error {
 }
 err := c.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, c.deleteOptions)
 if err != nil {
- return err
+ return fmt.Errorf("could not delete endpoint: %v", err)
 }
 c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta))
 c.Endpoint = nil
@@ -492,3 +552,8 @@ func (c *Cluster) GetEndpoint() *v1.Endpoints {
 func (c *Cluster) GetStatefulSet() *v1beta1.StatefulSet {
 return c.Statefulset
}
+
+// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget
+func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget {
+ return c.PodDisruptionBudget
+}
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 56794cf73..dd232cff8 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -2,6 +2,9 @@ package cluster

 import (
 "fmt"
+ "reflect"
+
+ policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"

 "github.com/zalando-incubator/postgres-operator/pkg/spec"
 "github.com/zalando-incubator/postgres-operator/pkg/util"
@@ -95,6 +98,12 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
 return
 }

+ c.logger.Debug("syncing pod disruption budgets")
+ if err = c.syncPodDisruptionBudget(); err != nil {
+ err = fmt.Errorf("could not sync pod disruption budget: %v", err)
+ return
+ }
+
 return
}

@@ -140,6 +149,20 @@ func (c *Cluster) syncEndpoint() error {
 return nil
}

+func (c *Cluster) syncPodDisruptionBudget() error {
+ if c.PodDisruptionBudget == nil {
+ c.logger.Infof("could not find the cluster's pod disruption budget")
+ pdb, err := c.createPodDisruptionBudget()
+ if err != nil {
+ return fmt.Errorf("could not create pod disruption budget: %v", err)
+ }
+ c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta))
+ }
+
+ return nil
+}
+
func (c *Cluster) syncStatefulSet() error {
 cSpec := c.Spec
 var rollUpdate bool
@@ -248,6 +271,17 @@ func (c *Cluster) syncVolumes() error {
 if err := c.resizeVolumes(c.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil {
 return fmt.Errorf("could not sync volumes: %v", err)
 }
+ c.logger.Infof("volumes have been synced successfully")
+
 return nil
}
+
+func (c *Cluster) samePDBWith(pdb *policybeta1.PodDisruptionBudget) (match bool, reason string) {
+ if c.PodDisruptionBudget == nil {
+ return false, "there is no pod disruption budget in the cluster"
+ }
+ match = reflect.DeepEqual(pdb.Spec, c.PodDisruptionBudget.Spec)
+ if !match {
+ reason = "new pod disruption budget's spec doesn't match the current one"
+ }
+
+ return
+}
diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go
index c4814ee6a..a479e3211 100644
--- a/pkg/cluster/util.go
+++ b/pkg/cluster/util.go
@@ -11,6 +11,7 @@ import (
 "k8s.io/apimachinery/pkg/labels"
 "k8s.io/client-go/pkg/api/v1"
 "k8s.io/client-go/pkg/apis/apps/v1beta1"
+ policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"

 "github.com/zalando-incubator/postgres-operator/pkg/spec"
 "github.com/zalando-incubator/postgres-operator/pkg/util"
@@ -97,11 +98,21 @@ func metadataAnnotationsPatch(annotations map[string]string) string {
 return fmt.Sprintf(constants.ServiceMetadataAnnotationReplaceFormat,
annotationsString) } -func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { +func (c *Cluster) logPDBChanges(old, new *policybeta1.PodDisruptionBudget, isUpdate bool, reason string) { if isUpdate { - c.logger.Infof("statefulset %q has been changed", + c.logger.Infof("pod disruption budget %q has been changed", util.NameFromMeta(old.ObjectMeta)) + } else { + c.logger.Infof("pod disruption budget %q is not in the desired state and needs to be updated", util.NameFromMeta(old.ObjectMeta), ) + } + + c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec)) +} + +func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { + if isUpdate { + c.logger.Infof("statefulset %q has been changed", util.NameFromMeta(old.ObjectMeta)) } else { c.logger.Infof("statefulset %q is not in the desired state and needs to be updated", util.NameFromMeta(old.ObjectMeta), @@ -340,15 +351,11 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st return c.OpConfig.SecretNameTemplate.Format( "username", strings.Replace(username, "_", "-", -1), - "clustername", clusterName, + "cluster", clusterName, "tprkind", constants.CRDKind, "tprgroup", constants.CRDGroup) } -func (c *Cluster) podSpiloRole(pod *v1.Pod) PostgresRole { - return PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel]) -} - func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName { return replicas[rand.Intn(len(replicas))] } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ad2077188..7992e4d16 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -41,6 +41,7 @@ type Controller struct { postgresqlInformer cache.SharedIndexInformer podInformer cache.SharedIndexInformer + nodesInformer cache.SharedIndexInformer podCh chan spec.PodEvent clusterEventQueues []*cache.FIFO // [workerID]Queue @@ -111,6 +112,7 @@ func (c *Controller) initOperatorConfig() { func (c *Controller) initController() { c.initClients() c.initOperatorConfig() + c.initSharedInformers() c.logger.Infof("config: %s", c.opConfig.MustMarshal()) @@ -128,6 +130,23 @@ func (c *Controller) initController() { c.config.InfrastructureRoles = infraRoles } + c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers) + c.workerLogs = make(map[uint32]ringlog.RingLogger, c.opConfig.Workers) + for i := range c.clusterEventQueues { + c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) { + e, ok := obj.(spec.ClusterEvent) + if !ok { + return "", fmt.Errorf("could not cast to ClusterEvent") + } + + return queueClusterKey(e.EventType, e.UID), nil + }) + } + + c.apiserver = apiserver.New(c, c.opConfig.APIPort, c.logger.Logger) +} + +func (c *Controller) initSharedInformers() { // Postgresqls c.postgresqlInformer = cache.NewSharedIndexInformer( &cache.ListWatch{ @@ -162,31 +181,35 @@ func (c *Controller) initController() { DeleteFunc: c.podDelete, }) - c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers) - c.workerLogs = make(map[uint32]ringlog.RingLogger, c.opConfig.Workers) - for i := range c.clusterEventQueues { - c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) { - e, ok := obj.(spec.ClusterEvent) - if !ok { - return "", fmt.Errorf("could not cast to ClusterEvent") - } - - return queueClusterKey(e.EventType, e.UID), nil - }) + // Kubernetes Nodes + nodeLw := &cache.ListWatch{ + ListFunc: c.nodeListFunc, + WatchFunc: c.nodeWatchFunc, } - 
c.apiserver = apiserver.New(c, c.opConfig.APIPort, c.logger.Logger) + c.nodesInformer = cache.NewSharedIndexInformer( + nodeLw, + &v1.Node{}, + constants.QueueResyncPeriodNode, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + + c.nodesInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.nodeAdd, + UpdateFunc: c.nodeUpdate, + DeleteFunc: c.nodeDelete, + }) } // Run starts background controller processes func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { c.initController() - wg.Add(4) + wg.Add(5) go c.runPodInformer(stopCh, wg) go c.runPostgresqlInformer(stopCh, wg) go c.clusterResync(stopCh, wg) go c.apiserver.Run(stopCh, wg) + go c.kubeNodesInformer(stopCh, wg) for i := range c.clusterEventQueues { wg.Add(1) @@ -212,3 +235,9 @@ func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.Wait func queueClusterKey(eventType spec.EventType, uid types.UID) string { return fmt.Sprintf("%s-%s", eventType, uid) } + +func (c *Controller) kubeNodesInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + + c.nodesInformer.Run(stopCh) +} diff --git a/pkg/controller/node.go b/pkg/controller/node.go new file mode 100644 index 000000000..abe74da05 --- /dev/null +++ b/pkg/controller/node.go @@ -0,0 +1,162 @@ +package controller + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/pkg/api/v1" + + "github.com/zalando-incubator/postgres-operator/pkg/cluster" + "github.com/zalando-incubator/postgres-operator/pkg/util" +) + +func (c *Controller) nodeListFunc(options metav1.ListOptions) (runtime.Object, error) { + opts := metav1.ListOptions{ + Watch: options.Watch, + ResourceVersion: options.ResourceVersion, + TimeoutSeconds: options.TimeoutSeconds, + } + + return c.KubeClient.Nodes().List(opts) +} + +func (c *Controller) nodeWatchFunc(options metav1.ListOptions) (watch.Interface, error) { + opts := metav1.ListOptions{ + Watch: options.Watch, + ResourceVersion: options.ResourceVersion, + TimeoutSeconds: options.TimeoutSeconds, + } + + return c.KubeClient.Nodes().Watch(opts) +} + +func (c *Controller) nodeAdd(obj interface{}) { + node, ok := obj.(*v1.Node) + if !ok { + return + } + + c.logger.Debugf("new node has been added: %q (%s)", util.NameFromMeta(node.ObjectMeta), node.Spec.ProviderID) +} + +func (c *Controller) nodeUpdate(prev, cur interface{}) { + nodePrev, ok := prev.(*v1.Node) + if !ok { + return + } + + nodeCur, ok := cur.(*v1.Node) + if !ok { + return + } + + if util.MapContains(nodeCur.Labels, map[string]string{"master": "true"}) { + return + } + + if nodePrev.Spec.Unschedulable && util.MapContains(nodePrev.Labels, c.opConfig.EOLNodeLabel) || + !nodeCur.Spec.Unschedulable || !util.MapContains(nodeCur.Labels, c.opConfig.EOLNodeLabel) { + return + } + + c.logger.Infof("node %q became unschedulable and has EOL labels: %q", util.NameFromMeta(nodeCur.ObjectMeta), + c.opConfig.EOLNodeLabel) + + opts := metav1.ListOptions{ + LabelSelector: labels.Set(c.opConfig.ClusterLabels).String(), + } + podList, err := c.KubeClient.Pods(c.opConfig.Namespace).List(opts) + if err != nil { + c.logger.Errorf("could not fetch list of the pods: %v", err) + return + } + + nodePods := make([]*v1.Pod, 0) + for i, pod := range podList.Items { + if pod.Spec.NodeName == nodeCur.Name { + nodePods = append(nodePods, &podList.Items[i]) + } + } + + clusters := make(map[*cluster.Cluster]bool) + masterPods := 
make(map[*v1.Pod]*cluster.Cluster) + replicaPods := make(map[*v1.Pod]*cluster.Cluster) + movedPods := 0 + for _, pod := range nodePods { + podName := util.NameFromMeta(pod.ObjectMeta) + + role, ok := pod.Labels[c.opConfig.PodRoleLabel] + if !ok { + c.logger.Warningf("could not move pod %q: pod has no role", podName) + continue + } + + clusterName := c.podClusterName(pod) + + c.clustersMu.RLock() + cl, ok := c.clusters[clusterName] + c.clustersMu.RUnlock() + if !ok { + c.logger.Warningf("could not move pod %q: pod does not belong to a known cluster", podName) + continue + } + + movedPods++ + + if !clusters[cl] { + clusters[cl] = true + } + + if cluster.PostgresRole(role) == cluster.Master { + masterPods[pod] = cl + } else { + replicaPods[pod] = cl + } + } + + for cl := range clusters { + cl.Lock() + } + + for pod, cl := range masterPods { + podName := util.NameFromMeta(pod.ObjectMeta) + + if err := cl.MigrateMasterPod(podName); err != nil { + c.logger.Errorf("could not move master pod %q: %v", podName, err) + movedPods-- + } + } + + for pod, cl := range replicaPods { + podName := util.NameFromMeta(pod.ObjectMeta) + + if err := cl.MigrateReplicaPod(podName, nodeCur.Name); err != nil { + c.logger.Errorf("could not move replica pod %q: %v", podName, err) + movedPods-- + } + } + + for cl := range clusters { + cl.Unlock() + } + + totalPods := len(nodePods) + + c.logger.Infof("%d/%d pods have been moved out from the %q node", + movedPods, totalPods, util.NameFromMeta(nodeCur.ObjectMeta)) + + if leftPods := totalPods - movedPods; leftPods > 0 { + c.logger.Warnf("could not move %d/%d pods from the %q node", + leftPods, totalPods, util.NameFromMeta(nodeCur.ObjectMeta)) + } +} + +func (c *Controller) nodeDelete(obj interface{}) { + node, ok := obj.(*v1.Node) + if !ok { + return + } + + c.logger.Debugf("node has been deleted: %q (%s)", util.NameFromMeta(node.ObjectMeta), node.Spec.ProviderID) +} diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index b37ea14a9..24b9f8687 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -11,12 +11,7 @@ import ( ) func (c *Controller) podListFunc(options metav1.ListOptions) (runtime.Object, error) { - var labelSelector string - var fieldSelector string - opts := metav1.ListOptions{ - LabelSelector: labelSelector, - FieldSelector: fieldSelector, Watch: options.Watch, ResourceVersion: options.ResourceVersion, TimeoutSeconds: options.TimeoutSeconds, @@ -26,12 +21,7 @@ func (c *Controller) podListFunc(options metav1.ListOptions) (runtime.Object, er } func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface, error) { - var labelSelector string - var fieldSelector string - opts := metav1.ListOptions{ - LabelSelector: labelSelector, - FieldSelector: fieldSelector, Watch: options.Watch, ResourceVersion: options.ResourceVersion, TimeoutSeconds: options.TimeoutSeconds, diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 539b50e20..0b6114f8b 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -189,7 +189,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) { lg.Infoln("update of the cluster started") if !clusterFound { - lg.Warnln("cluster does not exist") + lg.Warningln("cluster does not exist") return } c.curWorkerCluster.Store(event.WorkerID, cl) diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 2147f1824..d49ca7b97 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -10,6 +10,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" 
"k8s.io/client-go/pkg/apis/apps/v1beta1" + policyv1beta1 "k8s.io/client-go/pkg/apis/policy/v1beta1" "k8s.io/client-go/rest" ) @@ -94,12 +95,13 @@ type Process struct { // ClusterStatus describes status of the cluster type ClusterStatus struct { - Team string - Cluster string - MasterService *v1.Service - ReplicaService *v1.Service - Endpoint *v1.Endpoints - StatefulSet *v1beta1.StatefulSet + Team string + Cluster string + MasterService *v1.Service + ReplicaService *v1.Service + Endpoint *v1.Endpoints + StatefulSet *v1beta1.StatefulSet + PodDisruptionBudget *policyv1beta1.PodDisruptionBudget CurrentProcess Process Worker uint32 diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index fccd27c4c..93cbaae34 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -17,22 +17,24 @@ type CRD struct { // Resources describes kubernetes resource specific configuration parameters type Resources struct { - ResourceCheckInterval time.Duration `name:"resource_check_interval" default:"3s"` - ResourceCheckTimeout time.Duration `name:"resource_check_timeout" default:"10m"` - PodLabelWaitTimeout time.Duration `name:"pod_label_wait_timeout" default:"10m"` - PodDeletionWaitTimeout time.Duration `name:"pod_deletion_wait_timeout" default:"10m"` - ClusterLabels map[string]string `name:"cluster_labels" default:"application:spilo"` - ClusterNameLabel string `name:"cluster_name_label" default:"cluster-name"` - PodRoleLabel string `name:"pod_role_label" default:"spilo-role"` - DefaultCPURequest string `name:"default_cpu_request" default:"100m"` - DefaultMemoryRequest string `name:"default_memory_request" default:"100Mi"` - DefaultCPULimit string `name:"default_cpu_limit" default:"3"` - DefaultMemoryLimit string `name:"default_memory_limit" default:"1Gi"` + ResourceCheckInterval time.Duration `name:"resource_check_interval" default:"3s"` + ResourceCheckTimeout time.Duration `name:"resource_check_timeout" default:"10m"` + PodLabelWaitTimeout time.Duration `name:"pod_label_wait_timeout" default:"10m"` + PodDeletionWaitTimeout time.Duration `name:"pod_deletion_wait_timeout" default:"10m"` + PodTerminateGracePeriod time.Duration `name:"pod_terminate_grace_period" default:"5m"` + ClusterLabels map[string]string `name:"cluster_labels" default:"application:spilo"` + ClusterNameLabel string `name:"cluster_name_label" default:"cluster-name"` + PodRoleLabel string `name:"pod_role_label" default:"spilo-role"` + DefaultCPURequest string `name:"default_cpu_request" default:"100m"` + DefaultMemoryRequest string `name:"default_memory_request" default:"100Mi"` + DefaultCPULimit string `name:"default_cpu_limit" default:"3"` + DefaultMemoryLimit string `name:"default_memory_limit" default:"1Gi"` + EOLNodeLabel map[string]string `name:"eol_node_label" default:"eol:true"` } // Auth describes authentication specific configuration parameters type Auth struct { - SecretNameTemplate stringTemplate `name:"secret_name_template" default:"{username}.{clustername}.credentials.{tprkind}.{tprgroup}"` + SecretNameTemplate stringTemplate `name:"secret_name_template" default:"{username}.{cluster}.credentials.{tprkind}.{tprgroup}"` PamRoleName string `name:"pam_role_name" default:"zalandos"` PamConfiguration string `name:"pam_configuration" default:"https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"` TeamsAPIUrl string `name:"teams_api_url" default:"https://teams.example.com/api/"` @@ -63,6 +65,7 @@ type Config struct { EnableLoadBalancer bool `name:"enable_load_balancer" default:"true"` 
MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"` ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"` + PDBNameFormat stringTemplate `name:"pdb_name_format" default:"postgres-{cluster}-pdb"` Workers uint32 `name:"workers" default:"4"` APIPort int `name:"api_port" default:"8080"` RingLogLines int `name:"ring_log_lines" default:"100"` diff --git a/pkg/util/constants/kubernetes.go b/pkg/util/constants/kubernetes.go index 79c60cad2..7f25bb9e7 100644 --- a/pkg/util/constants/kubernetes.go +++ b/pkg/util/constants/kubernetes.go @@ -8,6 +8,7 @@ const ( StatefulsetDeletionInterval = 1 * time.Second StatefulsetDeletionTimeout = 30 * time.Second - QueueResyncPeriodPod = 5 * time.Minute - QueueResyncPeriodTPR = 5 * time.Minute + QueueResyncPeriodPod = 5 * time.Minute + QueueResyncPeriodTPR = 5 * time.Minute + QueueResyncPeriodNode = 5 * time.Minute ) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 47aa291cf..fbfeb0098 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -11,6 +11,7 @@ import ( "k8s.io/client-go/kubernetes" v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + policyv1beta1 "k8s.io/client-go/kubernetes/typed/policy/v1beta1" "k8s.io/client-go/pkg/api" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -27,7 +28,9 @@ type KubernetesClient struct { v1core.PersistentVolumesGetter v1core.PersistentVolumeClaimsGetter v1core.ConfigMapsGetter + v1core.NodesGetter v1beta1.StatefulSetsGetter + policyv1beta1.PodDisruptionBudgetsGetter apiextbeta1.CustomResourceDefinitionsGetter RESTClient rest.Interface @@ -69,7 +72,9 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { kubeClient.ConfigMapsGetter = client.CoreV1() kubeClient.PersistentVolumeClaimsGetter = client.CoreV1() kubeClient.PersistentVolumesGetter = client.CoreV1() + kubeClient.NodesGetter = client.CoreV1() kubeClient.StatefulSetsGetter = client.AppsV1beta1() + kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1() kubeClient.RESTClient = client.CoreV1().RESTClient() cfg2 := *cfg
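Operationally, a node is marked for decommissioning by cordoning it and applying the configured eol_node_label (with the defaults above: kubectl cordon <node> followed by kubectl label node <node> eol=true); the node informer's update callback then fails masters over to replicas on live nodes and recreates replicas elsewhere. The early-return guard at the top of nodeUpdate is easy to misread because of Go operator precedence; it lets the migration run only on the transition where a node becomes both unschedulable and EOL-labelled. An equivalent, more explicit form of that predicate (illustrative sketch, same vendored packages as node.go above):

    // nodeBecameEOL restates nodeUpdate's early-return condition in
    // positive form: proceed only when the node was not end-of-life
    // before the update and is end-of-life after it.
    func nodeBecameEOL(prev, cur *v1.Node, eolLabel map[string]string) bool {
        wasEOL := prev.Spec.Unschedulable && util.MapContains(prev.Labels, eolLabel)
        isEOL := cur.Spec.Unschedulable && util.MapContains(cur.Labels, eolLabel)
        return !wasEOL && isEOL
    }
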