define readinessProbe on statefulSet when Patroni uses ConfigMaps

Felix Kunde 2022-03-28 11:28:23 +02:00
parent 30f2ba6525
commit 4d25217777
3 changed files with 39 additions and 12 deletions

pkg/cluster/cluster.go

@@ -258,6 +258,8 @@ func (c *Cluster) Create() error {
 	for _, role := range []PostgresRole{Master, Replica} {
+		// if kubernetes_use_configmaps is set Patroni will create configmaps
+		// otherwise it will use endpoints
 		if !c.patroniKubernetesUseConfigMaps() {
 			if c.Endpoints[role] != nil {
 				return fmt.Errorf("%s endpoint already exists in the cluster", role)
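For context (not part of the diff): what this flag toggles is where Patroni's Kubernetes DCS keeps its state. A minimal sketch of the distinction; the object names are an assumption based on Patroni's default Kubernetes implementation, so verify them against your Patroni version:

package main

import "fmt"

// patroniDCSObjects sketches which objects Patroni manages in each mode.
func patroniDCSObjects(cluster string, useConfigMaps bool) (kind string, names []string) {
	if useConfigMaps {
		// leader lock and cluster config live in ConfigMaps
		return "ConfigMap", []string{cluster + "-leader", cluster + "-config"}
	}
	// otherwise Patroni writes Endpoints objects directly, including the
	// one backing the master service
	return "Endpoints", []string{cluster, cluster + "-config"}
}

func main() {
	kind, names := patroniDCSObjects("demo", true)
	fmt.Println(kind, names) // ConfigMap [demo-leader demo-config]
}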
@@ -1576,10 +1578,10 @@ func (c *Cluster) deletePatroniClusterObjects() error {
 		c.logger.Infof("not cleaning up Etcd Patroni objects on cluster delete")
 	}
-	if !c.patroniKubernetesUseConfigMaps() {
-		actionsList = append(actionsList, c.deletePatroniClusterEndpoints)
-	} else {
+	if c.patroniKubernetesUseConfigMaps() {
 		actionsList = append(actionsList, c.deletePatroniClusterServices, c.deletePatroniClusterConfigMaps)
+	} else {
+		actionsList = append(actionsList, c.deletePatroniClusterEndpoints)
 	}
 	c.logger.Debugf("removing leftover Patroni objects (endpoints / services and configmaps)")

pkg/cluster/k8sres.go

@@ -26,6 +26,7 @@ import (
 	"github.com/zalando/postgres-operator/pkg/util/config"
 	"github.com/zalando/postgres-operator/pkg/util/constants"
 	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
+	"github.com/zalando/postgres-operator/pkg/util/patroni"
 	"github.com/zalando/postgres-operator/pkg/util/retryutil"
 	batchv1 "k8s.io/api/batch/v1"
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
@@ -111,7 +112,7 @@ func (c *Cluster) servicePort(role PostgresRole) int32 {
 		return service.Spec.Ports[0].Port
 	}
-	c.logger.Warningf("No service for role %s - defaulting to port 5432", role)
+	c.logger.Warningf("No service for role %s - defaulting to port %d", role, pgPort)
 	return pgPort
 }
@@ -558,15 +559,15 @@ func generateContainer(
 		Resources: *resourceRequirements,
 		Ports: []v1.ContainerPort{
 			{
-				ContainerPort: 8008,
+				ContainerPort: patroni.ApiPort,
 				Protocol:      v1.ProtocolTCP,
 			},
 			{
-				ContainerPort: 5432,
+				ContainerPort: pgPort,
 				Protocol:      v1.ProtocolTCP,
 			},
 			{
-				ContainerPort: 8080,
+				ContainerPort: patroni.ApiPort,
 				Protocol:      v1.ProtocolTCP,
 			},
 		},
@@ -1058,6 +1059,22 @@ func extractPgVersionFromBinPath(binPath string, template string) (string, error
 	return fmt.Sprintf("%v", pgVersion), nil
 }
 
+func generateSpiloReadinessProbe() *v1.Probe {
+	return &v1.Probe{
+		Handler: v1.Handler{
+			HTTPGet: &v1.HTTPGetAction{
+				Path: "/readiness",
+				Port: intstr.IntOrString{IntVal: patroni.ApiPort},
+			},
+		},
+		InitialDelaySeconds: 6,
+		PeriodSeconds:       10,
+		TimeoutSeconds:      5,
+		SuccessThreshold:    1,
+		FailureThreshold:    3,
+	}
+}
+
 func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.StatefulSet, error) {
 	var (
@@ -1239,6 +1256,12 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
 		generateCapabilities(c.OpConfig.AdditionalPodCapabilities),
 	)
 
+	// if kubernetes_use_configmaps is set, define a readinessProbe since the master service has a selector
+	// Patroni responds 200 to the probe only if it either owns the leader lock or postgres is running and DCS is accessible
+	if c.patroniKubernetesUseConfigMaps() {
+		spiloContainer.ReadinessProbe = generateSpiloReadinessProbe()
+	}
+
 	// generate container specs for sidecars specified in the cluster manifest
 	clusterSpecificSidecars := []v1.Container{}
 	if spec.Sidecars != nil && len(spec.Sidecars) > 0 {
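The probe target exists server-side: recent Patroni releases expose GET /readiness on the REST API for exactly this purpose, which is what the hunk above relies on. A rough, self-contained sketch of the check the kubelet performs once the probe is wired in; the pod IP is illustrative, and note the kubelet counts any 2xx/3xx response as success:

package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	client := http.Client{Timeout: 5 * time.Second} // mirrors TimeoutSeconds: 5
	// the kubelet substitutes the real pod IP for 10.2.0.15
	resp, err := client.Get("http://10.2.0.15:8008/readiness")
	ready := err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400
	if resp != nil {
		resp.Body.Close()
	}
	// only while this yields true does the pod stay behind a selector-based service
	fmt.Println("ready:", ready)
}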
@@ -1708,10 +1731,12 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, spec *ac
 func (c *Cluster) generateService(role PostgresRole, spec *acidv1.PostgresSpec) *v1.Service {
 	serviceSpec := v1.ServiceSpec{
-		Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
+		Ports: []v1.ServicePort{{Name: "postgresql", Port: pgPort, TargetPort: intstr.IntOrString{IntVal: pgPort}}},
 		Type:  v1.ServiceTypeClusterIP,
 	}
 
+	// no selector for master, see https://github.com/zalando/postgres-operator/issues/340
+	// if kubernetes_use_configmaps is set, the master service needs a selector
 	if role == Replica || c.patroniKubernetesUseConfigMaps() {
 		serviceSpec.Selector = c.roleLabelsSet(false, role)
 	}
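What this yields for the master when kubernetes_use_configmaps is enabled is a ClusterIP service with a selector, so readiness now decides which pods back it. A sketch of the resulting spec; the label keys are the operator's usual defaults and are assumed here, since the diff derives them via roleLabelsSet:

package main

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// masterServiceSpec illustrates the selector-bearing master service; with the
// readinessProbe above, only a pod Patroni reports ready stays behind it.
func masterServiceSpec() v1.ServiceSpec {
	return v1.ServiceSpec{
		Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
		Type:  v1.ServiceTypeClusterIP,
		// assumed default labels, not taken from this diff
		Selector: map[string]string{
			"application":  "spilo",
			"cluster-name": "demo-cluster",
			"spilo-role":   "master",
		},
	}
}

func main() { _ = masterServiceSpec() }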
@@ -1989,7 +2014,7 @@ func (c *Cluster) generatePodDisruptionBudget() *policybeta1.PodDisruptionBudget
 // TODO: handle clusters in different namespaces
 func (c *Cluster) getClusterServiceConnectionParameters(clusterName string) (host string, port string) {
 	host = clusterName
-	port = "5432"
+	port = fmt.Sprintf("%d", pgPort)
 	return
 }
@@ -2170,7 +2195,7 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar {
 		},
 		{
 			Name:  "PGPORT",
-			Value: "5432",
+			Value: fmt.Sprintf("%d", pgPort),
 		},
 		{
 			Name: "PGUSER",

pkg/util/patroni/patroni.go

@@ -25,7 +25,7 @@ const (
 	clusterPath = "/cluster"
 	statusPath  = "/patroni"
 	restartPath = "/restart"
-	apiPort     = 8008
+	ApiPort     = 8008
 	timeout     = 30 * time.Second
 )
@@ -74,7 +74,7 @@ func apiURL(masterPod *v1.Pod) (string, error) {
 			return "", fmt.Errorf("%s is not a valid IPv4/IPv6 address", masterPod.Status.PodIP)
 		}
 	}
-	return fmt.Sprintf("http://%s", net.JoinHostPort(ip.String(), strconv.Itoa(apiPort))), nil
+	return fmt.Sprintf("http://%s", net.JoinHostPort(ip.String(), strconv.Itoa(ApiPort))), nil
 }
 
 func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) (err error) {
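Exporting the constant (apiPort becomes ApiPort) is what lets pkg/cluster drop its hard-coded 8008 in the container and probe definitions above. A minimal usage sketch, assuming the operator module is on the import path:

package main

import (
	"fmt"

	"github.com/zalando/postgres-operator/pkg/util/patroni"
)

func main() {
	// single source of truth for Patroni's REST API port (8008)
	fmt.Printf("probing Patroni on :%d\n", patroni.ApiPort)
}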