Skip syncing Pods
This commit is contained in:
		
							parent
							
								
									655f6dcadb
								
							
						
					
					
						commit
						db53134cbd
					
				|  | @ -115,7 +115,6 @@ bootstrap: | ||||||
| 		ImagePullPolicy: v1.PullAlways, | 		ImagePullPolicy: v1.PullAlways, | ||||||
| 		Resources: v1.ResourceRequirements{ | 		Resources: v1.ResourceRequirements{ | ||||||
| 			Requests: *resourceList, | 			Requests: *resourceList, | ||||||
| 			Limits: v1.ResourceList{}, |  | ||||||
| 		}, | 		}, | ||||||
| 		Ports: []v1.ContainerPort{ | 		Ports: []v1.ContainerPort{ | ||||||
| 			{ | 			{ | ||||||
|  |  | ||||||
|  | @ -190,9 +190,6 @@ func (c *Cluster) recreatePods() error { | ||||||
| 	var masterPod v1.Pod | 	var masterPod v1.Pod | ||||||
| 	for _, pod := range pods.Items { | 	for _, pod := range pods.Items { | ||||||
| 		role := util.PodSpiloRole(&pod) | 		role := util.PodSpiloRole(&pod) | ||||||
| 		if role == "" { |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 
 | 
 | ||||||
| 		if role == constants.PodRoleMaster { | 		if role == constants.PodRoleMaster { | ||||||
| 			masterPod = pod | 			masterPod = pod | ||||||
|  |  | ||||||
|  | @ -68,19 +68,19 @@ func (c *Cluster) LoadResources() error { | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) ListResources() error { | func (c *Cluster) ListResources() error { | ||||||
| 	if c.Statefulset != nil { | 	if c.Statefulset != nil { | ||||||
| 		c.logger.Infof("StatefulSet: %s (uid: %s)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) | 		c.logger.Infof("Found StatefulSet: %s (uid: %s)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, obj := range c.Secrets { | 	for _, obj := range c.Secrets { | ||||||
| 		c.logger.Infof("Secret: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | 		c.logger.Infof("Found Secret: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if c.Endpoint != nil { | 	if c.Endpoint != nil { | ||||||
| 		c.logger.Infof("Endpoint: %s (uid: %s)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) | 		c.logger.Infof("Found Endpoint: %s (uid: %s)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if c.Service != nil { | 	if c.Service != nil { | ||||||
| 		c.logger.Infof("Service: %s (uid: %s)", util.NameFromMeta(c.Service.ObjectMeta), c.Service.UID) | 		c.logger.Infof("Found Service: %s (uid: %s)", util.NameFromMeta(c.Service.ObjectMeta), c.Service.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	pods, err := c.listPods() | 	pods, err := c.listPods() | ||||||
|  | @ -89,7 +89,7 @@ func (c *Cluster) ListResources() error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, obj := range pods { | 	for _, obj := range pods { | ||||||
| 		c.logger.Infof("Pod: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | 		c.logger.Infof("Found Pod: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	pvcs, err := c.listPersistentVolumeClaims() | 	pvcs, err := c.listPersistentVolumeClaims() | ||||||
|  | @ -98,7 +98,7 @@ func (c *Cluster) ListResources() error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, obj := range pvcs { | 	for _, obj := range pvcs { | ||||||
| 		c.logger.Infof("PVC: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | 		c.logger.Infof("Found PVC: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
|  |  | ||||||
|  | @ -3,11 +3,7 @@ package cluster | ||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 
 | 
 | ||||||
| 	"k8s.io/client-go/pkg/api/v1" |  | ||||||
| 
 |  | ||||||
| 	"github.bus.zalan.do/acid/postgres-operator/pkg/spec" |  | ||||||
| 	"github.bus.zalan.do/acid/postgres-operator/pkg/util" | 	"github.bus.zalan.do/acid/postgres-operator/pkg/util" | ||||||
| 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) SyncCluster() { | func (c *Cluster) SyncCluster() { | ||||||
|  | @ -30,11 +26,6 @@ func (c *Cluster) SyncCluster() { | ||||||
| 	if err := c.syncStatefulSet(); err != nil { | 	if err := c.syncStatefulSet(); err != nil { | ||||||
| 		c.logger.Errorf("Can't sync StatefulSets: %s", err) | 		c.logger.Errorf("Can't sync StatefulSets: %s", err) | ||||||
| 	} | 	} | ||||||
| 
 |  | ||||||
| 	c.logger.Debugf("Syncing Pods") |  | ||||||
| 	if err := c.syncPods(); err != nil { |  | ||||||
| 		c.logger.Errorf("Can't sync Pods: %s", err) |  | ||||||
| 	} |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) syncSecrets() error { | func (c *Cluster) syncSecrets() error { | ||||||
|  | @ -132,55 +123,3 @@ func (c *Cluster) syncStatefulSet() error { | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 |  | ||||||
| func (c *Cluster) syncPods() error { |  | ||||||
| 	curSs := c.Statefulset |  | ||||||
| 
 |  | ||||||
| 	ls := c.labelsSet() |  | ||||||
| 	namespace := c.Metadata.Namespace |  | ||||||
| 
 |  | ||||||
| 	listOptions := v1.ListOptions{ |  | ||||||
| 		LabelSelector: ls.String(), |  | ||||||
| 	} |  | ||||||
| 	pods, err := c.KubeClient.Pods(namespace).List(listOptions) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return fmt.Errorf("Can't get list of Pods: %s", err) |  | ||||||
| 	} |  | ||||||
| 	if int32(len(pods.Items)) != *curSs.Spec.Replicas { |  | ||||||
| 		//TODO: wait for Pods being created by StatefulSet
 |  | ||||||
| 		return fmt.Errorf("Number of existing Pods does not match number of replicas of the StatefulSet") |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	//First check if we have left overs from the previous rolling update
 |  | ||||||
| 	for _, pod := range pods.Items { |  | ||||||
| 		podName := spec.PodName{ |  | ||||||
| 			Namespace: pod.Namespace, |  | ||||||
| 			Name:      pod.Name, |  | ||||||
| 		} |  | ||||||
| 		match, _ := podMatchesTemplate(&pod, curSs) |  | ||||||
| 		if match && pod.Status.Phase == v1.PodPending { |  | ||||||
| 			c.logger.Infof("Waiting for left over Pod '%s'", podName) |  | ||||||
| 			ch := c.registerPodSubscriber(podName) |  | ||||||
| 			c.waitForPodLabel(ch) |  | ||||||
| 			c.unregisterPodSubscriber(podName) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	for _, pod := range pods.Items { |  | ||||||
| 		if match, reason := podMatchesTemplate(&pod, curSs); match { |  | ||||||
| 			c.logger.Infof("Pod '%s' matches StatefulSet pod template", util.NameFromMeta(pod.ObjectMeta)) |  | ||||||
| 			continue |  | ||||||
| 		} else { |  | ||||||
| 			c.logPodChanges(&pod, curSs, reason) |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if util.PodSpiloRole(&pod) == constants.PodRoleMaster { |  | ||||||
| 			//TODO: do manual failover first
 |  | ||||||
| 		} |  | ||||||
| 		err = c.recreatePod(pod) |  | ||||||
| 
 |  | ||||||
| 		c.logger.Infof("Pod '%s' has been successfully recreated", util.NameFromMeta(pod.ObjectMeta)) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  | @ -3,7 +3,6 @@ package cluster | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"reflect" |  | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
|  | @ -45,40 +44,6 @@ func normalizeUserFlags(userFlags []string) (flags []string, err error) { | ||||||
| 	return | 	return | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func podMatchesTemplate(pod *v1.Pod, ss *v1beta1.StatefulSet) (match bool, reason string) { |  | ||||||
| 	//TODO: improve me
 |  | ||||||
| 	match = false |  | ||||||
| 	reason = "" |  | ||||||
| 	if len(pod.Spec.Containers) != 1 { |  | ||||||
| 		reason = "new pod defines more than one container" |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	container := pod.Spec.Containers[0] |  | ||||||
| 	ssContainer := ss.Spec.Template.Spec.Containers[0] |  | ||||||
| 
 |  | ||||||
| 	switch { |  | ||||||
| 	case container.Image != ssContainer.Image: |  | ||||||
| 		{ |  | ||||||
| 			reason = "new pod's container image doesn't match the current one" |  | ||||||
| 		} |  | ||||||
| 	case !reflect.DeepEqual(container.Env, ssContainer.Env): |  | ||||||
| 		{ |  | ||||||
| 			reason = "new pod's container environment doesn't match the current one" |  | ||||||
| 		} |  | ||||||
| 	case !reflect.DeepEqual(container.Ports, ssContainer.Ports): |  | ||||||
| 		{ |  | ||||||
| 			reason = "new pod's container ports don't match the current ones" |  | ||||||
| 		} |  | ||||||
| 	case !reflect.DeepEqual(container.Resources, ssContainer.Resources): |  | ||||||
| 		{ |  | ||||||
| 			reason = "new pod's container resources don't match the current ones" |  | ||||||
| 		} |  | ||||||
| 	default: |  | ||||||
| 		match = true |  | ||||||
| 	} |  | ||||||
| 	return |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reason string) { | func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reason string) { | ||||||
| 	if isUpdate { | 	if isUpdate { | ||||||
| 		c.logger.Infof("StatefulSet '%s' has been changed", | 		c.logger.Infof("StatefulSet '%s' has been changed", | ||||||
|  | @ -121,21 +86,6 @@ func (c *Cluster) logVolumeChanges(old, new spec.Volume, reason string) { | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) logPodChanges(pod *v1.Pod, statefulset *v1beta1.StatefulSet, reason string) { |  | ||||||
| 	c.logger.Infof("Pod'%s does not match the StatefulSet's Pod template and needs to be recreated", |  | ||||||
| 		util.NameFromMeta(pod.ObjectMeta), |  | ||||||
| 	) |  | ||||||
| 
 |  | ||||||
| 	if len(pod.Spec.Containers) == 1 { |  | ||||||
| 		podContainer := pod.Spec.Containers[0] |  | ||||||
| 		templateContainer := statefulset.Spec.Template.Spec.Containers[0] |  | ||||||
| 		c.logger.Debugf("diff pod <-> statefulset\n%s", util.PrettyDiff(podContainer, templateContainer)) |  | ||||||
| 	} |  | ||||||
| 	if reason != "" { |  | ||||||
| 		c.logger.Infof("Reason: %s", reason) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Cluster) getTeamMembers() ([]string, error) { | func (c *Cluster) getTeamMembers() ([]string, error) { | ||||||
| 	if c.Spec.TeamId == "" { | 	if c.Spec.TeamId == "" { | ||||||
| 		return nil, fmt.Errorf("No teamId specified") | 		return nil, fmt.Errorf("No teamId specified") | ||||||
|  |  | ||||||
|  | @ -124,8 +124,6 @@ func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { | ||||||
| 			if subscriber, ok := c.clusters[event.ClusterName]; ok { | 			if subscriber, ok := c.clusters[event.ClusterName]; ok { | ||||||
| 				c.logger.Debugf("Sending %s event of Pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) | 				c.logger.Debugf("Sending %s event of Pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) | ||||||
| 				go subscriber.ReceivePodEvent(event) | 				go subscriber.ReceivePodEvent(event) | ||||||
| 			} else { |  | ||||||
| 				c.logger.Debugf("Skipping %s event of the '%s' pod", event.EventType, event.PodName) |  | ||||||
| 			} | 			} | ||||||
| 		case <-stopCh: | 		case <-stopCh: | ||||||
| 			return | 			return | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue