diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 3edb3a56b..065c7c336 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -316,7 +316,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp reasons = append(reasons, "new statefulset's container specification doesn't match the current one") } if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 { - c.logger.Warningf("statefulset %q has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) return &compareStatefulsetResult{} } @@ -432,10 +431,8 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { defer c.mu.Unlock() c.setStatus(spec.ClusterStatusUpdating) - c.logger.Debugf("cluster update from version %q to %q", - c.ResourceVersion, newSpec.ResourceVersion) - /* Make sure we update when this function exists */ + /* Make sure we update when this function exits */ defer func() { c.Postgresql = *newSpec }() diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 77949e229..d562fa60b 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -11,12 +11,11 @@ import ( ) func (c *Cluster) listPods() ([]v1.Pod, error) { - ns := c.Namespace listOptions := metav1.ListOptions{ LabelSelector: c.labelsSet().String(), } - pods, err := c.KubeClient.Pods(ns).List(listOptions) + pods, err := c.KubeClient.Pods(c.Namespace).List(listOptions) if err != nil { return nil, fmt.Errorf("could not get list of pods: %v", err) } @@ -24,6 +23,27 @@ func (c *Cluster) listPods() ([]v1.Pod, error) { return pods.Items, nil } +func (c *Cluster) getRolePods(role PostgresRole) ([]v1.Pod, error) { + listOptions := metav1.ListOptions{ + LabelSelector: c.roleLabelsSet(role).String(), + } + + pods, err := c.KubeClient.Pods(c.Namespace).List(listOptions) + if err != nil { + return nil, fmt.Errorf("could not get list of pods: %v", err) + } + + if len(pods.Items) == 0 { + return nil, fmt.Errorf("no pods") + } + + if role == Master && len(pods.Items) > 1 { + return nil, fmt.Errorf("too many masters") + } + + return pods.Items, nil +} + func (c *Cluster) deletePods() error { c.logger.Debugln("deleting pods") pods, err := c.listPods() diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 80a5d02b4..0fc29b2ea 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -2,6 +2,8 @@ package cluster import ( "fmt" + "strconv" + "strings" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -119,6 +121,56 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { return statefulSet, nil } +func getPodIndex(podName string) (int32, error) { + parts := strings.Split(podName, "-") + if len(parts) == 0 { + return 0, fmt.Errorf("pod has no index part") + } + + postfix := parts[len(parts)-1] + res, err := strconv.ParseInt(postfix, 10, 32) + if err != nil { + return 0, fmt.Errorf("could not parse pod index: %v", err) + } + + return int32(res), nil +} + +func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error { + masterPod, err := c.getRolePods(Master) + if err != nil { + return fmt.Errorf("could not get master pod: %v", err) + } + + podNum, err := getPodIndex(masterPod[0].Name) + if err != nil { + return fmt.Errorf("could not get pod number: %v", err) + } + + //Check if scale down affects current master pod + if *newStatefulSet.Spec.Replicas >= podNum+1 { + return nil + } + + podName := fmt.Sprintf("%s-0", c.Statefulset.Name) + masterCandidatePod, err := c.KubeClient.Pods(c.OpConfig.Namespace).Get(podName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("could not get master candidate pod: %v", err) + } + + // some sanity check + if !util.MapContains(masterCandidatePod.Labels, c.OpConfig.ClusterLabels) || + !util.MapContains(masterCandidatePod.Labels, map[string]string{c.OpConfig.ClusterNameLabel: c.Name}) { + return fmt.Errorf("pod %q does not belong to cluster", podName) + } + + if err := c.patroni.Failover(&masterPod[0], masterCandidatePod.Name); err != nil { + return fmt.Errorf("could not failover: %v", err) + } + + return nil +} + func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { c.setProcessName("updating statefulset") if c.Statefulset == nil { @@ -126,6 +178,12 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { } statefulSetName := util.NameFromMeta(c.Statefulset.ObjectMeta) + //scale down + if *c.Statefulset.Spec.Replicas > *newStatefulSet.Spec.Replicas { + if err := c.preScaleDown(newStatefulSet); err != nil { + c.logger.Warningf("could not scale down: %v", err) + } + } c.logger.Debugf("updating statefulset") patchData, err := specPatch(newStatefulSet.Spec) diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index dfda7551f..b37ea14a9 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -45,10 +45,6 @@ func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event spe cluster, ok := c.clusters[clusterName] c.clustersMu.RUnlock() if ok { - c.logger.WithField("cluster-name", clusterName). - Debugf("sending %q event of pod %q to the cluster channel", - event.EventType, - event.PodName) cluster.ReceivePodEvent(event) } } diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index 6c254b41a..5789755aa 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -41,7 +41,7 @@ func New(logger *logrus.Entry) *Patroni { } } -func (p *Patroni) apiURL(masterPod *v1.Pod) string { +func apiURL(masterPod *v1.Pod) string { return fmt.Sprintf("http://%s:%d", masterPod.Status.PodIP, apiPort) } @@ -54,7 +54,7 @@ func (p *Patroni) Failover(master *v1.Pod, candidate string) error { return fmt.Errorf("could not encode json: %v", err) } - request, err := http.NewRequest(http.MethodPost, p.apiURL(master)+failoverPath, buf) + request, err := http.NewRequest(http.MethodPost, apiURL(master)+failoverPath, buf) if err != nil { return fmt.Errorf("could not create request: %v", err) } @@ -77,4 +77,4 @@ func (p *Patroni) Failover(master *v1.Pod, candidate string) error { } return nil -} +} \ No newline at end of file diff --git a/pkg/util/util.go b/pkg/util/util.go index 518603b57..c003ef477 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -103,3 +103,19 @@ func FindNamedStringSubmatch(r *regexp.Regexp, s string) map[string]string { return res } + +// MapContains returns true if and only if haystack contains all the keys from the needle with matching corresponding values +func MapContains(haystack, needle map[string]string) bool { + if len(haystack) < len(needle) { + return false + } + + for k, v := range needle { + v2, ok := haystack[k] + if !ok || v2 != v { + return false + } + } + + return true +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index b94852dfb..53ac13768 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -46,6 +46,18 @@ var substractTest = []struct { {[]string{"a", "b", "c", "d"}, []string{"a", "bb", "c", "d"}, []string{"b"}, false}, } +var mapContaintsTest = []struct { + inA map[string]string + inB map[string]string + out bool +}{ + {map[string]string{"1": "a", "2": "b", "3": "c", "4": "c"}, map[string]string{"1": "a", "2": "b", "3": "c"}, true}, + {map[string]string{"1": "a", "2": "b", "3": "c", "4": "c"}, map[string]string{"1": "a", "2": "b", "3": "d"}, false}, + {map[string]string{}, map[string]string{}, true}, + {map[string]string{"3": "c", "4": "c"}, map[string]string{"1": "a", "2": "b", "3": "c"}, false}, + {map[string]string{"3": "c", "4": "c"}, map[string]string{}, true}, +} + var substringMatch = []struct { inRegex *regexp.Regexp inStr string @@ -122,3 +134,12 @@ func TestFindNamedStringSubmatch(t *testing.T) { } } } + +func TestMapContains(t *testing.T) { + for _, tt := range mapContaintsTest { + res := MapContains(tt.inA, tt.inB) + if res != tt.out { + t.Errorf("MapContains expected: %#v, got: %#v", tt.out, res) + } + } +}