Merge branch 'master' into lazy-update

This commit is contained in:
Sergey Dudoladov 2020-04-27 09:00:12 +02:00
commit f6804fd4f6
30 changed files with 894 additions and 128 deletions

3
.gitignore vendored
View File

@ -7,6 +7,8 @@
_obj _obj
_test _test
_manifests _manifests
_tmp
github.com
# Architecture specific extensions/prefixes # Architecture specific extensions/prefixes
*.[568vq] *.[568vq]
@ -26,6 +28,7 @@ _testmain.go
/vendor/ /vendor/
/build/ /build/
/docker/build/ /docker/build/
/github.com/
.idea .idea
scm-source.json scm-source.json

View File

@ -74,6 +74,28 @@ spec:
- teamId - teamId
- postgresql - postgresql
properties: properties:
additionalVolumes:
type: array
items:
type: object
required:
- name
- mountPath
- volumeSource
properties:
name:
type: string
mountPath:
type: string
targetContainers:
type: array
nullable: true
items:
type: string
volumeSource:
type: object
subPath:
type: string
allowedSourceRanges: allowedSourceRanges:
type: array type: array
nullable: true nullable: true
@ -342,6 +364,21 @@ spec:
type: string type: string
teamId: teamId:
type: string type: string
tls:
type: object
required:
- secretName
properties:
secretName:
type: string
certificateFile:
type: string
privateKeyFile:
type: string
caFile:
type: string
caSecretName:
type: string
tolerations: tolerations:
type: array type: array
items: items:

View File

@ -42,6 +42,13 @@ rules:
- configmaps - configmaps
verbs: verbs:
- get - get
# to send events to the CRs
- apiGroups:
- ""
resources:
- events
verbs:
- create
# to manage endpoints which are also used by Patroni # to manage endpoints which are also used by Patroni
- apiGroups: - apiGroups:
- "" - ""

View File

@ -279,11 +279,11 @@ configConnectionPooler:
# docker image # docker image
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer" connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer"
# max db connections the pooler should hold # max db connections the pooler should hold
connection_pooler_max_db_connections: 60 connection_pooler_max_db_connections: "60"
# default pooling mode # default pooling mode
connection_pooler_mode: "transaction" connection_pooler_mode: "transaction"
# number of pooler instances # number of pooler instances
connection_pooler_number_of_instances: 2 connection_pooler_number_of_instances: "2"
# default resources # default resources
connection_pooler_default_cpu_request: 500m connection_pooler_default_cpu_request: 500m
connection_pooler_default_memory_request: 100Mi connection_pooler_default_memory_request: 100Mi
@ -296,6 +296,7 @@ rbac:
crd: crd:
# Specifies whether custom resource definitions should be created # Specifies whether custom resource definitions should be created
# When using helm3, this is ignored; instead use "--skip-crds" to skip.
create: true create: true
serviceAccount: serviceAccount:

View File

@ -256,11 +256,11 @@ configConnectionPooler:
# docker image # docker image
connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer" connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer"
# max db connections the pooler should hold # max db connections the pooler should hold
connection_pooler_max_db_connections: 60 connection_pooler_max_db_connections: "60"
# default pooling mode # default pooling mode
connection_pooler_mode: "transaction" connection_pooler_mode: "transaction"
# number of pooler instances # number of pooler instances
connection_pooler_number_of_instances: 2 connection_pooler_number_of_instances: "2"
# default resources # default resources
connection_pooler_default_cpu_request: 500m connection_pooler_default_cpu_request: 500m
connection_pooler_default_memory_request: 100Mi connection_pooler_default_memory_request: 100Mi
@ -273,6 +273,7 @@ rbac:
crd: crd:
# Specifies whether custom resource definitions should be created # Specifies whether custom resource definitions should be created
# When using helm3, this is ignored; instead use "--skip-crds" to skip.
create: true create: true
serviceAccount: serviceAccount:

View File

@ -154,6 +154,18 @@ These parameters are grouped directly under the `spec` key in the manifest.
[the reference schedule format](https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/#schedule) [the reference schedule format](https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/#schedule)
into account. Optional. Default is: "30 00 \* \* \*" into account. Optional. Default is: "30 00 \* \* \*"
* **additionalVolumes**
List of additional volumes to mount in each container of the statefulset pod.
Each item must contain a `name`, `mountPath`, and `volumeSource` which is a
[kubernetes volumeSource](https://godoc.org/k8s.io/api/core/v1#VolumeSource).
It allows you to mount existing PersistentVolumeClaims, ConfigMaps and Secrets inside the StatefulSet.
Also an `emptyDir` volume can be shared between initContainer and statefulSet.
Additionaly, you can provide a `SubPath` for volume mount (a file in a configMap source volume, for example).
You can also specify in which container the additional Volumes will be mounted with the `targetContainers` array option.
If `targetContainers` is empty, additional volumes will be mounted only in the `postgres` container.
If you set the `all` special item, it will be mounted in all containers (postgres + sidecars).
Else you can set the list of target containers in which the additional volumes will be mounted (eg : postgres, telegraf)
## Postgres parameters ## Postgres parameters
Those parameters are grouped under the `postgresql` top-level key, which is Those parameters are grouped under the `postgresql` top-level key, which is
@ -419,5 +431,16 @@ Those parameters are grouped under the `tls` top-level key.
Filename of the private key. Defaults to "tls.key". Filename of the private key. Defaults to "tls.key".
* **caFile** * **caFile**
Optional filename to the CA certificate. Useful when the client connects Optional filename to the CA certificate (e.g. "ca.crt"). Useful when the
with `sslmode=verify-ca` or `sslmode=verify-full`. Default is empty. client connects with `sslmode=verify-ca` or `sslmode=verify-full`.
Default is empty.
* **caSecretName**
By setting the `caSecretName` value, the ca certificate file defined by the
`caFile` will be fetched from this secret instead of `secretName` above.
This secret has to hold a file with that name in its root.
Optionally one can provide full path for any of them. By default it is
relative to the "/tls/", which is mount path of the tls secret.
If `caSecretName` is defined, the ca.crt path is relative to "/tlsca/",
otherwise to the same "/tls/".

View File

@ -584,7 +584,8 @@ don't know the value, use `103` which is the GID from the default spilo image
OpenShift allocates the users and groups dynamically (based on scc), and their OpenShift allocates the users and groups dynamically (based on scc), and their
range is different in every namespace. Due to this dynamic behaviour, it's not range is different in every namespace. Due to this dynamic behaviour, it's not
trivial to know at deploy time the uid/gid of the user in the cluster. trivial to know at deploy time the uid/gid of the user in the cluster.
This way, in OpenShift, you may want to skip the spilo_fsgroup setting. Therefore, instead of using a global `spilo_fsgroup` setting, use the `spiloFSGroup` field
per Postgres cluster.
Upload the cert as a kubernetes secret: Upload the cert as a kubernetes secret:
```sh ```sh
@ -593,7 +594,7 @@ kubectl create secret tls pg-tls \
--cert pg-tls.crt --cert pg-tls.crt
``` ```
Or with a CA: When doing client auth, CA can come optionally from the same secret:
```sh ```sh
kubectl create secret generic pg-tls \ kubectl create secret generic pg-tls \
--from-file=tls.crt=server.crt \ --from-file=tls.crt=server.crt \
@ -601,9 +602,6 @@ kubectl create secret generic pg-tls \
--from-file=ca.crt=ca.crt --from-file=ca.crt=ca.crt
``` ```
Alternatively it is also possible to use
[cert-manager](https://cert-manager.io/docs/) to generate these secrets.
Then configure the postgres resource with the TLS secret: Then configure the postgres resource with the TLS secret:
```yaml ```yaml
@ -618,5 +616,29 @@ spec:
caFile: "ca.crt" # add this if the secret is configured with a CA caFile: "ca.crt" # add this if the secret is configured with a CA
``` ```
Optionally, the CA can be provided by a different secret:
```sh
kubectl create secret generic pg-tls-ca \
--from-file=ca.crt=ca.crt
```
Then configure the postgres resource with the TLS secret:
```yaml
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
name: acid-test-cluster
spec:
tls:
secretName: "pg-tls" # this should hold tls.key and tls.crt
caSecretName: "pg-tls-ca" # this should hold ca.crt
caFile: "ca.crt" # add this if the secret is configured with a CA
```
Alternatively, it is also possible to use
[cert-manager](https://cert-manager.io/docs/) to generate these secrets.
Certificate rotation is handled in the spilo image which checks every 5 Certificate rotation is handled in the spilo image which checks every 5
minutes if the certificates have changed and reloads postgres accordingly. minutes if the certificates have changed and reloads postgres accordingly.

View File

@ -410,7 +410,6 @@ class EndToEndTestCase(unittest.TestCase):
''' '''
k8s = self.k8s k8s = self.k8s
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
labels = 'spilo-role=master,' + cluster_label
readiness_label = 'lifecycle-status' readiness_label = 'lifecycle-status'
readiness_value = 'ready' readiness_value = 'ready'
@ -776,14 +775,16 @@ class K8s:
def wait_for_logical_backup_job_creation(self): def wait_for_logical_backup_job_creation(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=1) self.wait_for_logical_backup_job(expected_num_of_jobs=1)
def update_config(self, config_map_patch): def delete_operator_pod(self):
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
operator_pod = self.api.core_v1.list_namespaced_pod( operator_pod = self.api.core_v1.list_namespaced_pod(
'default', label_selector="name=postgres-operator").items[0].metadata.name 'default', label_selector="name=postgres-operator").items[0].metadata.name
self.api.core_v1.delete_namespaced_pod(operator_pod, "default") # restart reloads the conf self.api.core_v1.delete_namespaced_pod(operator_pod, "default") # restart reloads the conf
self.wait_for_operator_pod_start() self.wait_for_operator_pod_start()
def update_config(self, config_map_patch):
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
self.delete_operator_pod()
def create_with_kubectl(self, path): def create_with_kubectl(self, path):
return subprocess.run( return subprocess.run(
["kubectl", "create", "-f", path], ["kubectl", "create", "-f", path],

View File

@ -9,9 +9,6 @@ metadata:
spec: spec:
dockerImage: registry.opensource.zalan.do/acid/spilo-12:1.6-p2 dockerImage: registry.opensource.zalan.do/acid/spilo-12:1.6-p2
teamId: "acid" teamId: "acid"
volume:
size: 1Gi
# storageClass: my-sc
numberOfInstances: 2 numberOfInstances: 2
users: # Application/Robot users users: # Application/Robot users
zalando: zalando:
@ -30,6 +27,32 @@ spec:
shared_buffers: "32MB" shared_buffers: "32MB"
max_connections: "10" max_connections: "10"
log_statement: "all" log_statement: "all"
volume:
size: 1Gi
# storageClass: my-sc
additionalVolumes:
- name: data
mountPath: /home/postgres/pgdata/partitions
targetContainers:
- postgres
volumeSource:
PersistentVolumeClaim:
claimName: pvc-postgresql-data-partitions
readyOnly: false
- name: conf
mountPath: /etc/telegraf
subPath: telegraf.conf
targetContainers:
- telegraf-sidecar
volumeSource:
configMap:
name: my-config-map
- name: empty
mountPath: /opt/empty
targetContainers:
- all
volumeSource:
emptyDir: {}
enableShmVolume: true enableShmVolume: true
# spiloFSGroup: 103 # spiloFSGroup: 103
@ -125,5 +148,10 @@ spec:
certificateFile: "tls.crt" certificateFile: "tls.crt"
privateKeyFile: "tls.key" privateKeyFile: "tls.key"
caFile: "" # optionally configure Postgres with a CA certificate caFile: "" # optionally configure Postgres with a CA certificate
caSecretName: "" # optionally the ca.crt can come from this secret instead.
# file names can be also defined with absolute path, and will no longer be relative
# to the "/tls/" path where the secret is being mounted by default, and "/tlsca/"
# where the caSecret is mounted by default.
# When TLS is enabled, also set spiloFSGroup parameter above to the relevant value. # When TLS is enabled, also set spiloFSGroup parameter above to the relevant value.
# if unknown, set it to 103 which is the usual value in the default spilo images. # if unknown, set it to 103 which is the usual value in the default spilo images.
# In Openshift, there is no need to set spiloFSGroup/spilo_fsgroup.

View File

@ -43,6 +43,13 @@ rules:
- configmaps - configmaps
verbs: verbs:
- get - get
# to send events to the CRs
- apiGroups:
- ""
resources:
- events
verbs:
- create
# to manage endpoints which are also used by Patroni # to manage endpoints which are also used by Patroni
- apiGroups: - apiGroups:
- "" - ""

View File

@ -38,6 +38,28 @@ spec:
- teamId - teamId
- postgresql - postgresql
properties: properties:
additionalVolumes:
type: array
items:
type: object
required:
- name
- mountPath
- volumeSource
properties:
name:
type: string
mountPath:
type: string
targetContainers:
type: array
nullable: true
items:
type: string
volumeSource:
type: object
subPath:
type: string
allowedSourceRanges: allowedSourceRanges:
type: array type: array
nullable: true nullable: true
@ -319,6 +341,8 @@ spec:
type: string type: string
caFile: caFile:
type: string type: string
caSecretName:
type: string
tolerations: tolerations:
type: array type: array
items: items:

View File

@ -513,6 +513,9 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
"caFile": { "caFile": {
Type: "string", Type: "string",
}, },
"caSecretName": {
Type: "string",
},
}, },
}, },
"tolerations": { "tolerations": {
@ -682,6 +685,37 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
}, },
}, },
}, },
"additionalVolumes": {
Type: "array",
Items: &apiextv1beta1.JSONSchemaPropsOrArray{
Schema: &apiextv1beta1.JSONSchemaProps{
Type: "object",
Required: []string{"name", "mountPath", "volumeSource"},
Properties: map[string]apiextv1beta1.JSONSchemaProps{
"name": {
Type: "string",
},
"mountPath": {
Type: "string",
},
"targetContainers": {
Type: "array",
Items: &apiextv1beta1.JSONSchemaPropsOrArray{
Schema: &apiextv1beta1.JSONSchemaProps{
Type: "string",
},
},
},
"volumeSource": {
Type: "object",
},
"subPath": {
Type: "string",
},
},
},
},
},
}, },
}, },
"status": { "status": {

View File

@ -67,6 +67,7 @@ type PostgresSpec struct {
PodAnnotations map[string]string `json:"podAnnotations"` PodAnnotations map[string]string `json:"podAnnotations"`
ServiceAnnotations map[string]string `json:"serviceAnnotations"` ServiceAnnotations map[string]string `json:"serviceAnnotations"`
TLS *TLSDescription `json:"tls"` TLS *TLSDescription `json:"tls"`
AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"`
// deprecated json tags // deprecated json tags
InitContainersOld []v1.Container `json:"init_containers,omitempty"` InitContainersOld []v1.Container `json:"init_containers,omitempty"`
@ -98,6 +99,14 @@ type Volume struct {
SubPath string `json:"subPath,omitempty"` SubPath string `json:"subPath,omitempty"`
} }
type AdditionalVolume struct {
Name string `json:"name"`
MountPath string `json:"mountPath"`
SubPath string `json:"subPath"`
TargetContainers []string `json:"targetContainers"`
VolumeSource v1.VolumeSource `json:"volume"`
}
// PostgresqlParam describes PostgreSQL version and pairs of configuration parameter name - values. // PostgresqlParam describes PostgreSQL version and pairs of configuration parameter name - values.
type PostgresqlParam struct { type PostgresqlParam struct {
PgVersion string `json:"version"` PgVersion string `json:"version"`
@ -139,6 +148,7 @@ type TLSDescription struct {
CertificateFile string `json:"certificateFile,omitempty"` CertificateFile string `json:"certificateFile,omitempty"`
PrivateKeyFile string `json:"privateKeyFile,omitempty"` PrivateKeyFile string `json:"privateKeyFile,omitempty"`
CAFile string `json:"caFile,omitempty"` CAFile string `json:"caFile,omitempty"`
CASecretName string `json:"caSecretName,omitempty"`
} }
// CloneDescription describes which cluster the new should clone and up to which point in time // CloneDescription describes which cluster the new should clone and up to which point in time

View File

@ -47,6 +47,28 @@ func (in *AWSGCPConfiguration) DeepCopy() *AWSGCPConfiguration {
return out return out
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AdditionalVolume) DeepCopyInto(out *AdditionalVolume) {
*out = *in
if in.TargetContainers != nil {
in, out := &in.TargetContainers, &out.TargetContainers
*out = make([]string, len(*in))
copy(*out, *in)
}
in.VolumeSource.DeepCopyInto(&out.VolumeSource)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdditionalVolume.
func (in *AdditionalVolume) DeepCopy() *AdditionalVolume {
if in == nil {
return nil
}
out := new(AdditionalVolume)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CloneDescription) DeepCopyInto(out *CloneDescription) { func (in *CloneDescription) DeepCopyInto(out *CloneDescription) {
*out = *in *out = *in
@ -591,6 +613,13 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) {
*out = new(TLSDescription) *out = new(TLSDescription)
**out = **in **out = **in
} }
if in.AdditionalVolumes != nil {
in, out := &in.AdditionalVolumes, &out.AdditionalVolumes
*out = make([]AdditionalVolume, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.InitContainersOld != nil { if in.InitContainersOld != nil {
in, out := &in.InitContainersOld, &out.InitContainersOld in, out := &in.InitContainersOld, &out.InitContainersOld
*out = make([]corev1.Container, len(*in)) *out = make([]corev1.Container, len(*in))

View File

@ -21,8 +21,11 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
"github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/config"
@ -81,6 +84,7 @@ type Cluster struct {
acidv1.Postgresql acidv1.Postgresql
Config Config
logger *logrus.Entry logger *logrus.Entry
eventRecorder record.EventRecorder
patroni patroni.Interface patroni patroni.Interface
pgUsers map[string]spec.PgUser pgUsers map[string]spec.PgUser
systemUsers map[string]spec.PgUser systemUsers map[string]spec.PgUser
@ -109,7 +113,7 @@ type compareStatefulsetResult struct {
} }
// New creates a new cluster. This function should be called from a controller. // New creates a new cluster. This function should be called from a controller.
func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry) *Cluster { func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
deletePropagationPolicy := metav1.DeletePropagationOrphan deletePropagationPolicy := metav1.DeletePropagationOrphan
podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) {
@ -140,7 +144,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger) cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName) cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName)
cluster.patroni = patroni.New(cluster.logger) cluster.patroni = patroni.New(cluster.logger)
cluster.eventRecorder = eventRecorder
return cluster return cluster
} }
@ -166,6 +170,16 @@ func (c *Cluster) setProcessName(procName string, args ...interface{}) {
} }
} }
// GetReference of Postgres CR object
// i.e. required to emit events to this resource
func (c *Cluster) GetReference() *v1.ObjectReference {
ref, err := reference.GetReference(scheme.Scheme, &c.Postgresql)
if err != nil {
c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", c.Postgresql.Namespace, c.Postgresql.Name, err)
}
return ref
}
// SetStatus of Postgres cluster // SetStatus of Postgres cluster
// TODO: eventually switch to updateStatus() for kubernetes 1.11 and above // TODO: eventually switch to updateStatus() for kubernetes 1.11 and above
func (c *Cluster) setStatus(status string) { func (c *Cluster) setStatus(status string) {
@ -245,6 +259,7 @@ func (c *Cluster) Create() error {
}() }()
c.setStatus(acidv1.ClusterStatusCreating) c.setStatus(acidv1.ClusterStatusCreating)
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
if err = c.enforceMinResourceLimits(&c.Spec); err != nil { if err = c.enforceMinResourceLimits(&c.Spec); err != nil {
return fmt.Errorf("could not enforce minimum resource limits: %v", err) return fmt.Errorf("could not enforce minimum resource limits: %v", err)
@ -263,6 +278,7 @@ func (c *Cluster) Create() error {
return fmt.Errorf("could not create %s endpoint: %v", role, err) return fmt.Errorf("could not create %s endpoint: %v", role, err)
} }
c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta)) c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Endpoints", "Endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
} }
if c.Services[role] != nil { if c.Services[role] != nil {
@ -273,6 +289,7 @@ func (c *Cluster) Create() error {
return fmt.Errorf("could not create %s service: %v", role, err) return fmt.Errorf("could not create %s service: %v", role, err)
} }
c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Services", "The service %q for role %s has been successfully created", util.NameFromMeta(service.ObjectMeta), role)
} }
if err = c.initUsers(); err != nil { if err = c.initUsers(); err != nil {
@ -284,6 +301,7 @@ func (c *Cluster) Create() error {
return fmt.Errorf("could not create secrets: %v", err) return fmt.Errorf("could not create secrets: %v", err)
} }
c.logger.Infof("secrets have been successfully created") c.logger.Infof("secrets have been successfully created")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")
if c.PodDisruptionBudget != nil { if c.PodDisruptionBudget != nil {
return fmt.Errorf("pod disruption budget already exists in the cluster") return fmt.Errorf("pod disruption budget already exists in the cluster")
@ -302,6 +320,7 @@ func (c *Cluster) Create() error {
return fmt.Errorf("could not create statefulset: %v", err) return fmt.Errorf("could not create statefulset: %v", err)
} }
c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta)) c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
c.logger.Info("waiting for the cluster being ready") c.logger.Info("waiting for the cluster being ready")
@ -310,6 +329,7 @@ func (c *Cluster) Create() error {
return err return err
} }
c.logger.Infof("pods are ready") c.logger.Infof("pods are ready")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")
// create database objects unless we are running without pods or disabled // create database objects unless we are running without pods or disabled
// that feature explicitly // that feature explicitly
@ -566,6 +586,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
} }
if isSmaller { if isSmaller {
c.logger.Warningf("defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit) c.logger.Warningf("defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
spec.Resources.ResourceLimits.CPU = minCPULimit spec.Resources.ResourceLimits.CPU = minCPULimit
} }
} }
@ -578,6 +599,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
} }
if isSmaller { if isSmaller {
c.logger.Warningf("defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit) c.logger.Warningf("defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
spec.Resources.ResourceLimits.Memory = minMemoryLimit spec.Resources.ResourceLimits.Memory = minMemoryLimit
} }
} }
@ -609,6 +631,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
if oldSpec.Spec.PostgresqlParam.PgVersion != newSpec.Spec.PostgresqlParam.PgVersion { // PG versions comparison if oldSpec.Spec.PostgresqlParam.PgVersion != newSpec.Spec.PostgresqlParam.PgVersion { // PG versions comparison
c.logger.Warningf("postgresql version change(%q -> %q) has no effect", c.logger.Warningf("postgresql version change(%q -> %q) has no effect",
oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion) oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "PostgreSQL", "postgresql version change(%q -> %q) has no effect",
oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
//we need that hack to generate statefulset with the old version //we need that hack to generate statefulset with the old version
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
} }
@ -752,7 +776,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
} }
// sync connection pooler // sync connection pooler
if err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil { if _, err := c.syncConnectionPooler(oldSpec, newSpec,
c.installLookupFunction); err != nil {
return fmt.Errorf("could not sync connection pooler: %v", err) return fmt.Errorf("could not sync connection pooler: %v", err)
} }
@ -767,6 +792,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
func (c *Cluster) Delete() { func (c *Cluster) Delete() {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of new cluster resources")
// delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods // delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods
// deleting the cron job also removes pods and batch jobs it created // deleting the cron job also removes pods and batch jobs it created
@ -794,8 +820,10 @@ func (c *Cluster) Delete() {
for _, role := range []PostgresRole{Master, Replica} { for _, role := range []PostgresRole{Master, Replica} {
if err := c.deleteEndpoint(role); err != nil { if !c.patroniKubernetesUseConfigMaps() {
c.logger.Warningf("could not delete %s endpoint: %v", role, err) if err := c.deleteEndpoint(role); err != nil {
c.logger.Warningf("could not delete %s endpoint: %v", role, err)
}
} }
if err := c.deleteService(role); err != nil { if err := c.deleteService(role); err != nil {
@ -888,9 +916,20 @@ func (c *Cluster) initSystemUsers() {
c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{} c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{}
} }
username := util.Coalesce( // Using superuser as pooler user is not a good idea. First of all it's
c.Spec.ConnectionPooler.User, // not going to be synced correctly with the current implementation,
c.OpConfig.ConnectionPooler.User) // and second it's a bad practice.
username := c.OpConfig.ConnectionPooler.User
isSuperUser := c.Spec.ConnectionPooler.User == c.OpConfig.SuperUsername
isProtectedUser := c.shouldAvoidProtectedOrSystemRole(
c.Spec.ConnectionPooler.User, "connection pool role")
if !isSuperUser && !isProtectedUser {
username = util.Coalesce(
c.Spec.ConnectionPooler.User,
c.OpConfig.ConnectionPooler.User)
}
// connection pooler application should be able to login with this role // connection pooler application should be able to login with this role
connectionPoolerUser := spec.PgUser{ connectionPoolerUser := spec.PgUser{
@ -1092,6 +1131,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
var err error var err error
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -1118,6 +1158,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil { if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate) c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
if err = <-podLabelErr; err != nil { if err = <-podLabelErr; err != nil {
err = fmt.Errorf("could not get master pod label: %v", err) err = fmt.Errorf("could not get master pod label: %v", err)
} }
@ -1133,6 +1174,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
// close the label waiting channel no sooner than the waiting goroutine terminates. // close the label waiting channel no sooner than the waiting goroutine terminates.
close(podLabelErr) close(podLabelErr)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
return err return err
} }
@ -1160,11 +1202,19 @@ type clusterObjectDelete func(name string) error
func (c *Cluster) deletePatroniClusterObjects() error { func (c *Cluster) deletePatroniClusterObjects() error {
// TODO: figure out how to remove leftover patroni objects in other cases // TODO: figure out how to remove leftover patroni objects in other cases
var actionsList []simpleActionWithResult
if !c.patroniUsesKubernetes() { if !c.patroniUsesKubernetes() {
c.logger.Infof("not cleaning up Etcd Patroni objects on cluster delete") c.logger.Infof("not cleaning up Etcd Patroni objects on cluster delete")
} }
c.logger.Debugf("removing leftover Patroni objects (endpoints, services and configmaps)")
for _, deleter := range []simpleActionWithResult{c.deletePatroniClusterEndpoints, c.deletePatroniClusterServices, c.deletePatroniClusterConfigMaps} { if !c.patroniKubernetesUseConfigMaps() {
actionsList = append(actionsList, c.deletePatroniClusterEndpoints)
}
actionsList = append(actionsList, c.deletePatroniClusterServices, c.deletePatroniClusterConfigMaps)
c.logger.Debugf("removing leftover Patroni objects (endpoints / services and configmaps)")
for _, deleter := range actionsList {
if err := deleter(); err != nil { if err := deleter(); err != nil {
return err return err
} }

View File

@ -13,6 +13,7 @@ import (
"github.com/zalando/postgres-operator/pkg/util/k8sutil" "github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/teams" "github.com/zalando/postgres-operator/pkg/util/teams"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
) )
const ( const (
@ -21,6 +22,8 @@ const (
) )
var logger = logrus.New().WithField("test", "cluster") var logger = logrus.New().WithField("test", "cluster")
var eventRecorder = record.NewFakeRecorder(1)
var cl = New( var cl = New(
Config{ Config{
OpConfig: config.Config{ OpConfig: config.Config{
@ -34,6 +37,7 @@ var cl = New(
k8sutil.NewMockKubernetesClient(), k8sutil.NewMockKubernetesClient(),
acidv1.Postgresql{}, acidv1.Postgresql{},
logger, logger,
eventRecorder,
) )
func TestInitRobotUsers(t *testing.T) { func TestInitRobotUsers(t *testing.T) {
@ -721,4 +725,38 @@ func TestInitSystemUsers(t *testing.T) {
if _, exist := cl.systemUsers[constants.ConnectionPoolerUserKeyName]; !exist { if _, exist := cl.systemUsers[constants.ConnectionPoolerUserKeyName]; !exist {
t.Errorf("%s, connection pooler user is not present", testName) t.Errorf("%s, connection pooler user is not present", testName)
} }
// superuser is not allowed as connection pool user
cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{
User: "postgres",
}
cl.OpConfig.SuperUsername = "postgres"
cl.OpConfig.ConnectionPooler.User = "pooler"
cl.initSystemUsers()
if _, exist := cl.pgUsers["pooler"]; !exist {
t.Errorf("%s, Superuser is not allowed to be a connection pool user", testName)
}
// neither protected users are
delete(cl.pgUsers, "pooler")
cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{
User: "admin",
}
cl.OpConfig.ProtectedRoles = []string{"admin"}
cl.initSystemUsers()
if _, exist := cl.pgUsers["pooler"]; !exist {
t.Errorf("%s, Protected user are not allowed to be a connection pool user", testName)
}
delete(cl.pgUsers, "pooler")
cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{
User: "standby",
}
cl.initSystemUsers()
if _, exist := cl.pgUsers["pooler"]; !exist {
t.Errorf("%s, System users are not allowed to be a connection pool user", testName)
}
} }

View File

@ -500,7 +500,7 @@ func mountShmVolumeNeeded(opConfig config.Config, spec *acidv1.PostgresSpec) *bo
return opConfig.ShmVolume return opConfig.ShmVolume
} }
func generatePodTemplate( func (c *Cluster) generatePodTemplate(
namespace string, namespace string,
labels labels.Set, labels labels.Set,
annotations map[string]string, annotations map[string]string,
@ -519,7 +519,7 @@ func generatePodTemplate(
podAntiAffinityTopologyKey string, podAntiAffinityTopologyKey string,
additionalSecretMount string, additionalSecretMount string,
additionalSecretMountPath string, additionalSecretMountPath string,
volumes []v1.Volume, additionalVolumes []acidv1.AdditionalVolume,
) (*v1.PodTemplateSpec, error) { ) (*v1.PodTemplateSpec, error) {
terminateGracePeriodSeconds := terminateGracePeriod terminateGracePeriodSeconds := terminateGracePeriod
@ -538,7 +538,6 @@ func generatePodTemplate(
InitContainers: initContainers, InitContainers: initContainers,
Tolerations: *tolerationsSpec, Tolerations: *tolerationsSpec,
SecurityContext: &securityContext, SecurityContext: &securityContext,
Volumes: volumes,
} }
if shmVolume != nil && *shmVolume { if shmVolume != nil && *shmVolume {
@ -559,6 +558,10 @@ func generatePodTemplate(
addSecretVolume(&podSpec, additionalSecretMount, additionalSecretMountPath) addSecretVolume(&podSpec, additionalSecretMount, additionalSecretMountPath)
} }
if additionalVolumes != nil {
c.addAdditionalVolumes(&podSpec, additionalVolumes)
}
template := v1.PodTemplateSpec{ template := v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Labels: labels, Labels: labels,
@ -849,7 +852,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
sidecarContainers []v1.Container sidecarContainers []v1.Container
podTemplate *v1.PodTemplateSpec podTemplate *v1.PodTemplateSpec
volumeClaimTemplate *v1.PersistentVolumeClaim volumeClaimTemplate *v1.PersistentVolumeClaim
volumes []v1.Volume additionalVolumes = spec.AdditionalVolumes
) )
// Improve me. Please. // Improve me. Please.
@ -1002,8 +1005,10 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
// this is combined with the FSGroup in the section above // this is combined with the FSGroup in the section above
// to give read access to the postgres user // to give read access to the postgres user
defaultMode := int32(0640) defaultMode := int32(0640)
volumes = append(volumes, v1.Volume{ mountPath := "/tls"
Name: "tls-secret", additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{
Name: spec.TLS.SecretName,
MountPath: mountPath,
VolumeSource: v1.VolumeSource{ VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{ Secret: &v1.SecretVolumeSource{
SecretName: spec.TLS.SecretName, SecretName: spec.TLS.SecretName,
@ -1012,13 +1017,6 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
}, },
}) })
mountPath := "/tls"
volumeMounts = append(volumeMounts, v1.VolumeMount{
MountPath: mountPath,
Name: "tls-secret",
ReadOnly: true,
})
// use the same filenames as Secret resources by default // use the same filenames as Secret resources by default
certFile := ensurePath(spec.TLS.CertificateFile, mountPath, "tls.crt") certFile := ensurePath(spec.TLS.CertificateFile, mountPath, "tls.crt")
privateKeyFile := ensurePath(spec.TLS.PrivateKeyFile, mountPath, "tls.key") privateKeyFile := ensurePath(spec.TLS.PrivateKeyFile, mountPath, "tls.key")
@ -1029,11 +1027,31 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
) )
if spec.TLS.CAFile != "" { if spec.TLS.CAFile != "" {
caFile := ensurePath(spec.TLS.CAFile, mountPath, "") // support scenario when the ca.crt resides in a different secret, diff path
mountPathCA := mountPath
if spec.TLS.CASecretName != "" {
mountPathCA = mountPath + "ca"
}
caFile := ensurePath(spec.TLS.CAFile, mountPathCA, "")
spiloEnvVars = append( spiloEnvVars = append(
spiloEnvVars, spiloEnvVars,
v1.EnvVar{Name: "SSL_CA_FILE", Value: caFile}, v1.EnvVar{Name: "SSL_CA_FILE", Value: caFile},
) )
// the ca file from CASecretName secret takes priority
if spec.TLS.CASecretName != "" {
additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{
Name: spec.TLS.CASecretName,
MountPath: mountPathCA,
VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{
SecretName: spec.TLS.CASecretName,
DefaultMode: &defaultMode,
},
},
})
}
} }
} }
@ -1084,7 +1102,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
annotations := c.generatePodAnnotations(spec) annotations := c.generatePodAnnotations(spec)
// generate pod template for the statefulset, based on the spilo container and sidecars // generate pod template for the statefulset, based on the spilo container and sidecars
podTemplate, err = generatePodTemplate( podTemplate, err = c.generatePodTemplate(
c.Namespace, c.Namespace,
c.labelsSet(true), c.labelsSet(true),
annotations, annotations,
@ -1103,8 +1121,8 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
c.OpConfig.PodAntiAffinityTopologyKey, c.OpConfig.PodAntiAffinityTopologyKey,
c.OpConfig.AdditionalSecretMount, c.OpConfig.AdditionalSecretMount,
c.OpConfig.AdditionalSecretMountPath, c.OpConfig.AdditionalSecretMountPath,
volumes, additionalVolumes)
)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not generate pod template: %v", err) return nil, fmt.Errorf("could not generate pod template: %v", err)
} }
@ -1298,6 +1316,69 @@ func addSecretVolume(podSpec *v1.PodSpec, additionalSecretMount string, addition
podSpec.Volumes = volumes podSpec.Volumes = volumes
} }
func (c *Cluster) addAdditionalVolumes(podSpec *v1.PodSpec,
additionalVolumes []acidv1.AdditionalVolume) {
volumes := podSpec.Volumes
mountPaths := map[string]acidv1.AdditionalVolume{}
for i, v := range additionalVolumes {
if previousVolume, exist := mountPaths[v.MountPath]; exist {
msg := "Volume %+v cannot be mounted to the same path as %+v"
c.logger.Warningf(msg, v, previousVolume)
continue
}
if v.MountPath == constants.PostgresDataMount {
msg := "Cannot mount volume on postgresql data directory, %+v"
c.logger.Warningf(msg, v)
continue
}
if v.TargetContainers == nil {
spiloContainer := podSpec.Containers[0]
additionalVolumes[i].TargetContainers = []string{spiloContainer.Name}
}
for _, target := range v.TargetContainers {
if target == "all" && len(v.TargetContainers) != 1 {
msg := `Target containers could be either "all" or a list
of containers, mixing those is not allowed, %+v`
c.logger.Warningf(msg, v)
continue
}
}
volumes = append(volumes,
v1.Volume{
Name: v.Name,
VolumeSource: v.VolumeSource,
},
)
mountPaths[v.MountPath] = v
}
c.logger.Infof("Mount additional volumes: %+v", additionalVolumes)
for i := range podSpec.Containers {
mounts := podSpec.Containers[i].VolumeMounts
for _, v := range additionalVolumes {
for _, target := range v.TargetContainers {
if podSpec.Containers[i].Name == target || target == "all" {
mounts = append(mounts, v1.VolumeMount{
Name: v.Name,
MountPath: v.MountPath,
SubPath: v.SubPath,
})
}
}
}
podSpec.Containers[i].VolumeMounts = mounts
}
podSpec.Volumes = volumes
}
func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) { func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) {
var storageClassName *string var storageClassName *string
@ -1545,11 +1626,11 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription)
c.logger.Info(msg, description.S3WalPath) c.logger.Info(msg, description.S3WalPath)
envs := []v1.EnvVar{ envs := []v1.EnvVar{
v1.EnvVar{ {
Name: "CLONE_WAL_S3_BUCKET", Name: "CLONE_WAL_S3_BUCKET",
Value: c.OpConfig.WALES3Bucket, Value: c.OpConfig.WALES3Bucket,
}, },
v1.EnvVar{ {
Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX", Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX",
Value: getBucketScopeSuffix(description.UID), Value: getBucketScopeSuffix(description.UID),
}, },
@ -1702,7 +1783,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {
annotations := c.generatePodAnnotations(&c.Spec) annotations := c.generatePodAnnotations(&c.Spec)
// re-use the method that generates DB pod templates // re-use the method that generates DB pod templates
if podTemplate, err = generatePodTemplate( if podTemplate, err = c.generatePodTemplate(
c.Namespace, c.Namespace,
labels, labels,
annotations, annotations,
@ -1721,8 +1802,8 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {
"", "",
c.OpConfig.AdditionalSecretMount, c.OpConfig.AdditionalSecretMount,
c.OpConfig.AdditionalSecretMountPath, c.OpConfig.AdditionalSecretMountPath,
nil); err != nil { []acidv1.AdditionalVolume{}); err != nil {
return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err) return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err)
} }
// overwrite specific params of logical backups pods // overwrite specific params of logical backups pods

View File

@ -37,7 +37,7 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) {
ReplicationUsername: replicationUserName, ReplicationUsername: replicationUserName,
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
testName := "TestGenerateSpiloConfig" testName := "TestGenerateSpiloConfig"
tests := []struct { tests := []struct {
@ -102,7 +102,7 @@ func TestCreateLoadBalancerLogic(t *testing.T) {
ReplicationUsername: replicationUserName, ReplicationUsername: replicationUserName,
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
testName := "TestCreateLoadBalancerLogic" testName := "TestCreateLoadBalancerLogic"
tests := []struct { tests := []struct {
@ -164,7 +164,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
acidv1.Postgresql{ acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger), logger,
eventRecorder),
policyv1beta1.PodDisruptionBudget{ policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "postgres-myapp-database-pdb", Name: "postgres-myapp-database-pdb",
@ -187,7 +188,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
acidv1.Postgresql{ acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}}, Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}},
logger), logger,
eventRecorder),
policyv1beta1.PodDisruptionBudget{ policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "postgres-myapp-database-pdb", Name: "postgres-myapp-database-pdb",
@ -210,7 +212,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
acidv1.Postgresql{ acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger), logger,
eventRecorder),
policyv1beta1.PodDisruptionBudget{ policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "postgres-myapp-database-pdb", Name: "postgres-myapp-database-pdb",
@ -233,7 +236,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
acidv1.Postgresql{ acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger), logger,
eventRecorder),
policyv1beta1.PodDisruptionBudget{ policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "postgres-myapp-database-databass-budget", Name: "postgres-myapp-database-databass-budget",
@ -368,7 +372,7 @@ func TestCloneEnv(t *testing.T) {
ReplicationUsername: replicationUserName, ReplicationUsername: replicationUserName,
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
for _, tt := range tests { for _, tt := range tests {
envs := cluster.generateCloneEnvironment(tt.cloneOpts) envs := cluster.generateCloneEnvironment(tt.cloneOpts)
@ -502,7 +506,7 @@ func TestGetPgVersion(t *testing.T) {
ReplicationUsername: replicationUserName, ReplicationUsername: replicationUserName,
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
for _, tt := range tests { for _, tt := range tests {
pgVersion, err := cluster.getNewPgVersion(tt.pgContainer, tt.newPgVersion) pgVersion, err := cluster.getNewPgVersion(tt.pgContainer, tt.newPgVersion)
@ -678,7 +682,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
ConnectionPoolerDefaultMemoryLimit: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi",
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
var clusterNoDefaultRes = New( var clusterNoDefaultRes = New(
Config{ Config{
@ -690,7 +694,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
}, },
ConnectionPooler: config.ConnectionPooler{}, ConnectionPooler: config.ConnectionPooler{},
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil } noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil }
@ -803,7 +807,7 @@ func TestConnectionPoolerDeploymentSpec(t *testing.T) {
ConnectionPoolerDefaultMemoryLimit: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi",
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
cluster.Statefulset = &appsv1.StatefulSet{ cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-sts", Name: "test-sts",
@ -904,7 +908,7 @@ func TestConnectionPoolerServiceSpec(t *testing.T) {
ConnectionPoolerDefaultMemoryLimit: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi",
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
cluster.Statefulset = &appsv1.StatefulSet{ cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-sts", Name: "test-sts",
@ -961,6 +965,7 @@ func TestTLS(t *testing.T) {
var spec acidv1.PostgresSpec var spec acidv1.PostgresSpec
var cluster *Cluster var cluster *Cluster
var spiloFSGroup = int64(103) var spiloFSGroup = int64(103)
var additionalVolumes = spec.AdditionalVolumes
makeSpec := func(tls acidv1.TLSDescription) acidv1.PostgresSpec { makeSpec := func(tls acidv1.TLSDescription) acidv1.PostgresSpec {
return acidv1.PostgresSpec{ return acidv1.PostgresSpec{
@ -989,7 +994,7 @@ func TestTLS(t *testing.T) {
SpiloFSGroup: &spiloFSGroup, SpiloFSGroup: &spiloFSGroup,
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
spec = makeSpec(acidv1.TLSDescription{SecretName: "my-secret", CAFile: "ca.crt"}) spec = makeSpec(acidv1.TLSDescription{SecretName: "my-secret", CAFile: "ca.crt"})
s, err := cluster.generateStatefulSet(&spec) s, err := cluster.generateStatefulSet(&spec)
if err != nil { if err != nil {
@ -1000,8 +1005,20 @@ func TestTLS(t *testing.T) {
assert.Equal(t, &fsGroup, s.Spec.Template.Spec.SecurityContext.FSGroup, "has a default FSGroup assigned") assert.Equal(t, &fsGroup, s.Spec.Template.Spec.SecurityContext.FSGroup, "has a default FSGroup assigned")
defaultMode := int32(0640) defaultMode := int32(0640)
mountPath := "/tls"
additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{
Name: spec.TLS.SecretName,
MountPath: mountPath,
VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{
SecretName: spec.TLS.SecretName,
DefaultMode: &defaultMode,
},
},
})
volume := v1.Volume{ volume := v1.Volume{
Name: "tls-secret", Name: "my-secret",
VolumeSource: v1.VolumeSource{ VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{ Secret: &v1.SecretVolumeSource{
SecretName: "my-secret", SecretName: "my-secret",
@ -1013,11 +1030,179 @@ func TestTLS(t *testing.T) {
assert.Contains(t, s.Spec.Template.Spec.Containers[0].VolumeMounts, v1.VolumeMount{ assert.Contains(t, s.Spec.Template.Spec.Containers[0].VolumeMounts, v1.VolumeMount{
MountPath: "/tls", MountPath: "/tls",
Name: "tls-secret", Name: "my-secret",
ReadOnly: true,
}, "the volume gets mounted in /tls") }, "the volume gets mounted in /tls")
assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_CERTIFICATE_FILE", Value: "/tls/tls.crt"}) assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_CERTIFICATE_FILE", Value: "/tls/tls.crt"})
assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_PRIVATE_KEY_FILE", Value: "/tls/tls.key"}) assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_PRIVATE_KEY_FILE", Value: "/tls/tls.key"})
assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_CA_FILE", Value: "/tls/ca.crt"}) assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_CA_FILE", Value: "/tls/ca.crt"})
} }
func TestAdditionalVolume(t *testing.T) {
testName := "TestAdditionalVolume"
tests := []struct {
subTest string
podSpec *v1.PodSpec
volumePos int
}{
{
subTest: "empty PodSpec",
podSpec: &v1.PodSpec{
Volumes: []v1.Volume{},
Containers: []v1.Container{
{
VolumeMounts: []v1.VolumeMount{},
},
},
},
volumePos: 0,
},
{
subTest: "non empty PodSpec",
podSpec: &v1.PodSpec{
Volumes: []v1.Volume{{}},
Containers: []v1.Container{
{
Name: "postgres",
VolumeMounts: []v1.VolumeMount{
{
Name: "data",
ReadOnly: false,
MountPath: "/data",
},
},
},
},
},
volumePos: 1,
},
{
subTest: "non empty PodSpec with sidecar",
podSpec: &v1.PodSpec{
Volumes: []v1.Volume{{}},
Containers: []v1.Container{
{
Name: "postgres",
VolumeMounts: []v1.VolumeMount{
{
Name: "data",
ReadOnly: false,
MountPath: "/data",
},
},
},
{
Name: "sidecar",
VolumeMounts: []v1.VolumeMount{
{
Name: "data",
ReadOnly: false,
MountPath: "/data",
},
},
},
},
},
volumePos: 1,
},
}
var cluster = New(
Config{
OpConfig: config.Config{
ProtectedRoles: []string{"admin"},
Auth: config.Auth{
SuperUsername: superUserName,
ReplicationUsername: replicationUserName,
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
for _, tt := range tests {
// Test with additional volume mounted in all containers
additionalVolumeMount := []acidv1.AdditionalVolume{
{
Name: "test",
MountPath: "/test",
TargetContainers: []string{"all"},
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
},
}
numMounts := len(tt.podSpec.Containers[0].VolumeMounts)
cluster.addAdditionalVolumes(tt.podSpec, additionalVolumeMount)
volumeName := tt.podSpec.Volumes[tt.volumePos].Name
if volumeName != additionalVolumeMount[0].Name {
t.Errorf("%s %s: Expected volume %v was not created, have %s instead",
testName, tt.subTest, additionalVolumeMount, volumeName)
}
for i := range tt.podSpec.Containers {
volumeMountName := tt.podSpec.Containers[i].VolumeMounts[tt.volumePos].Name
if volumeMountName != additionalVolumeMount[0].Name {
t.Errorf("%s %s: Expected mount %v was not created, have %s instead",
testName, tt.subTest, additionalVolumeMount, volumeMountName)
}
}
numMountsCheck := len(tt.podSpec.Containers[0].VolumeMounts)
if numMountsCheck != numMounts+1 {
t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v",
numMountsCheck, numMounts+1)
}
}
for _, tt := range tests {
// Test with additional volume mounted only in first container
additionalVolumeMount := []acidv1.AdditionalVolume{
{
Name: "test",
MountPath: "/test",
TargetContainers: []string{"postgres"},
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
},
}
numMounts := len(tt.podSpec.Containers[0].VolumeMounts)
cluster.addAdditionalVolumes(tt.podSpec, additionalVolumeMount)
volumeName := tt.podSpec.Volumes[tt.volumePos].Name
if volumeName != additionalVolumeMount[0].Name {
t.Errorf("%s %s: Expected volume %v was not created, have %s instead",
testName, tt.subTest, additionalVolumeMount, volumeName)
}
for _, container := range tt.podSpec.Containers {
if container.Name == "postgres" {
volumeMountName := container.VolumeMounts[tt.volumePos].Name
if volumeMountName != additionalVolumeMount[0].Name {
t.Errorf("%s %s: Expected mount %v was not created, have %s instead",
testName, tt.subTest, additionalVolumeMount, volumeMountName)
}
numMountsCheck := len(container.VolumeMounts)
if numMountsCheck != numMounts+1 {
t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v",
numMountsCheck, numMounts+1)
}
} else {
numMountsCheck := len(container.VolumeMounts)
if numMountsCheck == numMounts+1 {
t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v",
numMountsCheck, numMounts)
}
}
}
}
}

View File

@ -294,6 +294,27 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
return pod, nil return pod, nil
} }
func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
/*
Operator should not re-create pods if there is at least one replica being bootstrapped
because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag).
XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
after this check succeeds but before a pod is re-created
*/
for _, pod := range pods.Items {
state, err := c.patroni.GetPatroniMemberState(&pod)
if err != nil || state == "creating replica" {
c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
return false
}
}
return true
}
func (c *Cluster) recreatePods() error { func (c *Cluster) recreatePods() error {
c.setProcessName("starting to recreate pods") c.setProcessName("starting to recreate pods")
ls := c.labelsSet(false) ls := c.labelsSet(false)
@ -309,6 +330,10 @@ func (c *Cluster) recreatePods() error {
} }
c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items)) c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))
if !c.isSafeToRecreatePods(pods) {
return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initilalized")
}
var ( var (
masterPod, newMasterPod, newPod *v1.Pod masterPod, newMasterPod, newPod *v1.Pod
) )

View File

@ -105,7 +105,12 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo
var msg string var msg string
c.setProcessName("creating connection pooler") c.setProcessName("creating connection pooler")
if c.ConnectionPooler == nil {
c.ConnectionPooler = &ConnectionPoolerObjects{}
}
schema := c.Spec.ConnectionPooler.Schema schema := c.Spec.ConnectionPooler.Schema
if schema == "" { if schema == "" {
schema = c.OpConfig.ConnectionPooler.Schema schema = c.OpConfig.ConnectionPooler.Schema
} }

View File

@ -36,7 +36,7 @@ func TestConnectionPoolerCreationAndDeletion(t *testing.T) {
ConnectionPoolerDefaultMemoryLimit: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi",
}, },
}, },
}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)
cluster.Statefulset = &appsv1.StatefulSet{ cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -85,7 +85,7 @@ func TestNeedConnectionPooler(t *testing.T) {
ConnectionPoolerDefaultMemoryLimit: "100Mi", ConnectionPoolerDefaultMemoryLimit: "100Mi",
}, },
}, },
}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)
cluster.Spec = acidv1.PostgresSpec{ cluster.Spec = acidv1.PostgresSpec{
ConnectionPooler: &acidv1.ConnectionPooler{}, ConnectionPooler: &acidv1.ConnectionPooler{},

View File

@ -111,7 +111,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
} }
// sync connection pooler // sync connection pooler
if err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil { if _, err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil {
return fmt.Errorf("could not sync connection pooler: %v", err) return fmt.Errorf("could not sync connection pooler: %v", err)
} }
@ -122,10 +122,11 @@ func (c *Cluster) syncServices() error {
for _, role := range []PostgresRole{Master, Replica} { for _, role := range []PostgresRole{Master, Replica} {
c.logger.Debugf("syncing %s service", role) c.logger.Debugf("syncing %s service", role)
if err := c.syncEndpoint(role); err != nil { if !c.patroniKubernetesUseConfigMaps() {
return fmt.Errorf("could not sync %s endpoint: %v", role, err) if err := c.syncEndpoint(role); err != nil {
return fmt.Errorf("could not sync %s endpoint: %v", role, err)
}
} }
if err := c.syncService(role); err != nil { if err := c.syncService(role); err != nil {
return fmt.Errorf("could not sync %s service: %v", role, err) return fmt.Errorf("could not sync %s service: %v", role, err)
} }
@ -377,10 +378,12 @@ func (c *Cluster) syncStatefulSet() error {
// statefulset or those that got their configuration from the outdated statefulset) // statefulset or those that got their configuration from the outdated statefulset)
if podsRollingUpdateRequired { if podsRollingUpdateRequired {
c.logger.Debugln("performing rolling update") c.logger.Debugln("performing rolling update")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
if err := c.recreatePods(); err != nil { if err := c.recreatePods(); err != nil {
return fmt.Errorf("could not recreate pods: %v", err) return fmt.Errorf("could not recreate pods: %v", err)
} }
c.logger.Infof("pods have been recreated") c.logger.Infof("pods have been recreated")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil { if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
c.logger.Warningf("could not clear rolling update for the statefulset: %v", err) c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
} }
@ -655,7 +658,13 @@ func (c *Cluster) syncLogicalBackupJob() error {
return nil return nil
} }
func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, lookup InstallFunction) error { func (c *Cluster) syncConnectionPooler(oldSpec,
newSpec *acidv1.Postgresql,
lookup InstallFunction) (SyncReason, error) {
var reason SyncReason
var err error
if c.ConnectionPooler == nil { if c.ConnectionPooler == nil {
c.ConnectionPooler = &ConnectionPoolerObjects{} c.ConnectionPooler = &ConnectionPoolerObjects{}
} }
@ -692,20 +701,20 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, look
specUser, specUser,
c.OpConfig.ConnectionPooler.User) c.OpConfig.ConnectionPooler.User)
if err := lookup(schema, user); err != nil { if err = lookup(schema, user); err != nil {
return err return NoSync, err
} }
} }
if err := c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil { if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil {
c.logger.Errorf("could not sync connection pooler: %v", err) c.logger.Errorf("could not sync connection pooler: %v", err)
return err return reason, err
} }
} }
if oldNeedConnectionPooler && !newNeedConnectionPooler { if oldNeedConnectionPooler && !newNeedConnectionPooler {
// delete and cleanup resources // delete and cleanup resources
if err := c.deleteConnectionPooler(); err != nil { if err = c.deleteConnectionPooler(); err != nil {
c.logger.Warningf("could not remove connection pooler: %v", err) c.logger.Warningf("could not remove connection pooler: %v", err)
} }
} }
@ -716,20 +725,22 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, look
(c.ConnectionPooler.Deployment != nil || (c.ConnectionPooler.Deployment != nil ||
c.ConnectionPooler.Service != nil) { c.ConnectionPooler.Service != nil) {
if err := c.deleteConnectionPooler(); err != nil { if err = c.deleteConnectionPooler(); err != nil {
c.logger.Warningf("could not remove connection pooler: %v", err) c.logger.Warningf("could not remove connection pooler: %v", err)
} }
} }
} }
return nil return reason, nil
} }
// Synchronize connection pooler resources. Effectively we're interested only in // Synchronize connection pooler resources. Effectively we're interested only in
// synchronizing the corresponding deployment, but in case of deployment or // synchronizing the corresponding deployment, but in case of deployment or
// service is missing, create it. After checking, also remember an object for // service is missing, create it. After checking, also remember an object for
// the future references. // the future references.
func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) error { func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) (
SyncReason, error) {
deployment, err := c.KubeClient. deployment, err := c.KubeClient.
Deployments(c.Namespace). Deployments(c.Namespace).
Get(context.TODO(), c.connectionPoolerName(), metav1.GetOptions{}) Get(context.TODO(), c.connectionPoolerName(), metav1.GetOptions{})
@ -741,7 +752,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec) deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec)
if err != nil { if err != nil {
msg = "could not generate deployment for connection pooler: %v" msg = "could not generate deployment for connection pooler: %v"
return fmt.Errorf(msg, err) return NoSync, fmt.Errorf(msg, err)
} }
deployment, err := c.KubeClient. deployment, err := c.KubeClient.
@ -749,18 +760,35 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
if err != nil { if err != nil {
return err return NoSync, err
} }
c.ConnectionPooler.Deployment = deployment c.ConnectionPooler.Deployment = deployment
} else if err != nil { } else if err != nil {
return fmt.Errorf("could not get connection pooler deployment to sync: %v", err) msg := "could not get connection pooler deployment to sync: %v"
return NoSync, fmt.Errorf(msg, err)
} else { } else {
c.ConnectionPooler.Deployment = deployment c.ConnectionPooler.Deployment = deployment
// actual synchronization // actual synchronization
oldConnectionPooler := oldSpec.Spec.ConnectionPooler oldConnectionPooler := oldSpec.Spec.ConnectionPooler
newConnectionPooler := newSpec.Spec.ConnectionPooler newConnectionPooler := newSpec.Spec.ConnectionPooler
// sync implementation below assumes that both old and new specs are
// not nil, but it can happen. To avoid any confusion like updating a
// deployment because the specification changed from nil to an empty
// struct (that was initialized somewhere before) replace any nil with
// an empty spec.
if oldConnectionPooler == nil {
oldConnectionPooler = &acidv1.ConnectionPooler{}
}
if newConnectionPooler == nil {
newConnectionPooler = &acidv1.ConnectionPooler{}
}
c.logger.Infof("Old: %+v, New %+v", oldConnectionPooler, newConnectionPooler)
specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler) specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler)
defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment) defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment)
reason := append(specReason, defaultsReason...) reason := append(specReason, defaultsReason...)
@ -771,7 +799,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec) newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec)
if err != nil { if err != nil {
msg := "could not generate deployment for connection pooler: %v" msg := "could not generate deployment for connection pooler: %v"
return fmt.Errorf(msg, err) return reason, fmt.Errorf(msg, err)
} }
oldDeploymentSpec := c.ConnectionPooler.Deployment oldDeploymentSpec := c.ConnectionPooler.Deployment
@ -781,11 +809,11 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
newDeploymentSpec) newDeploymentSpec)
if err != nil { if err != nil {
return err return reason, err
} }
c.ConnectionPooler.Deployment = deployment c.ConnectionPooler.Deployment = deployment
return nil return reason, nil
} }
} }
@ -803,16 +831,17 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) Create(context.TODO(), serviceSpec, metav1.CreateOptions{})
if err != nil { if err != nil {
return err return NoSync, err
} }
c.ConnectionPooler.Service = service c.ConnectionPooler.Service = service
} else if err != nil { } else if err != nil {
return fmt.Errorf("could not get connection pooler service to sync: %v", err) msg := "could not get connection pooler service to sync: %v"
return NoSync, fmt.Errorf(msg, err)
} else { } else {
// Service updates are not supported and probably not that useful anyway // Service updates are not supported and probably not that useful anyway
c.ConnectionPooler.Service = service c.ConnectionPooler.Service = service
} }
return nil return NoSync, nil
} }

View File

@ -2,6 +2,7 @@ package cluster
import ( import (
"fmt" "fmt"
"strings"
"testing" "testing"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
@ -17,7 +18,7 @@ func int32ToPointer(value int32) *int32 {
return &value return &value
} }
func deploymentUpdated(cluster *Cluster, err error) error { func deploymentUpdated(cluster *Cluster, err error, reason SyncReason) error {
if cluster.ConnectionPooler.Deployment.Spec.Replicas == nil || if cluster.ConnectionPooler.Deployment.Spec.Replicas == nil ||
*cluster.ConnectionPooler.Deployment.Spec.Replicas != 2 { *cluster.ConnectionPooler.Deployment.Spec.Replicas != 2 {
return fmt.Errorf("Wrong nubmer of instances") return fmt.Errorf("Wrong nubmer of instances")
@ -26,7 +27,7 @@ func deploymentUpdated(cluster *Cluster, err error) error {
return nil return nil
} }
func objectsAreSaved(cluster *Cluster, err error) error { func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error {
if cluster.ConnectionPooler == nil { if cluster.ConnectionPooler == nil {
return fmt.Errorf("Connection pooler resources are empty") return fmt.Errorf("Connection pooler resources are empty")
} }
@ -42,7 +43,7 @@ func objectsAreSaved(cluster *Cluster, err error) error {
return nil return nil
} }
func objectsAreDeleted(cluster *Cluster, err error) error { func objectsAreDeleted(cluster *Cluster, err error, reason SyncReason) error {
if cluster.ConnectionPooler != nil { if cluster.ConnectionPooler != nil {
return fmt.Errorf("Connection pooler was not deleted") return fmt.Errorf("Connection pooler was not deleted")
} }
@ -50,6 +51,16 @@ func objectsAreDeleted(cluster *Cluster, err error) error {
return nil return nil
} }
func noEmptySync(cluster *Cluster, err error, reason SyncReason) error {
for _, msg := range reason {
if strings.HasPrefix(msg, "update [] from '<nil>' to '") {
return fmt.Errorf("There is an empty reason, %s", msg)
}
}
return nil
}
func TestConnectionPoolerSynchronization(t *testing.T) { func TestConnectionPoolerSynchronization(t *testing.T) {
testName := "Test connection pooler synchronization" testName := "Test connection pooler synchronization"
var cluster = New( var cluster = New(
@ -68,7 +79,7 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
NumberOfInstances: int32ToPointer(1), NumberOfInstances: int32ToPointer(1),
}, },
}, },
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
cluster.Statefulset = &appsv1.StatefulSet{ cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -91,15 +102,15 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
clusterNewDefaultsMock := *cluster clusterNewDefaultsMock := *cluster
clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient() clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient()
cluster.OpConfig.ConnectionPooler.Image = "pooler:2.0"
cluster.OpConfig.ConnectionPooler.NumberOfInstances = int32ToPointer(2)
tests := []struct { tests := []struct {
subTest string subTest string
oldSpec *acidv1.Postgresql oldSpec *acidv1.Postgresql
newSpec *acidv1.Postgresql newSpec *acidv1.Postgresql
cluster *Cluster cluster *Cluster
check func(cluster *Cluster, err error) error defaultImage string
defaultInstances int32
check func(cluster *Cluster, err error, reason SyncReason) error
}{ }{
{ {
subTest: "create if doesn't exist", subTest: "create if doesn't exist",
@ -113,8 +124,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
ConnectionPooler: &acidv1.ConnectionPooler{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
}, },
cluster: &clusterMissingObjects, cluster: &clusterMissingObjects,
check: objectsAreSaved, defaultImage: "pooler:1.0",
defaultInstances: 1,
check: objectsAreSaved,
}, },
{ {
subTest: "create if doesn't exist with a flag", subTest: "create if doesn't exist with a flag",
@ -126,8 +139,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
EnableConnectionPooler: boolToPointer(true), EnableConnectionPooler: boolToPointer(true),
}, },
}, },
cluster: &clusterMissingObjects, cluster: &clusterMissingObjects,
check: objectsAreSaved, defaultImage: "pooler:1.0",
defaultInstances: 1,
check: objectsAreSaved,
}, },
{ {
subTest: "create from scratch", subTest: "create from scratch",
@ -139,8 +154,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
ConnectionPooler: &acidv1.ConnectionPooler{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
}, },
cluster: &clusterMissingObjects, cluster: &clusterMissingObjects,
check: objectsAreSaved, defaultImage: "pooler:1.0",
defaultInstances: 1,
check: objectsAreSaved,
}, },
{ {
subTest: "delete if not needed", subTest: "delete if not needed",
@ -152,8 +169,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
newSpec: &acidv1.Postgresql{ newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{}, Spec: acidv1.PostgresSpec{},
}, },
cluster: &clusterMock, cluster: &clusterMock,
check: objectsAreDeleted, defaultImage: "pooler:1.0",
defaultInstances: 1,
check: objectsAreDeleted,
}, },
{ {
subTest: "cleanup if still there", subTest: "cleanup if still there",
@ -163,8 +182,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
newSpec: &acidv1.Postgresql{ newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{}, Spec: acidv1.PostgresSpec{},
}, },
cluster: &clusterDirtyMock, cluster: &clusterDirtyMock,
check: objectsAreDeleted, defaultImage: "pooler:1.0",
defaultInstances: 1,
check: objectsAreDeleted,
}, },
{ {
subTest: "update deployment", subTest: "update deployment",
@ -182,8 +203,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
}, },
}, },
}, },
cluster: &clusterMock, cluster: &clusterMock,
check: deploymentUpdated, defaultImage: "pooler:1.0",
defaultInstances: 1,
check: deploymentUpdated,
}, },
{ {
subTest: "update image from changed defaults", subTest: "update image from changed defaults",
@ -197,14 +220,40 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
ConnectionPooler: &acidv1.ConnectionPooler{}, ConnectionPooler: &acidv1.ConnectionPooler{},
}, },
}, },
cluster: &clusterNewDefaultsMock, cluster: &clusterNewDefaultsMock,
check: deploymentUpdated, defaultImage: "pooler:2.0",
defaultInstances: 2,
check: deploymentUpdated,
},
{
subTest: "there is no sync from nil to an empty spec",
oldSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
EnableConnectionPooler: boolToPointer(true),
ConnectionPooler: nil,
},
},
newSpec: &acidv1.Postgresql{
Spec: acidv1.PostgresSpec{
EnableConnectionPooler: boolToPointer(true),
ConnectionPooler: &acidv1.ConnectionPooler{},
},
},
cluster: &clusterMock,
defaultImage: "pooler:1.0",
defaultInstances: 1,
check: noEmptySync,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
err := tt.cluster.syncConnectionPooler(tt.oldSpec, tt.newSpec, mockInstallLookupFunction) tt.cluster.OpConfig.ConnectionPooler.Image = tt.defaultImage
tt.cluster.OpConfig.ConnectionPooler.NumberOfInstances =
int32ToPointer(tt.defaultInstances)
if err := tt.check(tt.cluster, err); err != nil { reason, err := tt.cluster.syncConnectionPooler(tt.oldSpec,
tt.newSpec, mockInstallLookupFunction)
if err := tt.check(tt.cluster, err, reason); err != nil {
t.Errorf("%s [%s]: Could not synchronize, %+v", t.Errorf("%s [%s]: Could not synchronize, %+v",
testName, tt.subTest, err) testName, tt.subTest, err)
} }

View File

@ -73,3 +73,8 @@ type ClusterStatus struct {
type TemplateParams map[string]interface{} type TemplateParams map[string]interface{}
type InstallFunction func(schema string, user string) error type InstallFunction func(schema string, user string) error
type SyncReason []string
// no sync happened, empty value
var NoSync SyncReason = []string{}

View File

@ -7,24 +7,24 @@ import (
"sync" "sync"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/apiserver" "github.com/zalando/postgres-operator/pkg/apiserver"
"github.com/zalando/postgres-operator/pkg/cluster" "github.com/zalando/postgres-operator/pkg/cluster"
acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/k8sutil" "github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/ringlog" "github.com/zalando/postgres-operator/pkg/util/ringlog"
v1 "k8s.io/api/core/v1"
acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1" rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
) )
// Controller represents operator controller // Controller represents operator controller
@ -36,6 +36,9 @@ type Controller struct {
KubeClient k8sutil.KubernetesClient KubeClient k8sutil.KubernetesClient
apiserver *apiserver.Server apiserver *apiserver.Server
eventRecorder record.EventRecorder
eventBroadcaster record.EventBroadcaster
stopCh chan struct{} stopCh chan struct{}
controllerID string controllerID string
@ -67,10 +70,21 @@ type Controller struct {
func NewController(controllerConfig *spec.ControllerConfig, controllerId string) *Controller { func NewController(controllerConfig *spec.ControllerConfig, controllerId string) *Controller {
logger := logrus.New() logger := logrus.New()
var myComponentName = "postgres-operator"
if controllerId != "" {
myComponentName += "/" + controllerId
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logger.Debugf)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: myComponentName})
c := &Controller{ c := &Controller{
config: *controllerConfig, config: *controllerConfig,
opConfig: &config.Config{}, opConfig: &config.Config{},
logger: logger.WithField("pkg", "controller"), logger: logger.WithField("pkg", "controller"),
eventRecorder: recorder,
eventBroadcaster: eventBroadcaster,
controllerID: controllerId, controllerID: controllerId,
curWorkerCluster: sync.Map{}, curWorkerCluster: sync.Map{},
clusterWorkers: make(map[spec.NamespacedName]uint32), clusterWorkers: make(map[spec.NamespacedName]uint32),
@ -93,6 +107,11 @@ func (c *Controller) initClients() {
if err != nil { if err != nil {
c.logger.Fatalf("could not create kubernetes clients: %v", err) c.logger.Fatalf("could not create kubernetes clients: %v", err)
} }
c.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.KubeClient.EventsGetter.Events("")})
if err != nil {
c.logger.Fatalf("could not setup kubernetes event sink: %v", err)
}
} }
func (c *Controller) initOperatorConfig() { func (c *Controller) initOperatorConfig() {

View File

@ -170,6 +170,11 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
fromCRD.ConnectionPooler.User, fromCRD.ConnectionPooler.User,
constants.ConnectionPoolerUserName) constants.ConnectionPoolerUserName)
if result.ConnectionPooler.User == result.SuperUsername {
msg := "Connection pool user is not allowed to be the same as super user, username: %s"
panic(fmt.Errorf(msg, result.ConnectionPooler.User))
}
result.ConnectionPooler.Image = util.Coalesce( result.ConnectionPooler.Image = util.Coalesce(
fromCRD.ConnectionPooler.Image, fromCRD.ConnectionPooler.Image,
"registry.opensource.zalan.do/acid/pgbouncer") "registry.opensource.zalan.do/acid/pgbouncer")

View File

@ -11,6 +11,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
@ -157,7 +158,7 @@ func (c *Controller) acquireInitialListOfClusters() error {
} }
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) *cluster.Cluster { func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) *cluster.Cluster {
cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg) cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
cl.Run(c.stopCh) cl.Run(c.stopCh)
teamName := strings.ToLower(cl.Spec.TeamID) teamName := strings.ToLower(cl.Spec.TeamID)
@ -236,6 +237,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
if err := cl.Create(); err != nil { if err := cl.Create(); err != nil {
cl.Error = fmt.Sprintf("could not create cluster: %v", err) cl.Error = fmt.Sprintf("could not create cluster: %v", err)
lg.Error(cl.Error) lg.Error(cl.Error)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)
return return
} }
@ -274,6 +276,8 @@ func (c *Controller) processEvent(event ClusterEvent) {
c.curWorkerCluster.Store(event.WorkerID, cl) c.curWorkerCluster.Store(event.WorkerID, cl)
cl.Delete() cl.Delete()
// Fixme - no error handling for delete ?
// c.eventRecorder.Eventf(cl.GetReference, v1.EventTypeWarning, "Delete", "%v", cl.Error)
func() { func() {
defer c.clustersMu.Unlock() defer c.clustersMu.Unlock()
@ -304,6 +308,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
c.curWorkerCluster.Store(event.WorkerID, cl) c.curWorkerCluster.Store(event.WorkerID, cl)
if err := cl.Sync(event.NewSpec); err != nil { if err := cl.Sync(event.NewSpec); err != nil {
cl.Error = fmt.Sprintf("could not sync cluster: %v", err) cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
lg.Error(cl.Error) lg.Error(cl.Error)
return return
} }

View File

@ -219,5 +219,11 @@ func validate(cfg *Config) (err error) {
msg := "number of connection pooler instances should be higher than %d" msg := "number of connection pooler instances should be higher than %d"
err = fmt.Errorf(msg, constants.ConnectionPoolerMinInstances) err = fmt.Errorf(msg, constants.ConnectionPoolerMinInstances)
} }
if cfg.ConnectionPooler.User == cfg.SuperUsername {
msg := "Connection pool user is not allowed to be the same as super user, username: %s"
err = fmt.Errorf(msg, cfg.ConnectionPooler.User)
}
return return
} }

View File

@ -45,6 +45,7 @@ type KubernetesClient struct {
corev1.NodesGetter corev1.NodesGetter
corev1.NamespacesGetter corev1.NamespacesGetter
corev1.ServiceAccountsGetter corev1.ServiceAccountsGetter
corev1.EventsGetter
appsv1.StatefulSetsGetter appsv1.StatefulSetsGetter
appsv1.DeploymentsGetter appsv1.DeploymentsGetter
rbacv1.RoleBindingsGetter rbacv1.RoleBindingsGetter
@ -142,6 +143,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
kubeClient.RESTClient = client.CoreV1().RESTClient() kubeClient.RESTClient = client.CoreV1().RESTClient()
kubeClient.RoleBindingsGetter = client.RbacV1() kubeClient.RoleBindingsGetter = client.RbacV1()
kubeClient.CronJobsGetter = client.BatchV1beta1() kubeClient.CronJobsGetter = client.BatchV1beta1()
kubeClient.EventsGetter = client.CoreV1()
apiextClient, err := apiextclient.NewForConfig(cfg) apiextClient, err := apiextclient.NewForConfig(cfg)
if err != nil { if err != nil {

View File

@ -3,6 +3,7 @@ package patroni
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net" "net"
@ -11,7 +12,7 @@ import (
"time" "time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
) )
const ( const (
@ -25,6 +26,7 @@ const (
type Interface interface { type Interface interface {
Switchover(master *v1.Pod, candidate string) error Switchover(master *v1.Pod, candidate string) error
SetPostgresParameters(server *v1.Pod, options map[string]string) error SetPostgresParameters(server *v1.Pod, options map[string]string) error
GetPatroniMemberState(pod *v1.Pod) (string, error)
} }
// Patroni API client // Patroni API client
@ -123,3 +125,36 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
} }
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf) return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
} }
//GetPatroniMemberState returns a state of member of a Patroni cluster
func (p *Patroni) GetPatroniMemberState(server *v1.Pod) (string, error) {
apiURLString, err := apiURL(server)
if err != nil {
return "", err
}
response, err := p.httpClient.Get(apiURLString)
if err != nil {
return "", fmt.Errorf("could not perform Get request: %v", err)
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", fmt.Errorf("could not read response: %v", err)
}
data := make(map[string]interface{})
err = json.Unmarshal(body, &data)
if err != nil {
return "", err
}
state, ok := data["state"].(string)
if !ok {
return "", errors.New("Patroni Get call response contains wrong type for 'state' field")
}
return state, nil
}