Merge branch 'master' into machine424-stand

Felix Kunde 2022-03-31 16:05:43 +02:00
commit bd96226f1a
17 changed files with 471 additions and 78 deletions

View File

@@ -9,7 +9,7 @@ metadata:
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/instance: {{ .Release.Name }}
rules:
# Patroni needs to watch and manage endpoints
# Patroni needs to watch and manage config maps or endpoints
{{- if toString .Values.configGeneral.kubernetes_use_configmaps | eq "true" }}
- apiGroups:
- ""
@@ -24,12 +24,6 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
{{- else }}
- apiGroups:
- ""

View File

@@ -89,12 +89,6 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
{{- else }}
# to read configuration from ConfigMaps
- apiGroups:

View File

@@ -37,7 +37,7 @@ The Postgres Operator can be deployed in the following ways:
* Kustomization
* Helm chart
### Manual deployment setup
### Manual deployment setup on Kubernetes
The Postgres Operator can be installed simply by applying yaml manifests. Note,
we provide the `/manifests` directory as an example only; you should consider
@@ -71,6 +71,18 @@ manifest.
./run_operator_locally.sh
```
### Manual deployment setup on OpenShift
To install the Postgres Operator in OpenShift you have to change the config
parameter `kubernetes_use_configmaps` to `"true"`. Otherwise, the operator and
Patroni would store leader and config keys in `Endpoints`, which is not
supported in OpenShift. This also requires a slightly different set of rules
for the `postgres-operator` and `postgres-pod` cluster roles.
```bash
oc create -f manifests/operator-service-account-rbac-openshift.yaml
```
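For example, when the operator is configured through the ConfigMap from
`manifests/configmap.yaml`, the relevant entry would look like the following
sketch (the ConfigMap name and namespace are the assumed defaults):
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: postgres-operator    # assumed default name from manifests/configmap.yaml
  namespace: default
data:
  # store Patroni leader and config keys in ConfigMaps instead of Endpoints
  kubernetes_use_configmaps: "true"
```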
### Helm chart
Alternatively, the operator can be installed by using the provided [Helm](https://helm.sh/)

View File

@@ -737,10 +737,14 @@ source cluster. If you create it in the same Kubernetes environment, use a
different name.
```yaml
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
name: acid-minimal-cluster-clone
spec:
clone:
uid: "efd12e58-5786-11e8-b5a7-06148230260c"
cluster: "acid-batman"
cluster: "acid-minimal-cluster"
timestamp: "2017-12-19T12:40:33+01:00"
s3_wal_path: "s3://<bucketname>/spilo/<source_db_cluster>/<UID>/wal/<PGVERSION>"
```
@@ -756,7 +760,7 @@ specified `uid`. You can find the UID of the source cluster in its metadata:
apiVersion: acid.zalan.do/v1
kind: postgresql
metadata:
name: acid-batman
name: acid-minimal-cluster
uid: efd12e58-5786-11e8-b5a7-06148230260c
```
@@ -767,7 +771,7 @@ implementations:
spec:
clone:
uid: "efd12e58-5786-11e8-b5a7-06148230260c"
cluster: "acid-batman"
cluster: "acid-minimal-cluster"
timestamp: "2017-12-19T12:40:33+01:00"
s3_endpoint: https://s3.acme.org
s3_access_key_id: 0123456789abcdef0123456789abcdef
@@ -788,11 +792,49 @@ namespace.
```yaml
spec:
clone:
cluster: "acid-batman"
cluster: "acid-minimal-cluster"
```
Be aware that on a busy source database this can result in an elevated load!
## Restore in place
It is also possible to restore a database in place, without cloning it. The
advantage is that nothing has to change on the application side. However, as
the database is deleted first, this process is riskier than cloning (which
only requires adjusting the connection parameters of the app).
First, make sure there is no write activity on your DB, and save its UID.
Then delete the `postgresql` K8S resource:
```bash
kubectl delete postgresql acid-minimal-cluster
```
Then deploy a new manifest with the same name, referring to itself
(both name and UID) in the `clone` section:
```yaml
metadata:
name: acid-minimal-cluster
# [...]
spec:
# [...]
clone:
cluster: "acid-minimal-cluster" # the same as metadata.name above!
uid: "<original_UID>"
timestamp: "2022-04-01T10:11:12.000+00:00"
```
This will create a new database cluster with the same name but a different UID,
and the data will be restored to the state it was in at the specified time.
:warning: The backups and WAL files for the original DB are retained under the
original UID, so the restore can be retried. However, it is probably better to
create a temporary clone for experimenting or for finding out to which point in
time you should restore, as in the sketch below.
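A minimal sketch of such a temporary clone, reusing the clone parameters from
the examples above; the clone's name is a hypothetical choice:
```yaml
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
  name: acid-minimal-cluster-pitr-check   # hypothetical temporary clone
spec:
  # [...]
  clone:
    cluster: "acid-minimal-cluster"
    uid: "efd12e58-5786-11e8-b5a7-06148230260c"
    timestamp: "2022-04-01T10:11:12.000+00:00"   # point in time to verify
```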
## Setting up a standby cluster
A standby cluster is a [Patroni feature](https://github.com/zalando/patroni/blob/master/docs/replica_bootstrap.rst#standby-cluster)

View File

@@ -1759,6 +1759,8 @@ class EndToEndTestCase(unittest.TestCase):
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0, "Manifest not deleted")
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
# check if everything has been deleted
self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_label), 0, "Pods not deleted")
self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Service not deleted")

View File

@@ -132,7 +132,7 @@ spec:
# with an empty/absent timestamp, clone from an existing alive cluster using pg_basebackup
# clone:
# uid: "efd12e58-5786-11e8-b5a7-06148230260c"
# cluster: "acid-batman"
# cluster: "acid-minimal-cluster"
# timestamp: "2017-12-19T12:40:33+01:00" # timezone required (offset relative to UTC, see RFC 3339 section 5.6)
# s3_wal_path: "s3://custom/path/to/bucket"

View File

@@ -63,13 +63,13 @@ data:
# etcd_host: ""
external_traffic_policy: "Cluster"
# gcp_credentials: ""
# kubernetes_use_configmaps: "false"
# ignored_annotations: ""
# infrastructure_roles_secret_name: "postgresql-infrastructure-roles"
# infrastructure_roles_secrets: "secretname:monitoring-roles,userkey:user,passwordkey:password,rolekey:inrole"
# inherited_annotations: owned-by
# inherited_labels: application,environment
# kube_iam_role: ""
# kubernetes_use_configmaps: "false"
# log_s3_bucket: ""
logical_backup_docker_image: "registry.opensource.zalan.do/acid/logical-backup:v1.7.1"
# logical_backup_google_application_credentials: ""

View File

@@ -0,0 +1,283 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: postgres-operator
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: postgres-operator
rules:
# all verbs allowed for custom operator resources
- apiGroups:
- acid.zalan.do
resources:
- postgresqls
- postgresqls/status
- operatorconfigurations
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
# operator only reads PostgresTeams
- apiGroups:
- acid.zalan.do
resources:
- postgresteams
verbs:
- get
- list
- watch
# all verbs allowed for event streams (Zalando-internal feature)
# - apiGroups:
# - zalando.org
# resources:
# - fabriceventstreams
# verbs:
# - create
# - delete
# - deletecollection
# - get
# - list
# - patch
# - update
# - watch
# to create or get/update CRDs when starting up
- apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
verbs:
- create
- get
- patch
- update
# to read configuration and manage ConfigMaps used by Patroni
- apiGroups:
- ""
resources:
- configmaps
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
# to send events to the CRs
- apiGroups:
- ""
resources:
- events
verbs:
- create
- get
- list
- patch
- update
- watch
# to CRUD secrets for database access
- apiGroups:
- ""
resources:
- secrets
verbs:
- create
- delete
- get
- update
# to check nodes for node readiness label
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
# to read or delete existing PVCs. Creation via StatefulSet
- apiGroups:
- ""
resources:
- persistentvolumeclaims
verbs:
- delete
- get
- list
- patch
- update
# to read existing PVs. Creation should be done via dynamic provisioning
- apiGroups:
- ""
resources:
- persistentvolumes
verbs:
- get
- list
- update # only for resizing AWS volumes
# to watch Spilo pods and do rolling updates. Creation via StatefulSet
- apiGroups:
- ""
resources:
- pods
verbs:
- delete
- get
- list
- patch
- update
- watch
# to resize the filesystem in Spilo pods when increasing volume size
- apiGroups:
- ""
resources:
- pods/exec
verbs:
- create
# to CRUD services to point to Postgres cluster instances
- apiGroups:
- ""
resources:
- services
verbs:
- create
- delete
- get
- patch
- update
# to CRUD the StatefulSet which controls the Postgres cluster instances
- apiGroups:
- apps
resources:
- statefulsets
- deployments
verbs:
- create
- delete
- get
- list
- patch
# to CRUD cron jobs for logical backups
- apiGroups:
- batch
resources:
- cronjobs
verbs:
- create
- delete
- get
- list
- patch
- update
# to get namespaces operator resources can run in
- apiGroups:
- ""
resources:
- namespaces
verbs:
- get
# to define PDBs. Update happens via delete/create
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- create
- delete
- get
# to create ServiceAccounts in each namespace the operator watches
- apiGroups:
- ""
resources:
- serviceaccounts
verbs:
- get
- create
# to create role bindings to the postgres-pod service account
- apiGroups:
- rbac.authorization.k8s.io
resources:
- rolebindings
verbs:
- get
- create
# to grant privilege to run privileged pods (not needed by default)
#- apiGroups:
# - extensions
# resources:
# - podsecuritypolicies
# resourceNames:
# - privileged
# verbs:
# - use
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: postgres-operator
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: postgres-operator
subjects:
- kind: ServiceAccount
name: postgres-operator
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: postgres-pod
rules:
# Patroni needs to watch and manage config maps
- apiGroups:
- ""
resources:
- configmaps
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
# Patroni needs to watch pods
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- patch
- update
- watch
# to let Patroni create a headless service
- apiGroups:
- ""
resources:
- services
verbs:
- create
# to grant privilege to run privileged pods (not needed by default)
#- apiGroups:
# - extensions
# resources:
# - podsecuritypolicies
# resourceNames:
# - privileged
# verbs:
# - use

View File

@@ -43,7 +43,7 @@ var (
alphaNumericRegexp = regexp.MustCompile("^[a-zA-Z][a-zA-Z0-9]*$")
databaseNameRegexp = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")
userRegexp = regexp.MustCompile(`^[a-z0-9]([-_a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-_a-z0-9]*[a-z0-9])?)*$`)
patroniObjectSuffixes = []string{"config", "failover", "sync", "leader"}
patroniObjectSuffixes = []string{"leader", "config", "sync", "failover"}
)
// Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
@@ -91,6 +91,7 @@ type Cluster struct {
currentProcess Process
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
streamApplications []string
ConnectionPooler map[PostgresRole]*ConnectionPoolerObjects
EBSVolumes map[string]volumes.VolumeProperties
VolumeResizer volumes.VolumeResizer
@@ -258,6 +259,8 @@ func (c *Cluster) Create() error {
for _, role := range []PostgresRole{Master, Replica} {
// if kubernetes_use_configmaps is set Patroni will create configmaps
// otherwise it will use endpoints
if !c.patroniKubernetesUseConfigMaps() {
if c.Endpoints[role] != nil {
return fmt.Errorf("%s endpoint already exists in the cluster", role)
@@ -1482,22 +1485,26 @@ func (c *Cluster) GetCurrentProcess() Process {
// GetStatus provides status of the cluster
func (c *Cluster) GetStatus() *ClusterStatus {
return &ClusterStatus{
Cluster: c.Spec.ClusterName,
Team: c.Spec.TeamID,
Status: c.Status,
Spec: c.Spec,
status := &ClusterStatus{
Cluster: c.Spec.ClusterName,
Team: c.Spec.TeamID,
Status: c.Status,
Spec: c.Spec,
MasterService: c.GetServiceMaster(),
ReplicaService: c.GetServiceReplica(),
MasterEndpoint: c.GetEndpointMaster(),
ReplicaEndpoint: c.GetEndpointReplica(),
StatefulSet: c.GetStatefulSet(),
PodDisruptionBudget: c.GetPodDisruptionBudget(),
CurrentProcess: c.GetCurrentProcess(),
Error: fmt.Errorf("error: %s", c.Error),
}
if !c.patroniKubernetesUseConfigMaps() {
status.MasterEndpoint = c.GetEndpointMaster()
status.ReplicaEndpoint = c.GetEndpointReplica()
}
return status
}
// Switchover does a switchover (via Patroni) to a candidate pod
@@ -1562,7 +1569,7 @@ func (c *Cluster) Unlock() {
c.mu.Unlock()
}
type simpleActionWithResult func() error
type simpleActionWithResult func()
type clusterObjectGet func(name string) (spec.NamespacedName, error)
@@ -1576,46 +1583,47 @@ func (c *Cluster) deletePatroniClusterObjects() error {
c.logger.Infof("not cleaning up Etcd Patroni objects on cluster delete")
}
if !c.patroniKubernetesUseConfigMaps() {
actionsList = append(actionsList, c.deletePatroniClusterEndpoints)
actionsList = append(actionsList, c.deletePatroniClusterServices)
if c.patroniKubernetesUseConfigMaps() {
actionsList = append(actionsList, c.deletePatroniClusterConfigMaps)
} else {
actionsList = append(actionsList, c.deletePatroniClusterServices, c.deletePatroniClusterConfigMaps)
actionsList = append(actionsList, c.deletePatroniClusterEndpoints)
}
c.logger.Debugf("removing leftover Patroni objects (endpoints / services and configmaps)")
for _, deleter := range actionsList {
if err := deleter(); err != nil {
return err
}
deleter()
}
return nil
}
func (c *Cluster) deleteClusterObject(
func deleteClusterObject(
get clusterObjectGet,
del clusterObjectDelete,
objType string) error {
objType string,
clusterName string,
logger *logrus.Entry) {
for _, suffix := range patroniObjectSuffixes {
name := fmt.Sprintf("%s-%s", c.Name, suffix)
name := fmt.Sprintf("%s-%s", clusterName, suffix)
if namespacedName, err := get(name); err == nil {
c.logger.Debugf("deleting Patroni cluster object %q with name %q",
namespacedName, err := get(name)
if err == nil {
logger.Debugf("deleting %s %q",
objType, namespacedName)
if err = del(name); err != nil {
return fmt.Errorf("could not delete Patroni cluster object %q with name %q: %v",
logger.Warningf("could not delete %s %q: %v",
objType, namespacedName, err)
}
} else if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not fetch Patroni Endpoint %q: %v",
namespacedName, err)
logger.Warningf("could not fetch %s %q: %v",
objType, namespacedName, err)
}
}
return nil
}
func (c *Cluster) deletePatroniClusterServices() error {
func (c *Cluster) deletePatroniClusterServices() {
get := func(name string) (spec.NamespacedName, error) {
svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(svc.ObjectMeta), err
@@ -1625,10 +1633,10 @@ func (c *Cluster) deletePatroniClusterServices() error {
return c.KubeClient.Services(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}
return c.deleteClusterObject(get, deleteServiceFn, "service")
deleteClusterObject(get, deleteServiceFn, "service", c.Name, c.logger)
}
func (c *Cluster) deletePatroniClusterEndpoints() error {
func (c *Cluster) deletePatroniClusterEndpoints() {
get := func(name string) (spec.NamespacedName, error) {
ep, err := c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(ep.ObjectMeta), err
@@ -1638,10 +1646,10 @@ func (c *Cluster) deletePatroniClusterEndpoints() error {
return c.KubeClient.Endpoints(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}
return c.deleteClusterObject(get, deleteEndpointFn, "endpoint")
deleteClusterObject(get, deleteEndpointFn, "endpoint", c.Name, c.logger)
}
func (c *Cluster) deletePatroniClusterConfigMaps() error {
func (c *Cluster) deletePatroniClusterConfigMaps() {
get := func(name string) (spec.NamespacedName, error) {
cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(cm.ObjectMeta), err
@@ -1651,5 +1659,5 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error {
return c.KubeClient.ConfigMaps(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}
return c.deleteClusterObject(get, deleteConfigMapFn, "configmap")
deleteClusterObject(get, deleteConfigMapFn, "configmap", c.Name, c.logger)
}

View File

@@ -26,6 +26,7 @@ import (
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/patroni"
"github.com/zalando/postgres-operator/pkg/util/retryutil"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
@@ -111,7 +112,7 @@ func (c *Cluster) servicePort(role PostgresRole) int32 {
return service.Spec.Ports[0].Port
}
c.logger.Warningf("No service for role %s - defaulting to port 5432", role)
c.logger.Warningf("No service for role %s - defaulting to port %d", role, pgPort)
return pgPort
}
@@ -558,15 +559,15 @@ func generateContainer(
Resources: *resourceRequirements,
Ports: []v1.ContainerPort{
{
ContainerPort: 8008,
ContainerPort: patroni.ApiPort,
Protocol: v1.ProtocolTCP,
},
{
ContainerPort: 5432,
ContainerPort: pgPort,
Protocol: v1.ProtocolTCP,
},
{
ContainerPort: 8080,
ContainerPort: patroni.ApiPort,
Protocol: v1.ProtocolTCP,
},
},
@@ -1063,6 +1064,22 @@ func extractPgVersionFromBinPath(binPath string, template string) (string, error
return fmt.Sprintf("%v", pgVersion), nil
}
func generateSpiloReadinessProbe() *v1.Probe {
return &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Path: "/readiness",
Port: intstr.IntOrString{IntVal: patroni.ApiPort},
},
},
InitialDelaySeconds: 6,
PeriodSeconds: 10,
TimeoutSeconds: 5,
SuccessThreshold: 1,
FailureThreshold: 3,
}
}
func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.StatefulSet, error) {
var (
@@ -1239,6 +1256,9 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
generateCapabilities(c.OpConfig.AdditionalPodCapabilities),
)
// Patroni responds 200 to probe only if it either owns the leader lock or postgres is running and DCS is accessible
spiloContainer.ReadinessProbe = generateSpiloReadinessProbe()
// generate container specs for sidecars specified in the cluster manifest
clusterSpecificSidecars := []v1.Container{}
if spec.Sidecars != nil && len(spec.Sidecars) > 0 {
@@ -1708,10 +1728,12 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, spec *ac
func (c *Cluster) generateService(role PostgresRole, spec *acidv1.PostgresSpec) *v1.Service {
serviceSpec := v1.ServiceSpec{
Ports: []v1.ServicePort{{Name: "postgresql", Port: 5432, TargetPort: intstr.IntOrString{IntVal: 5432}}},
Ports: []v1.ServicePort{{Name: "postgresql", Port: pgPort, TargetPort: intstr.IntOrString{IntVal: pgPort}}},
Type: v1.ServiceTypeClusterIP,
}
// no selector for master, see https://github.com/zalando/postgres-operator/issues/340
// if kubernetes_use_configmaps is set master service needs a selector
if role == Replica || c.patroniKubernetesUseConfigMaps() {
serviceSpec.Selector = c.roleLabelsSet(false, role)
}
@@ -1998,7 +2020,7 @@ func (c *Cluster) generatePodDisruptionBudget() *policybeta1.PodDisruptionBudget
// TODO: handle clusters in different namespaces
func (c *Cluster) getClusterServiceConnectionParameters(clusterName string) (host string, port string) {
host = clusterName
port = "5432"
port = fmt.Sprintf("%d", pgPort)
return
}
@@ -2179,7 +2201,7 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar {
},
{
Name: "PGPORT",
Value: "5432",
Value: fmt.Sprintf("%d", pgPort),
},
{
Name: "PGUSER",

View File

@@ -2,6 +2,7 @@ package cluster
import (
"fmt"
"strings"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util"
@@ -105,14 +106,28 @@ func (c *Cluster) majorVersionUpgrade() error {
c.logger.Infof("triggering major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "Starting major version upgrade on pod %s of %d pods", masterPod.Name, numberOfPods)
upgradeCommand := fmt.Sprintf("/usr/bin/python3 /scripts/inplace_upgrade.py %d 2>&1 | tee last_upgrade.log", numberOfPods)
result, err := c.ExecCommand(podName, "/bin/su", "postgres", "-c", upgradeCommand)
if err != nil {
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "Upgrade from %d to %d FAILED: %v", c.currentMajorVersion, desiredVersion, err)
return err
c.logger.Debugf("checking if the spilo image runs with root or non-root (check for user id=0)")
resultIdCheck, errIdCheck := c.ExecCommand(podName, "/bin/bash", "-c", "/usr/bin/id -u")
if errIdCheck != nil {
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Major Version Upgrade", "Checking user id to run upgrade from %d to %d FAILED: %v", c.currentMajorVersion, desiredVersion, errIdCheck)
}
resultIdCheck = strings.TrimSuffix(resultIdCheck, "\n")
var result string
if resultIdCheck != "0" {
c.logger.Infof("User id was identified as: %s, hence default user is non-root already", resultIdCheck)
result, err = c.ExecCommand(podName, "/bin/bash", "-c", upgradeCommand)
} else {
c.logger.Infof("User id was identified as: %s, using su to reach the postgres user", resultIdCheck)
result, err = c.ExecCommand(podName, "/bin/su", "postgres", "-c", upgradeCommand)
}
if err != nil {
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Major Version Upgrade", "Upgrade from %d to %d FAILED: %v", c.currentMajorVersion, desiredVersion, err)
return err
}
c.logger.Infof("upgrade action triggered and command completed: %s", result[:100])
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Major Version Upgrade", "Upgrade from %d to %d finished", c.currentMajorVersion, desiredVersion)
}
}

View File

@@ -35,8 +35,10 @@ func (c *Cluster) listResources() error {
c.logger.Infof("found secret: %q (uid: %q) namesapce: %s", util.NameFromMeta(obj.ObjectMeta), obj.UID, obj.ObjectMeta.Namespace)
}
for role, endpoint := range c.Endpoints {
c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
if !c.patroniKubernetesUseConfigMaps() {
for role, endpoint := range c.Endpoints {
c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
}
}
for role, service := range c.Services {
@@ -589,7 +591,7 @@ func (c *Cluster) GetEndpointMaster() *v1.Endpoints {
return c.Endpoints[Master]
}
// GetEndpointReplica returns cluster's kubernetes master Endpoint
// GetEndpointReplica returns cluster's kubernetes replica Endpoint
func (c *Cluster) GetEndpointReplica() *v1.Endpoints {
return c.Endpoints[Replica]
}

View File

@@ -45,9 +45,17 @@ func (c *Cluster) deleteStreams() error {
return nil
}
err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("could not delete event stream custom resource: %v", err)
errors := make([]string, 0)
for _, appId := range c.streamApplications {
fesName := fmt.Sprintf("%s-%s", c.Name, appId)
err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), fesName, metav1.DeleteOptions{})
if err != nil {
errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", fesName, err))
}
}
if len(errors) > 0 {
return fmt.Errorf("could not delete all event stream custom resources: %v", strings.Join(errors, `', '`))
}
return nil
@@ -265,6 +273,11 @@ func (c *Cluster) syncStreams() error {
return nil
}
// fetch different application IDs from streams section
// there will be a separate event stream resource for each ID
appIds := gatherApplicationIds(c.Spec.Streams)
c.streamApplications = appIds
slots := make(map[string]map[string]string)
publications := make(map[string]map[string]acidv1.StreamTable)
@@ -329,9 +342,7 @@ func (c *Cluster) syncStreams() error {
}
func (c *Cluster) createOrUpdateStreams() error {
appIds := gatherApplicationIds(c.Spec.Streams)
for _, appId := range appIds {
for _, appId := range c.streamApplications {
fesName := fmt.Sprintf("%s-%s", c.Name, appId)
effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), fesName, metav1.GetOptions{})
if err != nil {

View File

@@ -196,6 +196,9 @@ func TestGenerateFabricEventStream(t *testing.T) {
_, err := cluster.createStatefulSet()
assert.NoError(t, err)
// createOrUpdateStreams will loop over existing apps
cluster.streamApplications = []string{appId}
// create the streams
err = cluster.createOrUpdateStreams()
assert.NoError(t, err)
@@ -327,6 +330,10 @@ func TestUpdateFabricEventStream(t *testing.T) {
_, err := cluster.KubeClient.Postgresqls(namespace).Create(
context.TODO(), &pg, metav1.CreateOptions{})
assert.NoError(t, err)
// createOrUpdateStreams will loop over existing apps
cluster.streamApplications = []string{appId}
err = cluster.createOrUpdateStreams()
assert.NoError(t, err)

View File

@@ -544,7 +544,8 @@ func (c *Controller) postgresqlCheck(obj interface{}) *acidv1.Postgresql {
Ensures the pod service account and role bindings exists in a namespace
before a PG cluster is created there so that a user does not have to deploy
these credentials manually. StatefulSets require the service account to
create pods; Patroni requires relevant RBAC bindings to access endpoints.
create pods; Patroni requires relevant RBAC bindings to access endpoints
or config maps.
The operator does not sync accounts/role bindings after creation.
*/

View File

@@ -25,7 +25,7 @@ const (
clusterPath = "/cluster"
statusPath = "/patroni"
restartPath = "/restart"
apiPort = 8008
ApiPort = 8008
timeout = 30 * time.Second
)
@@ -74,7 +74,7 @@ func apiURL(masterPod *v1.Pod) (string, error) {
return "", fmt.Errorf("%s is not a valid IPv4/IPv6 address", masterPod.Status.PodIP)
}
}
return fmt.Sprintf("http://%s", net.JoinHostPort(ip.String(), strconv.Itoa(apiPort))), nil
return fmt.Sprintf("http://%s", net.JoinHostPort(ip.String(), strconv.Itoa(ApiPort))), nil
}
func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) (err error) {

View File

@@ -36,17 +36,17 @@ func TestApiURL(t *testing.T) {
}{
{
"127.0.0.1",
fmt.Sprintf("http://127.0.0.1:%d", apiPort),
fmt.Sprintf("http://127.0.0.1:%d", ApiPort),
nil,
},
{
"0000:0000:0000:0000:0000:0000:0000:0001",
fmt.Sprintf("http://[::1]:%d", apiPort),
fmt.Sprintf("http://[::1]:%d", ApiPort),
nil,
},
{
"::1",
fmt.Sprintf("http://[::1]:%d", apiPort),
fmt.Sprintf("http://[::1]:%d", ApiPort),
nil,
},
{