handle case when Patroni returns that lag is unknown (#1724)
* handle case when Patroni returns that lag is unknown * remove some prints from e2e test
This commit is contained in:
parent
087c379687
commit
411abbe31e
|
|
@ -899,7 +899,6 @@ class EndToEndTestCase(unittest.TestCase):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# patch current master node with the label
|
# patch current master node with the label
|
||||||
print('patching master node: {}'.format(master_node))
|
|
||||||
k8s.api.core_v1.patch_node(master_node, node_label_body)
|
k8s.api.core_v1.patch_node(master_node, node_label_body)
|
||||||
|
|
||||||
# add node affinity to cluster
|
# add node affinity to cluster
|
||||||
|
|
@ -1636,12 +1635,8 @@ class EndToEndTestCase(unittest.TestCase):
|
||||||
try:
|
try:
|
||||||
q = exec_query.format(db_list_query, "postgres")
|
q = exec_query.format(db_list_query, "postgres")
|
||||||
q = "su postgres -c \"{}\"".format(q)
|
q = "su postgres -c \"{}\"".format(q)
|
||||||
print('Get databases: {}'.format(q))
|
|
||||||
result = k8s.exec_with_kubectl(pod_name, q)
|
result = k8s.exec_with_kubectl(pod_name, q)
|
||||||
db_list = clean_list(result.stdout.split(b'\n'))
|
db_list = clean_list(result.stdout.split(b'\n'))
|
||||||
print('db_list: {}, stdout: {}, stderr {}'.format(
|
|
||||||
db_list, result.stdout, result.stderr
|
|
||||||
))
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
print('Could not get databases: {}'.format(ex))
|
print('Could not get databases: {}'.format(ex))
|
||||||
print('Stdout: {}'.format(result.stdout))
|
print('Stdout: {}'.format(result.stdout))
|
||||||
|
|
@ -1665,12 +1660,8 @@ class EndToEndTestCase(unittest.TestCase):
|
||||||
try:
|
try:
|
||||||
q = exec_query.format(query, db_name)
|
q = exec_query.format(query, db_name)
|
||||||
q = "su postgres -c \"{}\"".format(q)
|
q = "su postgres -c \"{}\"".format(q)
|
||||||
print('Send query: {}'.format(q))
|
|
||||||
result = k8s.exec_with_kubectl(pod_name, q)
|
result = k8s.exec_with_kubectl(pod_name, q)
|
||||||
result_set = clean_list(result.stdout.split(b'\n'))
|
result_set = clean_list(result.stdout.split(b'\n'))
|
||||||
print('result: {}, stdout: {}, stderr {}'.format(
|
|
||||||
result_set, result.stdout, result.stderr
|
|
||||||
))
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
print('Error on query execution: {}'.format(ex))
|
print('Error on query execution: {}'.format(ex))
|
||||||
print('Stdout: {}'.format(result.stdout))
|
print('Stdout: {}'.format(result.stdout))
|
||||||
|
|
|
||||||
|
|
@ -462,11 +462,11 @@ spec:
|
||||||
type: integer
|
type: integer
|
||||||
standby:
|
standby:
|
||||||
type: object
|
type: object
|
||||||
required:
|
|
||||||
- s3_wal_path
|
|
||||||
properties:
|
properties:
|
||||||
s3_wal_path:
|
s3_wal_path:
|
||||||
type: string
|
type: string
|
||||||
|
gs_wal_path:
|
||||||
|
type: string
|
||||||
teamId:
|
teamId:
|
||||||
type: string
|
type: string
|
||||||
tls:
|
tls:
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
)
|
)
|
||||||
|
|
@ -47,11 +46,6 @@ type ExpectedValue struct {
|
||||||
envVarValue string
|
envVarValue string
|
||||||
}
|
}
|
||||||
|
|
||||||
func toIntStr(val int) *intstr.IntOrString {
|
|
||||||
b := intstr.FromInt(val)
|
|
||||||
return &b
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGenerateSpiloJSONConfiguration(t *testing.T) {
|
func TestGenerateSpiloJSONConfiguration(t *testing.T) {
|
||||||
var cluster = New(
|
var cluster = New(
|
||||||
Config{
|
Config{
|
||||||
|
|
@ -311,7 +305,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
|
||||||
Labels: map[string]string{"team": "myapp", "cluster-name": "myapp-database"},
|
Labels: map[string]string{"team": "myapp", "cluster-name": "myapp-database"},
|
||||||
},
|
},
|
||||||
Spec: policyv1beta1.PodDisruptionBudgetSpec{
|
Spec: policyv1beta1.PodDisruptionBudgetSpec{
|
||||||
MinAvailable: toIntStr(1),
|
MinAvailable: util.ToIntStr(1),
|
||||||
Selector: &metav1.LabelSelector{
|
Selector: &metav1.LabelSelector{
|
||||||
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"},
|
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"},
|
||||||
},
|
},
|
||||||
|
|
@ -335,7 +329,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
|
||||||
Labels: map[string]string{"team": "myapp", "cluster-name": "myapp-database"},
|
Labels: map[string]string{"team": "myapp", "cluster-name": "myapp-database"},
|
||||||
},
|
},
|
||||||
Spec: policyv1beta1.PodDisruptionBudgetSpec{
|
Spec: policyv1beta1.PodDisruptionBudgetSpec{
|
||||||
MinAvailable: toIntStr(0),
|
MinAvailable: util.ToIntStr(0),
|
||||||
Selector: &metav1.LabelSelector{
|
Selector: &metav1.LabelSelector{
|
||||||
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"},
|
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"},
|
||||||
},
|
},
|
||||||
|
|
@ -359,7 +353,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
|
||||||
Labels: map[string]string{"team": "myapp", "cluster-name": "myapp-database"},
|
Labels: map[string]string{"team": "myapp", "cluster-name": "myapp-database"},
|
||||||
},
|
},
|
||||||
Spec: policyv1beta1.PodDisruptionBudgetSpec{
|
Spec: policyv1beta1.PodDisruptionBudgetSpec{
|
||||||
MinAvailable: toIntStr(0),
|
MinAvailable: util.ToIntStr(0),
|
||||||
Selector: &metav1.LabelSelector{
|
Selector: &metav1.LabelSelector{
|
||||||
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"},
|
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"},
|
||||||
},
|
},
|
||||||
|
|
@ -383,7 +377,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
|
||||||
Labels: map[string]string{"team": "myapp", "cluster-name": "myapp-database"},
|
Labels: map[string]string{"team": "myapp", "cluster-name": "myapp-database"},
|
||||||
},
|
},
|
||||||
Spec: policyv1beta1.PodDisruptionBudgetSpec{
|
Spec: policyv1beta1.PodDisruptionBudgetSpec{
|
||||||
MinAvailable: toIntStr(1),
|
MinAvailable: util.ToIntStr(1),
|
||||||
Selector: &metav1.LabelSelector{
|
Selector: &metav1.LabelSelector{
|
||||||
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"},
|
MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"},
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -514,11 +514,15 @@ func (c *Cluster) getSwitchoverCandidate(master *v1.Pod) (spec.NamespacedName, e
|
||||||
// pick candidate with lowest lag
|
// pick candidate with lowest lag
|
||||||
// if sync_standby replicas were found assume synchronous_mode is enabled and ignore other candidates list
|
// if sync_standby replicas were found assume synchronous_mode is enabled and ignore other candidates list
|
||||||
if len(syncCandidates) > 0 {
|
if len(syncCandidates) > 0 {
|
||||||
sort.Slice(syncCandidates, func(i, j int) bool { return syncCandidates[i].LagInMb < syncCandidates[j].LagInMb })
|
sort.Slice(syncCandidates, func(i, j int) bool {
|
||||||
|
return util.IntFromIntStr(syncCandidates[i].Lag) < util.IntFromIntStr(syncCandidates[j].Lag)
|
||||||
|
})
|
||||||
return spec.NamespacedName{Namespace: master.Namespace, Name: syncCandidates[0].Name}, nil
|
return spec.NamespacedName{Namespace: master.Namespace, Name: syncCandidates[0].Name}, nil
|
||||||
}
|
}
|
||||||
if len(candidates) > 0 {
|
if len(candidates) > 0 {
|
||||||
sort.Slice(candidates, func(i, j int) bool { return candidates[i].LagInMb < candidates[j].LagInMb })
|
sort.Slice(candidates, func(i, j int) bool {
|
||||||
|
return util.IntFromIntStr(candidates[i].Lag) < util.IntFromIntStr(candidates[j].Lag)
|
||||||
|
})
|
||||||
return spec.NamespacedName{Namespace: master.Namespace, Name: candidates[0].Name}, nil
|
return spec.NamespacedName{Namespace: master.Namespace, Name: candidates[0].Name}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
@ -184,11 +185,11 @@ type ClusterMembers struct {
|
||||||
|
|
||||||
// ClusterMember cluster member data from Patroni API
|
// ClusterMember cluster member data from Patroni API
|
||||||
type ClusterMember struct {
|
type ClusterMember struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Role string `json:"role"`
|
Role string `json:"role"`
|
||||||
State string `json:"state"`
|
State string `json:"state"`
|
||||||
Timeline int `json:"timeline"`
|
Timeline int `json:"timeline"`
|
||||||
LagInMb int `json:"lag"`
|
Lag intstr.IntOrString `json:"lag,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// MemberDataPatroni child element
|
// MemberDataPatroni child element
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ import (
|
||||||
|
|
||||||
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
)
|
)
|
||||||
|
|
||||||
var logger = logrus.New().WithField("test", "patroni")
|
var logger = logrus.New().WithField("test", "patroni")
|
||||||
|
|
@ -95,22 +96,21 @@ func TestGetClusterMembers(t *testing.T) {
|
||||||
Role: "leader",
|
Role: "leader",
|
||||||
State: "running",
|
State: "running",
|
||||||
Timeline: 1,
|
Timeline: 1,
|
||||||
LagInMb: 0,
|
|
||||||
}, {
|
}, {
|
||||||
Name: "acid-test-cluster-1",
|
Name: "acid-test-cluster-1",
|
||||||
Role: "sync_standby",
|
Role: "sync_standby",
|
||||||
State: "running",
|
State: "running",
|
||||||
Timeline: 1,
|
Timeline: 1,
|
||||||
LagInMb: 0,
|
Lag: intstr.IntOrString{IntVal: 0},
|
||||||
}, {
|
}, {
|
||||||
Name: "acid-test-cluster-2",
|
Name: "acid-test-cluster-2",
|
||||||
Role: "replica",
|
Role: "replica",
|
||||||
State: "running",
|
State: "running",
|
||||||
Timeline: 1,
|
Timeline: 1,
|
||||||
LagInMb: 0,
|
Lag: intstr.IntOrString{Type: 1, StrVal: "unknown"},
|
||||||
}}
|
}}
|
||||||
|
|
||||||
json := `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "sync_standby", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 0}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 0}]}`
|
json := `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "sync_standby", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 0}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": "unknown"}]}`
|
||||||
r := ioutil.NopCloser(bytes.NewReader([]byte(json)))
|
r := ioutil.NopCloser(bytes.NewReader([]byte(json)))
|
||||||
|
|
||||||
response := http.Response{
|
response := http.Response{
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"math/big"
|
"math/big"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
@ -19,6 +20,7 @@ import (
|
||||||
"github.com/motomux/pretty"
|
"github.com/motomux/pretty"
|
||||||
resource "k8s.io/apimachinery/pkg/api/resource"
|
resource "k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
|
|
||||||
"github.com/zalando/postgres-operator/pkg/spec"
|
"github.com/zalando/postgres-operator/pkg/spec"
|
||||||
"golang.org/x/crypto/pbkdf2"
|
"golang.org/x/crypto/pbkdf2"
|
||||||
|
|
@ -322,6 +324,20 @@ func testNil(values ...*int32) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Convert int to IntOrString type
|
||||||
|
func ToIntStr(val int) *intstr.IntOrString {
|
||||||
|
b := intstr.FromInt(val)
|
||||||
|
return &b
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get int from IntOrString and return max int if string
|
||||||
|
func IntFromIntStr(intOrStr intstr.IntOrString) int {
|
||||||
|
if intOrStr.Type == 1 {
|
||||||
|
return math.MaxInt
|
||||||
|
}
|
||||||
|
return intOrStr.IntValue()
|
||||||
|
}
|
||||||
|
|
||||||
// MaxInt32 : Return maximum of two integers provided via pointers. If one value
|
// MaxInt32 : Return maximum of two integers provided via pointers. If one value
|
||||||
// is not defined, return the other one. If both are not defined, result is also
|
// is not defined, return the other one. If both are not defined, result is also
|
||||||
// undefined, caller needs to check for that.
|
// undefined, caller needs to check for that.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue