Kube cluster upgrade
parent 1dbf259c76
commit eba23279c8
@@ -1,8 +1,8 @@
-hash: 42ffa063321a691ec1de30532989e66e81fb7a080d6d4867bbb2c9d7f2a008ce
-updated: 2017-10-06T15:06:00.742579+02:00
+hash: bb2d336f3efb57376916e47b585ad55611b5023b79044ced59c762ea35427f19
+updated: 2017-10-10T10:40:56.894070487+02:00
 imports:
 - name: github.com/aws/aws-sdk-go
-  version: da415b5fa0ff3f91d4707348a8ea1be53f700c22
+  version: 760741802ad40f49ae9fc4a69ef6706d2527d62e
   subpackages:
   - aws
   - aws/awserr

@@ -17,10 +17,8 @@ import:
   version: release-1.7
   subpackages:
   - pkg/api/errors
-  - pkg/api/meta
-  - pkg/api/resource
   - pkg/apis/meta/v1
   - pkg/fields
   - pkg/labels
   - pkg/runtime
   - pkg/runtime/schema
@@ -33,6 +31,10 @@ import:
   version: ^4.0.0
   subpackages:
   - kubernetes
+  - kubernetes/scheme
+  - kubernetes/typed/apps/v1beta1
+  - kubernetes/typed/core/v1
+  - kubernetes/typed/extensions/v1beta1
   - pkg/api
   - pkg/api/v1
   - pkg/apis/apps/v1beta1

@@ -12,7 +12,7 @@ data:
   dns_name_format: '{cluster}.{team}.staging.{hostedzone}'
   docker_image: registry.opensource.zalan.do/acid/spiloprivate-9.6:1.2-p4
   etcd_host: etcd-client.default.svc.cluster.local:2379
-  secret_name_template: '{username}.{clustername}.credentials.{tprkind}.{tprgroup}'
+  secret_name_template: '{username}.{cluster}.credentials.{tprkind}.{tprgroup}'
   infrastructure_roles_secret_name: postgresql-infrastructure-roles
   oauth_token_secret_name: postgresql-operator
   pam_configuration: |
@@ -37,3 +37,5 @@ data:
   ring_log_lines: "100"
   cluster_history_entries: "1000"
   pod_terminate_grace_period: 5m
+  pdb_name_format: "postgres-{cluster}-pdb"
+  eol_node_label: "eol:true"

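Note: the two new ConfigMap entries correspond to the EOLNodeLabel and PDBNameFormat options added to the operator config further down in this commit. A minimal sketch of the assumed semantics; parseLabel and formatName are hypothetical helpers for illustration, not the operator's actual config parser:

package main

import (
	"fmt"
	"strings"
)

// parseLabel illustrates the assumed decoding of eol_node_label:
// "eol:true" becomes map[string]string{"eol": "true"}.
func parseLabel(s string) map[string]string {
	kv := strings.SplitN(s, ":", 2)
	return map[string]string{kv[0]: kv[1]}
}

// formatName illustrates the assumed {cluster} substitution of pdb_name_format.
func formatName(tpl, clusterName string) string {
	return strings.Replace(tpl, "{cluster}", clusterName, -1)
}

func main() {
	fmt.Println(parseLabel("eol:true"))                                     // map[eol:true]
	fmt.Println(formatName("postgres-{cluster}-pdb", "acid-testcluster17")) // postgres-acid-testcluster17-pdb
}
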
@@ -12,7 +12,7 @@ spec:
       serviceAccountName: operator
       containers:
       - name: postgres-operator
-        image: pierone.example.com/acid/postgres-operator:0.1
+        image: pierone.stups.zalan.do/acid/postgres-operator:workerassgn
         imagePullPolicy: IfNotPresent
         env:
         - name: WATCH_NAMESPACE

@@ -2,7 +2,7 @@ apiVersion: "acid.zalan.do/v1"
 kind: postgresql
 
 metadata:
-  name: acid-testcluster
+  name: acid-testcluster17
 spec:
   teamId: "ACID"
   volume:

@@ -16,6 +16,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
+	policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 
@@ -44,10 +45,11 @@ type Config struct {
 }
 
 type kubeResources struct {
-	Services    map[PostgresRole]*v1.Service
-	Endpoint    *v1.Endpoints
-	Secrets     map[types.UID]*v1.Secret
-	Statefulset *v1beta1.StatefulSet
+	Services            map[PostgresRole]*v1.Service
+	Endpoint            *v1.Endpoints
+	Secrets             map[types.UID]*v1.Secret
+	Statefulset         *v1beta1.StatefulSet
+	PodDisruptionBudget *policybeta1.PodDisruptionBudget
 	//Pods are treated separately
 	//PVCs are treated separately
 }
@@ -259,6 +261,12 @@ func (c *Cluster) Create() error {
 		}
 	}
 
+	pdb, err := c.createPodDisruptionBudget()
+	if err != nil {
+		return fmt.Errorf("could not create pod disruption budget: %v", err)
+	}
+	c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(pdb.ObjectMeta))
+
 	err = c.listResources()
 	if err != nil {
 		c.logger.Errorf("could not list resources: %v", err)
@@ -334,6 +342,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
 		needsRollUpdate = true
 		reasons = append(reasons, "new statefulset's terminationGracePeriodSeconds  doesn't match the current one")
 	}
+	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.Affinity, statefulSet.Spec.Template.Spec.Affinity) {
+		needsReplace = true
+		needsRollUpdate = true
+		reasons = append(reasons, "new statefulset's pod affinity doesn't match the current one")
+	}
+
 	// Some generated fields like creationTimestamp make it not possible to use DeepCompare on Spec.Template.ObjectMeta
 	if !reflect.DeepEqual(c.Statefulset.Spec.Template.Labels, statefulSet.Spec.Template.Labels) {
 		needsReplace = true
@@ -522,6 +536,15 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 		c.logger.Infof("volumes have been updated successfully")
 	}
 
+	newPDB := c.generatePodDisruptionBudget()
+	if match, reason := c.samePDBWith(newPDB); !match {
+		c.logPDBChanges(c.PodDisruptionBudget, newPDB, true, reason)
+		if err := c.updatePodDisruptionBudget(newPDB); err != nil {
+			c.setStatus(spec.ClusterStatusUpdateFailed)
+			return fmt.Errorf("could not update pod disruption budget: %v", err)
+		}
+	}
+
 	c.setStatus(spec.ClusterStatusRunning)
 
 	return nil
@@ -555,6 +578,10 @@ func (c *Cluster) Delete() error {
 		}
 	}
 
+	if err := c.deletePodDisruptionBudget(); err != nil {
+		return fmt.Errorf("could not delete pod disruption budget: %v", err)
+	}
+
 	return nil
 }
 
@@ -690,11 +717,12 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus {
 		Status:  c.Status,
 		Spec:    c.Spec,
 
-		MasterService:  c.GetServiceMaster(),
-		ReplicaService: c.GetServiceReplica(),
-		Endpoint:       c.GetEndpoint(),
-		StatefulSet:    c.GetStatefulSet(),
-		CurrentProcess: c.GetCurrentProcess(),
+		MasterService:       c.GetServiceMaster(),
+		ReplicaService:      c.GetServiceReplica(),
+		Endpoint:            c.GetEndpoint(),
+		StatefulSet:         c.GetStatefulSet(),
+		PodDisruptionBudget: c.GetPodDisruptionBudget(),
+		CurrentProcess:      c.GetCurrentProcess(),
 
 		Error: c.Error,
 	}
@@ -733,3 +761,13 @@ func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedNam
 
 	return nil
 }
+
+// Lock locks the cluster
+func (c *Cluster) Lock() {
+	c.mu.Lock()
+}
+
+// Unlock unlocks the cluster
+func (c *Cluster) Unlock() {
+	c.mu.Unlock()
+}

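Note: Lock and Unlock are exported so the controller's node handler (node.go later in this commit) can freeze affected clusters while it migrates their pods off a decommissioned node. A condensed sketch of that calling pattern; withClustersLocked is a hypothetical helper, not part of this commit:

package controller

import "github.com/zalando-incubator/postgres-operator/pkg/cluster"

// withClustersLocked sketches the pattern used in nodeUpdate: take every
// affected cluster's lock, run the migration, then release all locks.
func withClustersLocked(clusters map[*cluster.Cluster]bool, migrate func()) {
	for cl := range clusters {
		cl.Lock() // block concurrent Sync/Update while pods are being moved
	}
	defer func() {
		for cl := range clusters {
			cl.Unlock()
		}
	}()
	migrate()
}
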
@@ -10,6 +10,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
+	policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
 
 	"github.com/zalando-incubator/postgres-operator/pkg/spec"
 	"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
@@ -67,6 +68,10 @@ func (c *Cluster) serviceName(role PostgresRole) string {
 	return name
 }
 
+func (c *Cluster) podDisruptionBudgetName() string {
+	return c.OpConfig.PDBNameFormat.Format("cluster", c.Spec.ClusterName)
+}
+
 func (c *Cluster) resourceRequirements(resources spec.Resources) (*v1.ResourceRequirements, error) {
 	var err error
 
@@ -226,6 +231,25 @@ PATRONI_INITDB_PARAMS:
 	return string(result)
 }
 
+func (c *Cluster) nodeAffinity() *v1.Affinity {
+	matchExpressions := make([]v1.NodeSelectorRequirement, 0)
+	for k, v := range c.OpConfig.EOLNodeLabel {
+		matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{
+			Key:      k,
+			Operator: v1.NodeSelectorOpNotIn,
+			Values:   []string{v},
+		})
+	}
+
+	return &v1.Affinity{
+		NodeAffinity: &v1.NodeAffinity{
+			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+				NodeSelectorTerms: []v1.NodeSelectorTerm{{MatchExpressions: matchExpressions}},
+			},
+		},
+	}
+}
+
 func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequirements,
 	pgParameters *spec.PostgresqlParam,
 	patroniParameters *spec.Patroni,
@@ -347,6 +371,7 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme
 		ServiceAccountName:            c.OpConfig.ServiceAccountName,
 		TerminationGracePeriodSeconds: &terminateGracePeriodSeconds,
 		Containers:                    []v1.Container{container},
+		Affinity:                      c.nodeAffinity(),
 	}
 
 	template := v1.PodTemplateSpec{
@@ -573,6 +598,25 @@ func (c *Cluster) generateCloneEnvironment(description *spec.CloneDescription) [
 	return result
 }
 
+func (c *Cluster) generatePodDisruptionBudget() *policybeta1.PodDisruptionBudget {
+	minAvailable := intstr.FromInt(1)
+	matchLabels := c.OpConfig.ClusterLabels
+	matchLabels[c.OpConfig.ClusterNameLabel] = c.Name
+
+	return &policybeta1.PodDisruptionBudget{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      c.podDisruptionBudgetName(),
+			Namespace: c.Namespace,
+		},
+		Spec: policybeta1.PodDisruptionBudgetSpec{
+			MinAvailable: &minAvailable,
+			Selector: &metav1.LabelSelector{
+				MatchLabels: matchLabels,
+			},
+		},
+	}
+}
+
 // getClusterServiceConnectionParameters fetches cluster host name and port
 // TODO: perhaps we need to query the service (i.e. if non-standard port is used?)
 // TODO: handle clusters in different namespaces

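Note: for the example manifest above (name: acid-testcluster17) and the default operator options, generatePodDisruptionBudget should produce an object equivalent to the sketch below. The namespace and label values are assumptions taken from the defaults added in config.go further down, not values pinned by this diff:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
	policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
)

func main() {
	// MinAvailable of 1 keeps at least one pod (in practice the master)
	// safe from voluntary evictions such as node drains.
	minAvailable := intstr.FromInt(1)

	pdb := policybeta1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "postgres-acid-testcluster17-pdb", // pdb_name_format with {cluster} substituted
			Namespace: "default",                         // assumption: cluster runs in "default"
		},
		Spec: policybeta1.PodDisruptionBudgetSpec{
			MinAvailable: &minAvailable,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"application":  "spilo",              // default cluster_labels
					"cluster-name": "acid-testcluster17", // default cluster_name_label
				},
			},
		},
	}
	fmt.Printf("%+v\n", pdb)
}
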
@@ -2,6 +2,7 @@ package cluster
 
 import (
 	"fmt"
+	"math/rand"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/pkg/api/v1"
@@ -71,7 +72,7 @@ func (c *Cluster) deletePods() error {
 }
 
 func (c *Cluster) deletePod(podName spec.NamespacedName) error {
-	c.setProcessName("deleting %q pod", podName)
+	c.setProcessName("deleting pod %q", podName)
 	ch := c.registerPodSubscriber(podName)
 	defer c.unregisterPodSubscriber(podName)
 
@@ -87,6 +88,7 @@ func (c *Cluster) deletePod(podName spec.NamespacedName) error {
 }
 
 func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
+	c.logger.Debugf("unsubscribing from pod %q events", podName)
 	c.podSubscribersMu.Lock()
 	defer c.podSubscribersMu.Unlock()
 
@@ -99,6 +101,7 @@ func (c *Cluster) unregisterPodSubscriber(podName spec.NamespacedName) {
 }
 
 func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan spec.PodEvent {
+	c.logger.Debugf("subscribing to pod %q", podName)
 	c.podSubscribersMu.Lock()
 	defer c.podSubscribersMu.Unlock()
 
@@ -111,9 +114,135 @@ func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan spec.P
 	return ch
 }
 
-func (c *Cluster) recreatePod(podName spec.NamespacedName) error {
-	c.setProcessName("recreating %q pod", podName)
+func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) {
+	podName := util.NameFromMeta(pod.ObjectMeta)
+
+	if eol, err := c.podIsEndOfLife(pod); err != nil {
+		return nil, fmt.Errorf("could not get node %q: %v", pod.Spec.NodeName, err)
+	} else if !eol {
+		c.logger.Infof("pod %q is already on a live node", podName)
+		return pod, nil
+	}
+
+	c.setProcessName("moving pod %q out of end-of-life node %q", podName, pod.Spec.NodeName)
+	c.logger.Infof("moving pod %q out of the end-of-life node %q", podName, pod.Spec.NodeName)
+
+	if err := c.recreatePod(podName); err != nil {
+		return nil, fmt.Errorf("could not move pod: %v", err)
+	}
+
+	newPod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("could not get pod: %v", err)
+	}
+
+	if newPod.Spec.NodeName == pod.Spec.NodeName {
+		return nil, fmt.Errorf("pod %q remained on the same node", podName)
+	}
+
+	if eol, err := c.podIsEndOfLife(newPod); err != nil {
+		return nil, fmt.Errorf("could not get node %q: %v", newPod.Spec.NodeName, err)
+	} else if eol {
+		c.logger.Warningf("pod %q moved to end-of-life node %q", podName, newPod.Spec.NodeName)
+		return newPod, nil
+	}
+
+	c.logger.Infof("pod %q moved from node %q to node %q", podName, pod.Spec.NodeName, newPod.Spec.NodeName)
+
+	return newPod, nil
+}
+
+func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) {
+	replicas, err := c.getRolePods(Replica)
+	if err != nil {
+		return nil, fmt.Errorf("could not get replica pods: %v", err)
+	}
+
+	for i, pod := range replicas {
+		// look for replicas running on live nodes. Ignore errors when querying the nodes.
+		if pod.Spec.NodeName != oldNodeName {
+			eol, err := c.podIsEndOfLife(&pod)
+			if err == nil && !eol {
+				return &replicas[i], nil
+			}
+		}
+	}
+	c.logger.Debug("no available master candidates on live nodes")
+	return &replicas[rand.Intn(len(replicas))], nil
+}
+
+// MigrateMasterPod migrates master pod via failover to a replica
+func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
+	oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
+
+	if err != nil {
+		return fmt.Errorf("could not get pod: %v", err)
+	}
+
+	c.logger.Infof("migrating master pod %q", podName)
+
+	if eol, err := c.podIsEndOfLife(oldMaster); err != nil {
+		return fmt.Errorf("could not get node %q: %v", oldMaster.Spec.NodeName, err)
+	} else if !eol {
+		c.logger.Debugf("pod is already on a live node")
+		return nil
+	}
+
+	if role := PostgresRole(oldMaster.Labels[c.OpConfig.PodRoleLabel]); role != Master {
+		c.logger.Warningf("pod %q is not a master", podName)
+		return nil
+	}
+
+	masterCandidatePod, err := c.masterCandidate(oldMaster.Spec.NodeName)
+	if err != nil {
+		return fmt.Errorf("could not get new master candidate: %v", err)
+	}
+
+	pod, err := c.movePodFromEndOfLifeNode(masterCandidatePod)
+	if err != nil {
+		return fmt.Errorf("could not move pod: %v", err)
+	}
+
+	masterCandidateName := util.NameFromMeta(pod.ObjectMeta)
+	if err := c.ManualFailover(oldMaster, masterCandidateName); err != nil {
+		return fmt.Errorf("could not failover to pod %q: %v", masterCandidateName, err)
+	}
+
+	_, err = c.movePodFromEndOfLifeNode(oldMaster)
+	if err != nil {
+		return fmt.Errorf("could not move pod: %v", err)
+	}
+
+	return nil
+}
+
+// MigrateReplicaPod recreates pod on a new node
+func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName string) error {
+	replicaPod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("could not get pod: %v", err)
+	}
+
+	c.logger.Infof("migrating replica pod %q", podName)
+
+	if replicaPod.Spec.NodeName != fromNodeName {
+		c.logger.Infof("pod %q has already migrated to node %q", podName, replicaPod.Spec.NodeName)
+		return nil
+	}
+
+	if role := PostgresRole(replicaPod.Labels[c.OpConfig.PodRoleLabel]); role != Replica {
+		return fmt.Errorf("pod %q is not a replica", podName)
+	}
+
+	_, err = c.movePodFromEndOfLifeNode(replicaPod)
+	if err != nil {
+		return fmt.Errorf("could not move pod: %v", err)
+	}
+
+	return nil
+}
+
+func (c *Cluster) recreatePod(podName spec.NamespacedName) error {
 	ch := c.registerPodSubscriber(podName)
 	defer c.unregisterPodSubscriber(podName)
 
@@ -127,7 +256,7 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) error {
 	if err := c.waitForPodLabel(ch, nil); err != nil {
 		return err
 	}
-	c.logger.Infof("pod %q is ready", podName)
+	c.logger.Infof("pod %q has been recreated", podName)
 
 	return nil
 }
@@ -184,3 +313,12 @@ func (c *Cluster) recreatePods() error {
 
 	return nil
 }
+
+func (c *Cluster) podIsEndOfLife(pod *v1.Pod) (bool, error) {
+	node, err := c.KubeClient.Nodes().Get(pod.Spec.NodeName, metav1.GetOptions{})
+	if err != nil {
+		return false, err
+	}
+	return node.Spec.Unschedulable || util.MapContains(node.Labels, c.OpConfig.EOLNodeLabel), nil
+}

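Note: podIsEndOfLife treats a node as end-of-life when it is unschedulable (cordoned) or carries any of the configured EOL labels. A self-contained sketch of that predicate on plain values, assuming util.MapContains checks that every key/value pair of the second map is present in the first:

package main

import "fmt"

// mapContains mirrors the assumed semantics of util.MapContains: every
// key/value pair of sub must be present in m.
func mapContains(m, sub map[string]string) bool {
	for k, v := range sub {
		if m[k] != v {
			return false
		}
	}
	return true
}

// nodeIsEndOfLife restates the check from podIsEndOfLife above.
func nodeIsEndOfLife(unschedulable bool, nodeLabels, eolLabels map[string]string) bool {
	return unschedulable || mapContains(nodeLabels, eolLabels)
}

func main() {
	eol := map[string]string{"eol": "true"}
	fmt.Println(nodeIsEndOfLife(false, map[string]string{"eol": "true"}, eol)) // true: labelled EOL
	fmt.Println(nodeIsEndOfLife(true, nil, eol))                               // true: cordoned
	fmt.Println(nodeIsEndOfLife(false, nil, eol))                              // false: live node
}
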
@@ -9,6 +9,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
+	policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
 
 	"github.com/zalando-incubator/postgres-operator/pkg/spec"
 	"github.com/zalando-incubator/postgres-operator/pkg/util"
@@ -61,10 +62,21 @@ func (c *Cluster) loadResources() error {
 		c.logger.Errorf("could not get statefulset: %v", err)
 	}
 
+	pdb, err := c.KubeClient.PodDisruptionBudgets(ns).Get(c.podDisruptionBudgetName(), metav1.GetOptions{})
+	if err == nil {
+		c.PodDisruptionBudget = pdb
+	} else if !k8sutil.ResourceNotFound(err) {
+		c.logger.Errorf("could not get pod disruption budget: %v", err)
+	}
+
 	return nil
 }
 
 func (c *Cluster) listResources() error {
+	if c.PodDisruptionBudget != nil {
+		c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID)
+	}
+
 	if c.Statefulset != nil {
 		c.logger.Infof("found statefulset: %q (uid: %q)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID)
 	}
@@ -400,6 +412,54 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
 	return endpoints, nil
 }
 
+func (c *Cluster) createPodDisruptionBudget() (*policybeta1.PodDisruptionBudget, error) {
+	if c.PodDisruptionBudget != nil {
+		return nil, fmt.Errorf("pod disruption budget already exists in the cluster")
+	}
+	podDisruptionBudgetSpec := c.generatePodDisruptionBudget()
+	podDisruptionBudget, err := c.KubeClient.
+		PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace).
+		Create(podDisruptionBudgetSpec)
+
+	if err != nil {
+		return nil, err
+	}
+	c.PodDisruptionBudget = podDisruptionBudget
+
+	return podDisruptionBudget, nil
+}
+
+func (c *Cluster) updatePodDisruptionBudget(pdb *policybeta1.PodDisruptionBudget) error {
+	if c.PodDisruptionBudget == nil {
+		return fmt.Errorf("there is no pod disruption budget in the cluster")
+	}
+
+	newPdb, err := c.KubeClient.PodDisruptionBudgets(pdb.Namespace).Update(pdb)
+	if err != nil {
+		return fmt.Errorf("could not update pod disruption budget: %v", err)
+	}
+	c.PodDisruptionBudget = newPdb
+
+	return nil
+}
+
+func (c *Cluster) deletePodDisruptionBudget() error {
+	c.logger.Debug("deleting pod disruption budget")
+	if c.PodDisruptionBudget == nil {
+		return fmt.Errorf("there is no pod disruption budget in the cluster")
+	}
+	err := c.KubeClient.
+		PodDisruptionBudgets(c.PodDisruptionBudget.Namespace).
+		Delete(c.PodDisruptionBudget.Name, c.deleteOptions)
+	if err != nil {
+		return fmt.Errorf("could not delete pod disruption budget: %v", err)
+	}
+	c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta))
+	c.PodDisruptionBudget = nil
+
+	return nil
+}
+
 func (c *Cluster) deleteEndpoint() error {
 	c.setProcessName("deleting endpoint")
 	c.logger.Debugln("deleting endpoint")
@@ -408,7 +468,7 @@ func (c *Cluster) deleteEndpoint() error {
 	}
 	err := c.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, c.deleteOptions)
 	if err != nil {
-		return err
+		return fmt.Errorf("could not delete endpoint: %v", err)
 	}
 	c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta))
 	c.Endpoint = nil
@@ -492,3 +552,8 @@ func (c *Cluster) GetEndpoint() *v1.Endpoints {
 func (c *Cluster) GetStatefulSet() *v1beta1.StatefulSet {
 	return c.Statefulset
 }
+
+// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget
+func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget {
+	return c.PodDisruptionBudget
+}

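Note: loadResources deliberately treats a missing budget as a non-error, since syncPodDisruptionBudget (sync.go below) recreates it; only other failures are logged. A sketch of what k8sutil.ResourceNotFound is assumed to do, wrapping client-go's apierrors helper:

package k8sutil

import apierrors "k8s.io/apimachinery/pkg/api/errors"

// ResourceNotFound is assumed to report whether err is a Kubernetes
// "not found" API status error.
func ResourceNotFound(err error) bool {
	return apierrors.IsNotFound(err)
}
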
@@ -2,6 +2,9 @@ package cluster
 
 import (
 	"fmt"
+	"reflect"
+
+	policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
 
 	"github.com/zalando-incubator/postgres-operator/pkg/spec"
 	"github.com/zalando-incubator/postgres-operator/pkg/util"
@@ -95,6 +98,12 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
 		return
 	}
 
+	c.logger.Debug("syncing pod disruption budgets")
+	if err = c.syncPodDisruptionBudget(); err != nil {
+		err = fmt.Errorf("could not sync pod disruption budget: %v", err)
+		return
+	}
+
 	return
 }
 
@@ -140,6 +149,20 @@ func (c *Cluster) syncEndpoint() error {
 	return nil
 }
 
+func (c *Cluster) syncPodDisruptionBudget() error {
+	if c.PodDisruptionBudget == nil {
+		c.logger.Infof("could not find the cluster's pod disruption budget")
+		pdb, err := c.createPodDisruptionBudget()
+		if err != nil {
+			return fmt.Errorf("could not create pod disruption budget: %v", err)
+		}
+		c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta))
+		return nil
+	}
+
+	return nil
+}
+
 func (c *Cluster) syncStatefulSet() error {
 	cSpec := c.Spec
 	var rollUpdate bool
@@ -248,6 +271,17 @@ func (c *Cluster) syncVolumes() error {
 	if err := c.resizeVolumes(c.Spec.Volume, []volumes.VolumeResizer{&volumes.EBSVolumeResizer{}}); err != nil {
 		return fmt.Errorf("could not sync volumes: %v", err)
 	}
+
+	c.logger.Infof("volumes have been synced successfully")
+
 	return nil
 }
+
+func (c *Cluster) samePDBWith(pdb *policybeta1.PodDisruptionBudget) (match bool, reason string) {
+	match = reflect.DeepEqual(pdb.Spec, c.PodDisruptionBudget.Spec)
+	if !match {
+		reason = "new PDB's spec doesn't match the current one"
+	}
+
+	return
+}

@@ -11,6 +11,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
+	policybeta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
 
 	"github.com/zalando-incubator/postgres-operator/pkg/spec"
 	"github.com/zalando-incubator/postgres-operator/pkg/util"
@@ -97,11 +98,21 @@ func metadataAnnotationsPatch(annotations map[string]string) string {
 	return fmt.Sprintf(constants.ServiceMetadataAnnotationReplaceFormat, annotationsString)
 }
 
-func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) {
+func (c *Cluster) logPDBChanges(old, new *policybeta1.PodDisruptionBudget, isUpdate bool, reason string) {
 	if isUpdate {
-		c.logger.Infof("statefulset %q has been changed",
+		c.logger.Infof("pod disruption budget %q has been changed", util.NameFromMeta(old.ObjectMeta))
+	} else {
+		c.logger.Infof("pod disruption budget %q is not in the desired state and needs to be updated",
+			util.NameFromMeta(old.ObjectMeta),
+		)
+	}
+
+	c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec))
+}
+
+func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) {
+	if isUpdate {
+		c.logger.Infof("statefulset %q has been changed", util.NameFromMeta(old.ObjectMeta))
 	} else {
 		c.logger.Infof("statefulset %q is not in the desired state and needs to be updated",
 			util.NameFromMeta(old.ObjectMeta),
@@ -340,15 +351,11 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st
 
 	return c.OpConfig.SecretNameTemplate.Format(
 		"username", strings.Replace(username, "_", "-", -1),
-		"clustername", clusterName,
+		"cluster", clusterName,
 		"tprkind", constants.CRDKind,
 		"tprgroup", constants.CRDGroup)
 }
 
-func (c *Cluster) podSpiloRole(pod *v1.Pod) PostgresRole {
-	return PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
-}
-
 func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName {
 	return replicas[rand.Intn(len(replicas))]
 }

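Note: with the placeholder renamed from {clustername} to {cluster}, the template stays in sync with the ConfigMap change above. A sketch of the assumed stringTemplate.Format behaviour; format is a hypothetical stand-in, and the tprkind/tprgroup values are taken from the example manifest's kind and apiVersion:

package main

import (
	"fmt"
	"strings"
)

// format sketches the assumed stringTemplate.Format semantics: each "{key}"
// placeholder is replaced by the value following that key in the arguments.
func format(tpl string, kv ...string) string {
	for i := 0; i+1 < len(kv); i += 2 {
		tpl = strings.Replace(tpl, "{"+kv[i]+"}", kv[i+1], -1)
	}
	return tpl
}

func main() {
	fmt.Println(format("{username}.{cluster}.credentials.{tprkind}.{tprgroup}",
		"username", "zalando",
		"cluster", "acid-testcluster17",
		"tprkind", "postgresql",
		"tprgroup", "acid.zalan.do"))
	// Output: zalando.acid-testcluster17.credentials.postgresql.acid.zalan.do
}
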
@@ -41,6 +41,7 @@ type Controller struct {
 
 	postgresqlInformer cache.SharedIndexInformer
 	podInformer        cache.SharedIndexInformer
+	nodesInformer      cache.SharedIndexInformer
 	podCh              chan spec.PodEvent
 
 	clusterEventQueues  []*cache.FIFO // [workerID]Queue
@@ -111,6 +112,7 @@ func (c *Controller) initOperatorConfig() {
 func (c *Controller) initController() {
 	c.initClients()
 	c.initOperatorConfig()
+	c.initSharedInformers()
 
 	c.logger.Infof("config: %s", c.opConfig.MustMarshal())
 
@@ -128,6 +130,23 @@ func (c *Controller) initController() {
 		c.config.InfrastructureRoles = infraRoles
 	}
 
+	c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers)
+	c.workerLogs = make(map[uint32]ringlog.RingLogger, c.opConfig.Workers)
+	for i := range c.clusterEventQueues {
+		c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) {
+			e, ok := obj.(spec.ClusterEvent)
+			if !ok {
+				return "", fmt.Errorf("could not cast to ClusterEvent")
+			}
+
+			return queueClusterKey(e.EventType, e.UID), nil
+		})
+	}
+
+	c.apiserver = apiserver.New(c, c.opConfig.APIPort, c.logger.Logger)
+}
+
+func (c *Controller) initSharedInformers() {
 	// Postgresqls
 	c.postgresqlInformer = cache.NewSharedIndexInformer(
 		&cache.ListWatch{
@@ -162,31 +181,35 @@ func (c *Controller) initController() {
 		DeleteFunc: c.podDelete,
 	})
 
-	c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers)
-	c.workerLogs = make(map[uint32]ringlog.RingLogger, c.opConfig.Workers)
-	for i := range c.clusterEventQueues {
-		c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) {
-			e, ok := obj.(spec.ClusterEvent)
-			if !ok {
-				return "", fmt.Errorf("could not cast to ClusterEvent")
-			}
-
-			return queueClusterKey(e.EventType, e.UID), nil
-		})
+	// Kubernetes Nodes
+	nodeLw := &cache.ListWatch{
+		ListFunc:  c.nodeListFunc,
+		WatchFunc: c.nodeWatchFunc,
 	}
 
-	c.apiserver = apiserver.New(c, c.opConfig.APIPort, c.logger.Logger)
+	c.nodesInformer = cache.NewSharedIndexInformer(
+		nodeLw,
+		&v1.Node{},
+		constants.QueueResyncPeriodNode,
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+
+	c.nodesInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    c.nodeAdd,
+		UpdateFunc: c.nodeUpdate,
+		DeleteFunc: c.nodeDelete,
+	})
 }
 
 // Run starts background controller processes
 func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
 	c.initController()
 
-	wg.Add(4)
+	wg.Add(5)
 	go c.runPodInformer(stopCh, wg)
 	go c.runPostgresqlInformer(stopCh, wg)
 	go c.clusterResync(stopCh, wg)
 	go c.apiserver.Run(stopCh, wg)
+	go c.kubeNodesInformer(stopCh, wg)
 
 	for i := range c.clusterEventQueues {
 		wg.Add(1)
@@ -212,3 +235,9 @@ func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.Wait
 func queueClusterKey(eventType spec.EventType, uid types.UID) string {
 	return fmt.Sprintf("%s-%s", eventType, uid)
 }
+
+func (c *Controller) kubeNodesInformer(stopCh <-chan struct{}, wg *sync.WaitGroup) {
+	defer wg.Done()
+
+	c.nodesInformer.Run(stopCh)
+}

@@ -0,0 +1,162 @@
+package controller
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/pkg/api/v1"
+
+	"github.com/zalando-incubator/postgres-operator/pkg/cluster"
+	"github.com/zalando-incubator/postgres-operator/pkg/util"
+)
+
+func (c *Controller) nodeListFunc(options metav1.ListOptions) (runtime.Object, error) {
+	opts := metav1.ListOptions{
+		Watch:           options.Watch,
+		ResourceVersion: options.ResourceVersion,
+		TimeoutSeconds:  options.TimeoutSeconds,
+	}
+
+	return c.KubeClient.Nodes().List(opts)
+}
+
+func (c *Controller) nodeWatchFunc(options metav1.ListOptions) (watch.Interface, error) {
+	opts := metav1.ListOptions{
+		Watch:           options.Watch,
+		ResourceVersion: options.ResourceVersion,
+		TimeoutSeconds:  options.TimeoutSeconds,
+	}
+
+	return c.KubeClient.Nodes().Watch(opts)
+}
+
+func (c *Controller) nodeAdd(obj interface{}) {
+	node, ok := obj.(*v1.Node)
+	if !ok {
+		return
+	}
+
+	c.logger.Debugf("new node has been added: %q (%s)", util.NameFromMeta(node.ObjectMeta), node.Spec.ProviderID)
+}
+
+func (c *Controller) nodeUpdate(prev, cur interface{}) {
+	nodePrev, ok := prev.(*v1.Node)
+	if !ok {
+		return
+	}
+
+	nodeCur, ok := cur.(*v1.Node)
+	if !ok {
+		return
+	}
+
+	if util.MapContains(nodeCur.Labels, map[string]string{"master": "true"}) {
+		return
+	}
+
+	if nodePrev.Spec.Unschedulable && util.MapContains(nodePrev.Labels, c.opConfig.EOLNodeLabel) ||
+		!nodeCur.Spec.Unschedulable || !util.MapContains(nodeCur.Labels, c.opConfig.EOLNodeLabel) {
+		return
+	}
+
+	c.logger.Infof("node %q became unschedulable and has EOL labels: %q", util.NameFromMeta(nodeCur.ObjectMeta),
+		c.opConfig.EOLNodeLabel)
+
+	opts := metav1.ListOptions{
+		LabelSelector: labels.Set(c.opConfig.ClusterLabels).String(),
+	}
+	podList, err := c.KubeClient.Pods(c.opConfig.Namespace).List(opts)
+	if err != nil {
+		c.logger.Errorf("could not fetch list of the pods: %v", err)
+		return
+	}
+
+	nodePods := make([]*v1.Pod, 0)
+	for i, pod := range podList.Items {
+		if pod.Spec.NodeName == nodeCur.Name {
+			nodePods = append(nodePods, &podList.Items[i])
+		}
+	}
+
+	clusters := make(map[*cluster.Cluster]bool)
+	masterPods := make(map[*v1.Pod]*cluster.Cluster)
+	replicaPods := make(map[*v1.Pod]*cluster.Cluster)
+	movedPods := 0
+	for _, pod := range nodePods {
+		podName := util.NameFromMeta(pod.ObjectMeta)
+
+		role, ok := pod.Labels[c.opConfig.PodRoleLabel]
+		if !ok {
+			c.logger.Warningf("could not move pod %q: pod has no role", podName)
+			continue
+		}
+
+		clusterName := c.podClusterName(pod)
+
+		c.clustersMu.RLock()
+		cl, ok := c.clusters[clusterName]
+		c.clustersMu.RUnlock()
+		if !ok {
+			c.logger.Warningf("could not move pod %q: pod does not belong to a known cluster", podName)
+			continue
+		}
+
+		movedPods++
+
+		if !clusters[cl] {
+			clusters[cl] = true
+		}
+
+		if cluster.PostgresRole(role) == cluster.Master {
+			masterPods[pod] = cl
+		} else {
+			replicaPods[pod] = cl
+		}
+	}
+
+	for cl := range clusters {
+		cl.Lock()
+	}
+
+	for pod, cl := range masterPods {
+		podName := util.NameFromMeta(pod.ObjectMeta)
+
+		if err := cl.MigrateMasterPod(podName); err != nil {
+			c.logger.Errorf("could not move master pod %q: %v", podName, err)
+			movedPods--
+		}
+	}
+
+	for pod, cl := range replicaPods {
+		podName := util.NameFromMeta(pod.ObjectMeta)
+
+		if err := cl.MigrateReplicaPod(podName, nodeCur.Name); err != nil {
+			c.logger.Errorf("could not move replica pod %q: %v", podName, err)
+			movedPods--
+		}
+	}
+
+	for cl := range clusters {
+		cl.Unlock()
+	}
+
+	totalPods := len(nodePods)
+
+	c.logger.Infof("%d/%d pods have been moved out from the %q node",
+		movedPods, totalPods, util.NameFromMeta(nodeCur.ObjectMeta))
+
+	if leftPods := totalPods - movedPods; leftPods > 0 {
+		c.logger.Warnf("could not move %d/%d pods from the %q node",
+			leftPods, totalPods, util.NameFromMeta(nodeCur.ObjectMeta))
+	}
+}
+
+func (c *Controller) nodeDelete(obj interface{}) {
+	node, ok := obj.(*v1.Node)
+	if !ok {
+		return
+	}
+
+	c.logger.Debugf("node has been deleted: %q (%s)", util.NameFromMeta(node.ObjectMeta), node.Spec.ProviderID)
+}
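Note: the early returns at the top of nodeUpdate mean migration only starts on the transition into the end-of-life state: the node must not already have been EOL, and must now be both unschedulable and carry the EOL labels. The same guard, factored into a hypothetical helper for readability:

package controller

import (
	"k8s.io/client-go/pkg/api/v1"

	"github.com/zalando-incubator/postgres-operator/pkg/util"
)

// becameEndOfLife restates the nodeUpdate guard: true only when the node
// newly entered the end-of-life state between the two observed versions.
func becameEndOfLife(prev, cur *v1.Node, eolLabels map[string]string) bool {
	wasEOL := prev.Spec.Unschedulable && util.MapContains(prev.Labels, eolLabels)
	isEOL := cur.Spec.Unschedulable && util.MapContains(cur.Labels, eolLabels)
	return !wasEOL && isEOL
}
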
@@ -11,12 +11,7 @@ import (
 )
 
 func (c *Controller) podListFunc(options metav1.ListOptions) (runtime.Object, error) {
-	var labelSelector string
-	var fieldSelector string
-
 	opts := metav1.ListOptions{
-		LabelSelector:   labelSelector,
-		FieldSelector:   fieldSelector,
 		Watch:           options.Watch,
 		ResourceVersion: options.ResourceVersion,
 		TimeoutSeconds:  options.TimeoutSeconds,
@@ -26,12 +21,7 @@ func (c *Controller) podListFunc(options metav1.ListOptions) (runtime.Object, er
 }
 
 func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface, error) {
-	var labelSelector string
-	var fieldSelector string
-
 	opts := metav1.ListOptions{
-		LabelSelector:   labelSelector,
-		FieldSelector:   fieldSelector,
 		Watch:           options.Watch,
 		ResourceVersion: options.ResourceVersion,
 		TimeoutSeconds:  options.TimeoutSeconds,

@@ -189,7 +189,7 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
 		lg.Infoln("update of the cluster started")
 
 		if !clusterFound {
-			lg.Warnln("cluster does not exist")
+			lg.Warningln("cluster does not exist")
 			return
 		}
 		c.curWorkerCluster.Store(event.WorkerID, cl)

@@ -10,6 +10,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
+	policyv1beta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
 	"k8s.io/client-go/rest"
 )
 
@@ -94,12 +95,13 @@ type Process struct {
 
 // ClusterStatus describes status of the cluster
 type ClusterStatus struct {
-	Team           string
-	Cluster        string
-	MasterService  *v1.Service
-	ReplicaService *v1.Service
-	Endpoint       *v1.Endpoints
-	StatefulSet    *v1beta1.StatefulSet
+	Team                string
+	Cluster             string
+	MasterService       *v1.Service
+	ReplicaService      *v1.Service
+	Endpoint            *v1.Endpoints
+	StatefulSet         *v1beta1.StatefulSet
+	PodDisruptionBudget *policyv1beta1.PodDisruptionBudget
 
 	CurrentProcess Process
 	Worker         uint32

@@ -17,22 +17,24 @@ type CRD struct {
 
 // Resources describes kubernetes resource specific configuration parameters
 type Resources struct {
-	ResourceCheckInterval  time.Duration     `name:"resource_check_interval" default:"3s"`
-	ResourceCheckTimeout   time.Duration     `name:"resource_check_timeout" default:"10m"`
-	PodLabelWaitTimeout    time.Duration     `name:"pod_label_wait_timeout" default:"10m"`
-	PodDeletionWaitTimeout time.Duration     `name:"pod_deletion_wait_timeout" default:"10m"`
-	ClusterLabels          map[string]string `name:"cluster_labels" default:"application:spilo"`
-	ClusterNameLabel       string            `name:"cluster_name_label" default:"cluster-name"`
-	PodRoleLabel           string            `name:"pod_role_label" default:"spilo-role"`
-	DefaultCPURequest      string            `name:"default_cpu_request" default:"100m"`
-	DefaultMemoryRequest   string            `name:"default_memory_request" default:"100Mi"`
-	DefaultCPULimit        string            `name:"default_cpu_limit" default:"3"`
-	DefaultMemoryLimit     string            `name:"default_memory_limit" default:"1Gi"`
+	ResourceCheckInterval   time.Duration     `name:"resource_check_interval" default:"3s"`
+	ResourceCheckTimeout    time.Duration     `name:"resource_check_timeout" default:"10m"`
+	PodLabelWaitTimeout     time.Duration     `name:"pod_label_wait_timeout" default:"10m"`
+	PodDeletionWaitTimeout  time.Duration     `name:"pod_deletion_wait_timeout" default:"10m"`
+	PodTerminateGracePeriod time.Duration     `name:"pod_terminate_grace_period" default:"5m"`
+	ClusterLabels           map[string]string `name:"cluster_labels" default:"application:spilo"`
+	ClusterNameLabel        string            `name:"cluster_name_label" default:"cluster-name"`
+	PodRoleLabel            string            `name:"pod_role_label" default:"spilo-role"`
+	DefaultCPURequest       string            `name:"default_cpu_request" default:"100m"`
+	DefaultMemoryRequest    string            `name:"default_memory_request" default:"100Mi"`
+	DefaultCPULimit         string            `name:"default_cpu_limit" default:"3"`
+	DefaultMemoryLimit      string            `name:"default_memory_limit" default:"1Gi"`
+	EOLNodeLabel            map[string]string `name:"eol_node_label" default:"eol:true"`
 }
 
 // Auth describes authentication specific configuration parameters
 type Auth struct {
-	SecretNameTemplate            stringTemplate      `name:"secret_name_template" default:"{username}.{clustername}.credentials.{tprkind}.{tprgroup}"`
+	SecretNameTemplate            stringTemplate      `name:"secret_name_template" default:"{username}.{cluster}.credentials.{tprkind}.{tprgroup}"`
 	PamRoleName                   string              `name:"pam_role_name" default:"zalandos"`
 	PamConfiguration              string              `name:"pam_configuration" default:"https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"`
 	TeamsAPIUrl                   string              `name:"teams_api_url" default:"https://teams.example.com/api/"`
@@ -63,6 +65,7 @@ type Config struct {
 	EnableLoadBalancer    bool           `name:"enable_load_balancer" default:"true"`
 	MasterDNSNameFormat   stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"`
 	ReplicaDNSNameFormat  stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"`
+	PDBNameFormat         stringTemplate `name:"pdb_name_format" default:"postgres-{cluster}-pdb"`
 	Workers               uint32         `name:"workers" default:"4"`
 	APIPort               int            `name:"api_port" default:"8080"`
 	RingLogLines          int            `name:"ring_log_lines" default:"100"`

@@ -8,6 +8,7 @@ const (
 	StatefulsetDeletionInterval = 1 * time.Second
 	StatefulsetDeletionTimeout  = 30 * time.Second
 
-	QueueResyncPeriodPod = 5 * time.Minute
-	QueueResyncPeriodTPR = 5 * time.Minute
+	QueueResyncPeriodPod  = 5 * time.Minute
+	QueueResyncPeriodTPR  = 5 * time.Minute
+	QueueResyncPeriodNode = 5 * time.Minute
 )

@@ -11,6 +11,7 @@ import (
 	"k8s.io/client-go/kubernetes"
 	v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+	policyv1beta1 "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
 	"k8s.io/client-go/pkg/api"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
@@ -27,7 +28,9 @@ type KubernetesClient struct {
 	v1core.PersistentVolumesGetter
 	v1core.PersistentVolumeClaimsGetter
 	v1core.ConfigMapsGetter
+	v1core.NodesGetter
 	v1beta1.StatefulSetsGetter
+	policyv1beta1.PodDisruptionBudgetsGetter
 	apiextbeta1.CustomResourceDefinitionsGetter
 
 	RESTClient rest.Interface
@@ -69,7 +72,9 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
 	kubeClient.ConfigMapsGetter = client.CoreV1()
 	kubeClient.PersistentVolumeClaimsGetter = client.CoreV1()
 	kubeClient.PersistentVolumesGetter = client.CoreV1()
+	kubeClient.NodesGetter = client.CoreV1()
 	kubeClient.StatefulSetsGetter = client.AppsV1beta1()
+	kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1()
 	kubeClient.RESTClient = client.CoreV1().RESTClient()
 
 	cfg2 := *cfg