diff --git a/.gitignore b/.gitignore index 991fe754f..0fdb50756 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,8 @@ _obj _test _manifests +_tmp +github.com # Architecture specific extensions/prefixes *.[568vq] @@ -26,6 +28,7 @@ _testmain.go /vendor/ /build/ /docker/build/ +/github.com/ .idea scm-source.json diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index f64080ed5..78850ee3b 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -74,6 +74,28 @@ spec: - teamId - postgresql properties: + additionalVolumes: + type: array + items: + type: object + required: + - name + - mountPath + - volumeSource + properties: + name: + type: string + mountPath: + type: string + targetContainers: + type: array + nullable: true + items: + type: string + volumeSource: + type: object + subPath: + type: string allowedSourceRanges: type: array nullable: true @@ -342,6 +364,21 @@ spec: type: string teamId: type: string + tls: + type: object + required: + - secretName + properties: + secretName: + type: string + certificateFile: + type: string + privateKeyFile: + type: string + caFile: + type: string + caSecretName: + type: string tolerations: type: array items: diff --git a/charts/postgres-operator/templates/clusterrole.yaml b/charts/postgres-operator/templates/clusterrole.yaml index 38ce85e7a..0defcab41 100644 --- a/charts/postgres-operator/templates/clusterrole.yaml +++ b/charts/postgres-operator/templates/clusterrole.yaml @@ -42,6 +42,13 @@ rules: - configmaps verbs: - get +# to send events to the CRs +- apiGroups: + - "" + resources: + - events + verbs: + - create # to manage endpoints which are also used by Patroni - apiGroups: - "" diff --git a/charts/postgres-operator/values-crd.yaml b/charts/postgres-operator/values-crd.yaml index 4d71f0e37..9971439c0 100644 --- a/charts/postgres-operator/values-crd.yaml +++ b/charts/postgres-operator/values-crd.yaml @@ -279,11 +279,11 @@ configConnectionPooler: # docker image connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer" # max db connections the pooler should hold - connection_pooler_max_db_connections: 60 + connection_pooler_max_db_connections: "60" # default pooling mode connection_pooler_mode: "transaction" # number of pooler instances - connection_pooler_number_of_instances: 2 + connection_pooler_number_of_instances: "2" # default resources connection_pooler_default_cpu_request: 500m connection_pooler_default_memory_request: 100Mi @@ -296,6 +296,7 @@ rbac: crd: # Specifies whether custom resource definitions should be created + # When using helm3, this is ignored; instead use "--skip-crds" to skip. 
create: true serviceAccount: diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml index 08799b6a9..2a715f991 100644 --- a/charts/postgres-operator/values.yaml +++ b/charts/postgres-operator/values.yaml @@ -256,11 +256,11 @@ configConnectionPooler: # docker image connection_pooler_image: "registry.opensource.zalan.do/acid/pgbouncer" # max db connections the pooler should hold - connection_pooler_max_db_connections: 60 + connection_pooler_max_db_connections: "60" # default pooling mode connection_pooler_mode: "transaction" # number of pooler instances - connection_pooler_number_of_instances: 2 + connection_pooler_number_of_instances: "2" # default resources connection_pooler_default_cpu_request: 500m connection_pooler_default_memory_request: 100Mi @@ -273,6 +273,7 @@ rbac: crd: # Specifies whether custom resource definitions should be created + # When using helm3, this is ignored; instead use "--skip-crds" to skip. create: true serviceAccount: diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 83dedabd3..c87728812 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -154,6 +154,18 @@ These parameters are grouped directly under the `spec` key in the manifest. [the reference schedule format](https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/#schedule) into account. Optional. Default is: "30 00 \* \* \*" +* **additionalVolumes** + List of additional volumes to mount in each container of the statefulset pod. + Each item must contain a `name`, `mountPath`, and `volumeSource` which is a + [kubernetes volumeSource](https://godoc.org/k8s.io/api/core/v1#VolumeSource). + It allows you to mount existing PersistentVolumeClaims, ConfigMaps and Secrets inside the StatefulSet. + An `emptyDir` volume can also be shared between init containers and the statefulset's containers. + Additionally, you can provide a `subPath` for the volume mount (a file in a configMap source volume, for example). + You can also specify in which containers the additional volumes will be mounted with the `targetContainers` array option. + If `targetContainers` is empty, additional volumes will be mounted only in the `postgres` container. + If you set the special item `all`, the volume will be mounted in all containers (postgres + sidecars). + Otherwise, you can list the target containers in which the additional volumes will be mounted (e.g. postgres, telegraf). + ## Postgres parameters Those parameters are grouped under the `postgresql` top-level key, which is @@ -419,5 +431,16 @@ Those parameters are grouped under the `tls` top-level key. Filename of the private key. Defaults to "tls.key". * **caFile** - Optional filename to the CA certificate. Useful when the client connects - with `sslmode=verify-ca` or `sslmode=verify-full`. Default is empty. + Optional filename to the CA certificate (e.g. "ca.crt"). Useful when the + client connects with `sslmode=verify-ca` or `sslmode=verify-full`. + Default is empty. + +* **caSecretName** + By setting the `caSecretName` value, the CA certificate file defined by + `caFile` will be fetched from this secret instead of from `secretName` above. + This secret has to hold a file with that name in its root. + + Optionally, a full path can be provided for any of these files. By default + they are relative to "/tls/", which is the mount path of the TLS secret. + If `caSecretName` is defined, the CA file path is relative to "/tlsca/", + otherwise to the same "/tls/".
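For reference, a minimal cluster manifest combining the `additionalVolumes` and `tls`/`caSecretName` options documented above could look like the sketch below; the cluster, ConfigMap and secret names are placeholders, not part of this change.

```yaml
# Illustrative sketch only: "acid-example-cluster", "my-dashboards", "pg-tls"
# and "pg-tls-ca" are placeholder names.
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
  name: acid-example-cluster
spec:
  teamId: "acid"
  numberOfInstances: 2
  volume:
    size: 1Gi
  postgresql:
    version: "12"
  additionalVolumes:
    - name: dashboards
      mountPath: /etc/dashboards
      targetContainers:
        - all                    # mount in postgres and all sidecars
      volumeSource:
        configMap:
          name: my-dashboards
  tls:
    secretName: "pg-tls"         # holds tls.crt and tls.key, mounted under /tls/
    caSecretName: "pg-tls-ca"    # holds ca.crt, mounted under /tlsca/
    caFile: "ca.crt"
```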
diff --git a/docs/user.md b/docs/user.md index 1be50a01a..2c1c4fd1f 100644 --- a/docs/user.md +++ b/docs/user.md @@ -584,7 +584,8 @@ don't know the value, use `103` which is the GID from the default spilo image OpenShift allocates the users and groups dynamically (based on scc), and their range is different in every namespace. Due to this dynamic behaviour, it's not trivial to know at deploy time the uid/gid of the user in the cluster. -This way, in OpenShift, you may want to skip the spilo_fsgroup setting. +Therefore, instead of using a global `spilo_fsgroup` setting, use the `spiloFSGroup` field +per Postgres cluster. Upload the cert as a kubernetes secret: ```sh @@ -593,7 +594,7 @@ kubectl create secret tls pg-tls \ --cert pg-tls.crt ``` -Or with a CA: +When doing client auth, the CA can optionally come from the same secret: ```sh kubectl create secret generic pg-tls \ --from-file=tls.crt=server.crt \ @@ -601,9 +602,6 @@ kubectl create secret generic pg-tls \ --from-file=ca.crt=ca.crt ``` -Alternatively it is also possible to use -[cert-manager](https://cert-manager.io/docs/) to generate these secrets. - Then configure the postgres resource with the TLS secret: ```yaml @@ -618,5 +616,29 @@ spec: caFile: "ca.crt" # add this if the secret is configured with a CA ``` +Optionally, the CA can be provided by a different secret: +```sh +kubectl create secret generic pg-tls-ca \ + --from-file=ca.crt=ca.crt +``` + +Then configure the postgres resource with both secrets: + +```yaml +apiVersion: "acid.zalan.do/v1" +kind: postgresql + +metadata: + name: acid-test-cluster +spec: + tls: + secretName: "pg-tls" # this should hold tls.key and tls.crt + caSecretName: "pg-tls-ca" # this should hold ca.crt + caFile: "ca.crt" # add this if the secret is configured with a CA +``` + +Alternatively, it is also possible to use +[cert-manager](https://cert-manager.io/docs/) to generate these secrets. + Certificate rotation is handled in the spilo image which checks every 5 minutes if the certificates have changed and reloads postgres accordingly.
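The cert-manager alternative mentioned above is not spelled out in this change; as a rough sketch, a `Certificate` resource along the following lines could produce the `pg-tls` secret. The issuer name and DNS names are assumptions, and the `apiVersion` should match the installed cert-manager release.

```yaml
# Sketch only: assumes a cert-manager ClusterIssuer named "my-issuer" exists.
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: pg-tls
spec:
  secretName: pg-tls    # cert-manager writes tls.crt, tls.key (and ca.crt) here
  dnsNames:
    - acid-test-cluster
    - acid-test-cluster.default.svc.cluster.local
  issuerRef:
    name: my-issuer
    kind: ClusterIssuer
```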
diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index f0e27994b..24d006d05 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -410,7 +410,6 @@ class EndToEndTestCase(unittest.TestCase): ''' k8s = self.k8s cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' - labels = 'spilo-role=master,' + cluster_label readiness_label = 'lifecycle-status' readiness_value = 'ready' @@ -776,14 +775,16 @@ class K8s: def wait_for_logical_backup_job_creation(self): self.wait_for_logical_backup_job(expected_num_of_jobs=1) - def update_config(self, config_map_patch): - self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch) - + def delete_operator_pod(self): operator_pod = self.api.core_v1.list_namespaced_pod( 'default', label_selector="name=postgres-operator").items[0].metadata.name self.api.core_v1.delete_namespaced_pod(operator_pod, "default") # restart reloads the conf self.wait_for_operator_pod_start() + def update_config(self, config_map_patch): + self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch) + self.delete_operator_pod() + def create_with_kubectl(self, path): return subprocess.run( ["kubectl", "create", "-f", path], diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 3a87254cf..b469a7564 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -9,9 +9,6 @@ metadata: spec: dockerImage: registry.opensource.zalan.do/acid/spilo-12:1.6-p2 teamId: "acid" - volume: - size: 1Gi -# storageClass: my-sc numberOfInstances: 2 users: # Application/Robot users zalando: @@ -30,6 +27,32 @@ spec: shared_buffers: "32MB" max_connections: "10" log_statement: "all" + volume: + size: 1Gi +# storageClass: my-sc + additionalVolumes: + - name: data + mountPath: /home/postgres/pgdata/partitions + targetContainers: + - postgres + volumeSource: + persistentVolumeClaim: + claimName: pvc-postgresql-data-partitions + readOnly: false + - name: conf + mountPath: /etc/telegraf + subPath: telegraf.conf + targetContainers: + - telegraf-sidecar + volumeSource: + configMap: + name: my-config-map + - name: empty + mountPath: /opt/empty + targetContainers: + - all + volumeSource: + emptyDir: {} enableShmVolume: true # spiloFSGroup: 103 @@ -125,5 +148,10 @@ spec: certificateFile: "tls.crt" privateKeyFile: "tls.key" caFile: "" # optionally configure Postgres with a CA certificate + caSecretName: "" # optionally the ca.crt can come from this secret instead. +# file names can also be defined with an absolute path, and will then no longer be relative +# to the "/tls/" path where the secret is mounted by default, or "/tlsca/" +# where the caSecretName secret is mounted by default. # When TLS is enabled, also set spiloFSGroup parameter above to the relevant value. # if unknown, set it to 103 which is the usual value in the default spilo images. +# In OpenShift, there is no need to set spiloFSGroup/spilo_fsgroup.
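The `additionalVolumes` entries in the example manifest above reference a PersistentVolumeClaim and a ConfigMap that already have to exist in the cluster's namespace; a minimal sketch of such objects (the storage size and telegraf.conf content are illustrative assumptions only) could look like this:

```yaml
# Sketch only: size and telegraf.conf content are placeholders.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: pvc-postgresql-data-partitions
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config-map
data:
  telegraf.conf: |
    [agent]
      interval = "10s"
```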
diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index 83cd721e7..667941a24 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -43,6 +43,13 @@ rules: - configmaps verbs: - get +# to send events to the CRs +- apiGroups: + - "" + resources: + - events + verbs: + - create # to manage endpoints which are also used by Patroni - apiGroups: - "" diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index f8631f0b7..1ee6a1ae5 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -38,6 +38,28 @@ spec: - teamId - postgresql properties: + additionalVolumes: + type: array + items: + type: object + required: + - name + - mountPath + - volumeSource + properties: + name: + type: string + mountPath: + type: string + targetContainers: + type: array + nullable: true + items: + type: string + volumeSource: + type: object + subPath: + type: string allowedSourceRanges: type: array nullable: true @@ -319,6 +341,8 @@ spec: type: string caFile: type: string + caSecretName: + type: string tolerations: type: array items: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index dea12820e..94a8b2351 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -513,6 +513,9 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ "caFile": { Type: "string", }, + "caSecretName": { + Type: "string", + }, }, }, "tolerations": { @@ -682,6 +685,37 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ }, }, }, + "additionalVolumes": { + Type: "array", + Items: &apiextv1beta1.JSONSchemaPropsOrArray{ + Schema: &apiextv1beta1.JSONSchemaProps{ + Type: "object", + Required: []string{"name", "mountPath", "volumeSource"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "name": { + Type: "string", + }, + "mountPath": { + Type: "string", + }, + "targetContainers": { + Type: "array", + Items: &apiextv1beta1.JSONSchemaPropsOrArray{ + Schema: &apiextv1beta1.JSONSchemaProps{ + Type: "string", + }, + }, + }, + "volumeSource": { + Type: "object", + }, + "subPath": { + Type: "string", + }, + }, + }, + }, + }, }, }, "status": { diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index b1b6a36a6..961051c8d 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -67,6 +67,7 @@ type PostgresSpec struct { PodAnnotations map[string]string `json:"podAnnotations"` ServiceAnnotations map[string]string `json:"serviceAnnotations"` TLS *TLSDescription `json:"tls"` + AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"` // deprecated json tags InitContainersOld []v1.Container `json:"init_containers,omitempty"` @@ -98,6 +99,14 @@ type Volume struct { SubPath string `json:"subPath,omitempty"` } +type AdditionalVolume struct { + Name string `json:"name"` + MountPath string `json:"mountPath"` + SubPath string `json:"subPath"` + TargetContainers []string `json:"targetContainers"` + VolumeSource v1.VolumeSource `json:"volume"` +} + // PostgresqlParam describes PostgreSQL version and pairs of configuration parameter name - values. 
type PostgresqlParam struct { PgVersion string `json:"version"` @@ -139,6 +148,7 @@ type TLSDescription struct { CertificateFile string `json:"certificateFile,omitempty"` PrivateKeyFile string `json:"privateKeyFile,omitempty"` CAFile string `json:"caFile,omitempty"` + CASecretName string `json:"caSecretName,omitempty"` } // CloneDescription describes which cluster the new should clone and up to which point in time diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 92c8af34b..e6b387ec4 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -47,6 +47,28 @@ func (in *AWSGCPConfiguration) DeepCopy() *AWSGCPConfiguration { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AdditionalVolume) DeepCopyInto(out *AdditionalVolume) { + *out = *in + if in.TargetContainers != nil { + in, out := &in.TargetContainers, &out.TargetContainers + *out = make([]string, len(*in)) + copy(*out, *in) + } + in.VolumeSource.DeepCopyInto(&out.VolumeSource) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdditionalVolume. +func (in *AdditionalVolume) DeepCopy() *AdditionalVolume { + if in == nil { + return nil + } + out := new(AdditionalVolume) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CloneDescription) DeepCopyInto(out *CloneDescription) { *out = *in @@ -591,6 +613,13 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { *out = new(TLSDescription) **out = **in } + if in.AdditionalVolumes != nil { + in, out := &in.AdditionalVolumes, &out.AdditionalVolumes + *out = make([]AdditionalVolume, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.InitContainersOld != nil { in, out := &in.InitContainersOld, &out.InitContainersOld *out = make([]corev1.Container, len(*in)) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index c9db4dd08..2e04cb137 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -21,8 +21,11 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/reference" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" @@ -81,6 +84,7 @@ type Cluster struct { acidv1.Postgresql Config logger *logrus.Entry + eventRecorder record.EventRecorder patroni patroni.Interface pgUsers map[string]spec.PgUser systemUsers map[string]spec.PgUser @@ -109,7 +113,7 @@ type compareStatefulsetResult struct { } // New creates a new cluster. This function should be called from a controller. 
-func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry) *Cluster { +func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster { deletePropagationPolicy := metav1.DeletePropagationOrphan podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { @@ -140,7 +144,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger) cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName) cluster.patroni = patroni.New(cluster.logger) - + cluster.eventRecorder = eventRecorder return cluster } @@ -166,6 +170,16 @@ func (c *Cluster) setProcessName(procName string, args ...interface{}) { } } +// GetReference of Postgres CR object +// i.e. required to emit events to this resource +func (c *Cluster) GetReference() *v1.ObjectReference { + ref, err := reference.GetReference(scheme.Scheme, &c.Postgresql) + if err != nil { + c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", c.Postgresql.Namespace, c.Postgresql.Name, err) + } + return ref +} + // SetStatus of Postgres cluster // TODO: eventually switch to updateStatus() for kubernetes 1.11 and above func (c *Cluster) setStatus(status string) { @@ -245,6 +259,7 @@ func (c *Cluster) Create() error { }() c.setStatus(acidv1.ClusterStatusCreating) + c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources") if err = c.enforceMinResourceLimits(&c.Spec); err != nil { return fmt.Errorf("could not enforce minimum resource limits: %v", err) @@ -263,6 +278,7 @@ func (c *Cluster) Create() error { return fmt.Errorf("could not create %s endpoint: %v", role, err) } c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta)) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Endpoints", "Endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta)) } if c.Services[role] != nil { @@ -273,6 +289,7 @@ func (c *Cluster) Create() error { return fmt.Errorf("could not create %s service: %v", role, err) } c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Services", "The service %q for role %s has been successfully created", util.NameFromMeta(service.ObjectMeta), role) } if err = c.initUsers(); err != nil { @@ -284,6 +301,7 @@ func (c *Cluster) Create() error { return fmt.Errorf("could not create secrets: %v", err) } c.logger.Infof("secrets have been successfully created") + c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created") if c.PodDisruptionBudget != nil { return fmt.Errorf("pod disruption budget already exists in the cluster") @@ -302,6 +320,7 @@ func (c *Cluster) Create() error { return fmt.Errorf("could not create statefulset: %v", err) } c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta)) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta)) c.logger.Info("waiting for the cluster being ready") @@ -310,6 +329,7 @@ func (c *Cluster) Create() error { return err } c.logger.Infof("pods are ready") + 
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready") // create database objects unless we are running without pods or disabled // that feature explicitly @@ -566,6 +586,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error { } if isSmaller { c.logger.Warningf("defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit) spec.Resources.ResourceLimits.CPU = minCPULimit } } @@ -578,6 +599,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error { } if isSmaller { c.logger.Warningf("defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit) spec.Resources.ResourceLimits.Memory = minMemoryLimit } } @@ -609,6 +631,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { if oldSpec.Spec.PostgresqlParam.PgVersion != newSpec.Spec.PostgresqlParam.PgVersion { // PG versions comparison c.logger.Warningf("postgresql version change(%q -> %q) has no effect", oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "PostgreSQL", "postgresql version change(%q -> %q) has no effect", + oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion) //we need that hack to generate statefulset with the old version newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion } @@ -752,7 +776,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } // sync connection pooler - if err := c.syncConnectionPooler(oldSpec, newSpec, c.installLookupFunction); err != nil { + if _, err := c.syncConnectionPooler(oldSpec, newSpec, + c.installLookupFunction); err != nil { return fmt.Errorf("could not sync connection pooler: %v", err) } @@ -767,6 +792,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { func (c *Cluster) Delete() { c.mu.Lock() defer c.mu.Unlock() + c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of new cluster resources") // delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods // deleting the cron job also removes pods and batch jobs it created @@ -794,8 +820,10 @@ func (c *Cluster) Delete() { for _, role := range []PostgresRole{Master, Replica} { - if err := c.deleteEndpoint(role); err != nil { - c.logger.Warningf("could not delete %s endpoint: %v", role, err) + if !c.patroniKubernetesUseConfigMaps() { + if err := c.deleteEndpoint(role); err != nil { + c.logger.Warningf("could not delete %s endpoint: %v", role, err) + } } if err := c.deleteService(role); err != nil { @@ -888,9 +916,20 @@ func (c *Cluster) initSystemUsers() { c.Spec.ConnectionPooler = &acidv1.ConnectionPooler{} } - username := util.Coalesce( - c.Spec.ConnectionPooler.User, - c.OpConfig.ConnectionPooler.User) + // Using superuser as pooler user is not a good idea. First of all it's + // not going to be synced correctly with the current implementation, + // and second it's a bad practice. 
+ username := c.OpConfig.ConnectionPooler.User + + isSuperUser := c.Spec.ConnectionPooler.User == c.OpConfig.SuperUsername + isProtectedUser := c.shouldAvoidProtectedOrSystemRole( + c.Spec.ConnectionPooler.User, "connection pool role") + + if !isSuperUser && !isProtectedUser { + username = util.Coalesce( + c.Spec.ConnectionPooler.User, + c.OpConfig.ConnectionPooler.User) + } // connection pooler application should be able to login with this role connectionPoolerUser := spec.PgUser{ @@ -1092,6 +1131,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e var err error c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate) var wg sync.WaitGroup @@ -1118,6 +1158,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil { c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate) if err = <-podLabelErr; err != nil { err = fmt.Errorf("could not get master pod label: %v", err) } @@ -1133,6 +1174,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e // close the label waiting channel no sooner than the waiting goroutine terminates. close(podLabelErr) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err) return err } @@ -1160,11 +1202,19 @@ type clusterObjectDelete func(name string) error func (c *Cluster) deletePatroniClusterObjects() error { // TODO: figure out how to remove leftover patroni objects in other cases + var actionsList []simpleActionWithResult + if !c.patroniUsesKubernetes() { c.logger.Infof("not cleaning up Etcd Patroni objects on cluster delete") } - c.logger.Debugf("removing leftover Patroni objects (endpoints, services and configmaps)") - for _, deleter := range []simpleActionWithResult{c.deletePatroniClusterEndpoints, c.deletePatroniClusterServices, c.deletePatroniClusterConfigMaps} { + + if !c.patroniKubernetesUseConfigMaps() { + actionsList = append(actionsList, c.deletePatroniClusterEndpoints) + } + actionsList = append(actionsList, c.deletePatroniClusterServices, c.deletePatroniClusterConfigMaps) + + c.logger.Debugf("removing leftover Patroni objects (endpoints / services and configmaps)") + for _, deleter := range actionsList { if err := deleter(); err != nil { return err } diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index af3092ad5..84ec04e3e 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -13,6 +13,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util/k8sutil" "github.com/zalando/postgres-operator/pkg/util/teams" v1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" ) const ( @@ -21,6 +22,8 @@ const ( ) var logger = logrus.New().WithField("test", "cluster") +var eventRecorder = record.NewFakeRecorder(1) + var cl = New( Config{ OpConfig: config.Config{ @@ -34,6 +37,7 @@ var cl = New( k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, + eventRecorder, ) func TestInitRobotUsers(t *testing.T) { @@ -721,4 +725,38 @@ func TestInitSystemUsers(t *testing.T) { if _, exist := 
cl.systemUsers[constants.ConnectionPoolerUserKeyName]; !exist { t.Errorf("%s, connection pooler user is not present", testName) } + + // superuser is not allowed as connection pool user + cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{ + User: "postgres", + } + cl.OpConfig.SuperUsername = "postgres" + cl.OpConfig.ConnectionPooler.User = "pooler" + + cl.initSystemUsers() + if _, exist := cl.pgUsers["pooler"]; !exist { + t.Errorf("%s, Superuser is not allowed to be a connection pool user", testName) + } + + // neither protected users are + delete(cl.pgUsers, "pooler") + cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{ + User: "admin", + } + cl.OpConfig.ProtectedRoles = []string{"admin"} + + cl.initSystemUsers() + if _, exist := cl.pgUsers["pooler"]; !exist { + t.Errorf("%s, Protected user are not allowed to be a connection pool user", testName) + } + + delete(cl.pgUsers, "pooler") + cl.Spec.ConnectionPooler = &acidv1.ConnectionPooler{ + User: "standby", + } + + cl.initSystemUsers() + if _, exist := cl.pgUsers["pooler"]; !exist { + t.Errorf("%s, System users are not allowed to be a connection pool user", testName) + } } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 8de61bdea..9fb33eab2 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -500,7 +500,7 @@ func mountShmVolumeNeeded(opConfig config.Config, spec *acidv1.PostgresSpec) *bo return opConfig.ShmVolume } -func generatePodTemplate( +func (c *Cluster) generatePodTemplate( namespace string, labels labels.Set, annotations map[string]string, @@ -519,7 +519,7 @@ func generatePodTemplate( podAntiAffinityTopologyKey string, additionalSecretMount string, additionalSecretMountPath string, - volumes []v1.Volume, + additionalVolumes []acidv1.AdditionalVolume, ) (*v1.PodTemplateSpec, error) { terminateGracePeriodSeconds := terminateGracePeriod @@ -538,7 +538,6 @@ func generatePodTemplate( InitContainers: initContainers, Tolerations: *tolerationsSpec, SecurityContext: &securityContext, - Volumes: volumes, } if shmVolume != nil && *shmVolume { @@ -559,6 +558,10 @@ func generatePodTemplate( addSecretVolume(&podSpec, additionalSecretMount, additionalSecretMountPath) } + if additionalVolumes != nil { + c.addAdditionalVolumes(&podSpec, additionalVolumes) + } + template := v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: labels, @@ -849,7 +852,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef sidecarContainers []v1.Container podTemplate *v1.PodTemplateSpec volumeClaimTemplate *v1.PersistentVolumeClaim - volumes []v1.Volume + additionalVolumes = spec.AdditionalVolumes ) // Improve me. Please. 
@@ -1002,8 +1005,10 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef // this is combined with the FSGroup in the section above // to give read access to the postgres user defaultMode := int32(0640) - volumes = append(volumes, v1.Volume{ - Name: "tls-secret", + mountPath := "/tls" + additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{ + Name: spec.TLS.SecretName, + MountPath: mountPath, VolumeSource: v1.VolumeSource{ Secret: &v1.SecretVolumeSource{ SecretName: spec.TLS.SecretName, @@ -1012,13 +1017,6 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef }, }) - mountPath := "/tls" - volumeMounts = append(volumeMounts, v1.VolumeMount{ - MountPath: mountPath, - Name: "tls-secret", - ReadOnly: true, - }) - // use the same filenames as Secret resources by default certFile := ensurePath(spec.TLS.CertificateFile, mountPath, "tls.crt") privateKeyFile := ensurePath(spec.TLS.PrivateKeyFile, mountPath, "tls.key") @@ -1029,11 +1027,31 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef ) if spec.TLS.CAFile != "" { - caFile := ensurePath(spec.TLS.CAFile, mountPath, "") + // support scenario when the ca.crt resides in a different secret, diff path + mountPathCA := mountPath + if spec.TLS.CASecretName != "" { + mountPathCA = mountPath + "ca" + } + + caFile := ensurePath(spec.TLS.CAFile, mountPathCA, "") spiloEnvVars = append( spiloEnvVars, v1.EnvVar{Name: "SSL_CA_FILE", Value: caFile}, ) + + // the ca file from CASecretName secret takes priority + if spec.TLS.CASecretName != "" { + additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{ + Name: spec.TLS.CASecretName, + MountPath: mountPathCA, + VolumeSource: v1.VolumeSource{ + Secret: &v1.SecretVolumeSource{ + SecretName: spec.TLS.CASecretName, + DefaultMode: &defaultMode, + }, + }, + }) + } } } @@ -1084,7 +1102,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef annotations := c.generatePodAnnotations(spec) // generate pod template for the statefulset, based on the spilo container and sidecars - podTemplate, err = generatePodTemplate( + podTemplate, err = c.generatePodTemplate( c.Namespace, c.labelsSet(true), annotations, @@ -1103,8 +1121,8 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef c.OpConfig.PodAntiAffinityTopologyKey, c.OpConfig.AdditionalSecretMount, c.OpConfig.AdditionalSecretMountPath, - volumes, - ) + additionalVolumes) + if err != nil { return nil, fmt.Errorf("could not generate pod template: %v", err) } @@ -1298,6 +1316,69 @@ func addSecretVolume(podSpec *v1.PodSpec, additionalSecretMount string, addition podSpec.Volumes = volumes } +func (c *Cluster) addAdditionalVolumes(podSpec *v1.PodSpec, + additionalVolumes []acidv1.AdditionalVolume) { + + volumes := podSpec.Volumes + mountPaths := map[string]acidv1.AdditionalVolume{} + for i, v := range additionalVolumes { + if previousVolume, exist := mountPaths[v.MountPath]; exist { + msg := "Volume %+v cannot be mounted to the same path as %+v" + c.logger.Warningf(msg, v, previousVolume) + continue + } + + if v.MountPath == constants.PostgresDataMount { + msg := "Cannot mount volume on postgresql data directory, %+v" + c.logger.Warningf(msg, v) + continue + } + + if v.TargetContainers == nil { + spiloContainer := podSpec.Containers[0] + additionalVolumes[i].TargetContainers = []string{spiloContainer.Name} + } + + for _, target := range v.TargetContainers { + if target == "all" && 
len(v.TargetContainers) != 1 { + msg := `Target containers could be either "all" or a list + of containers, mixing those is not allowed, %+v` + c.logger.Warningf(msg, v) + continue + } + } + + volumes = append(volumes, + v1.Volume{ + Name: v.Name, + VolumeSource: v.VolumeSource, + }, + ) + + mountPaths[v.MountPath] = v + } + + c.logger.Infof("Mount additional volumes: %+v", additionalVolumes) + + for i := range podSpec.Containers { + mounts := podSpec.Containers[i].VolumeMounts + for _, v := range additionalVolumes { + for _, target := range v.TargetContainers { + if podSpec.Containers[i].Name == target || target == "all" { + mounts = append(mounts, v1.VolumeMount{ + Name: v.Name, + MountPath: v.MountPath, + SubPath: v.SubPath, + }) + } + } + } + podSpec.Containers[i].VolumeMounts = mounts + } + + podSpec.Volumes = volumes +} + func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) { var storageClassName *string @@ -1545,11 +1626,11 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription) c.logger.Info(msg, description.S3WalPath) envs := []v1.EnvVar{ - v1.EnvVar{ + { Name: "CLONE_WAL_S3_BUCKET", Value: c.OpConfig.WALES3Bucket, }, - v1.EnvVar{ + { Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX", Value: getBucketScopeSuffix(description.UID), }, @@ -1702,7 +1783,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) { annotations := c.generatePodAnnotations(&c.Spec) // re-use the method that generates DB pod templates - if podTemplate, err = generatePodTemplate( + if podTemplate, err = c.generatePodTemplate( c.Namespace, labels, annotations, @@ -1721,8 +1802,8 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) { "", c.OpConfig.AdditionalSecretMount, c.OpConfig.AdditionalSecretMountPath, - nil); err != nil { - return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err) + []acidv1.AdditionalVolume{}); err != nil { + return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err) } // overwrite specific params of logical backups pods diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 0930279d2..1291d4f47 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -37,7 +37,7 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) { ReplicationUsername: replicationUserName, }, }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) testName := "TestGenerateSpiloConfig" tests := []struct { @@ -102,7 +102,7 @@ func TestCreateLoadBalancerLogic(t *testing.T) { ReplicationUsername: replicationUserName, }, }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) testName := "TestCreateLoadBalancerLogic" tests := []struct { @@ -164,7 +164,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { acidv1.Postgresql{ ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, - logger), + logger, + eventRecorder), policyv1beta1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: "postgres-myapp-database-pdb", @@ -187,7 +188,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { acidv1.Postgresql{ ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, Spec: acidv1.PostgresSpec{TeamID: "myapp", 
NumberOfInstances: 0}}, - logger), + logger, + eventRecorder), policyv1beta1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: "postgres-myapp-database-pdb", @@ -210,7 +212,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { acidv1.Postgresql{ ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, - logger), + logger, + eventRecorder), policyv1beta1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: "postgres-myapp-database-pdb", @@ -233,7 +236,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { acidv1.Postgresql{ ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, - logger), + logger, + eventRecorder), policyv1beta1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: "postgres-myapp-database-databass-budget", @@ -368,7 +372,7 @@ func TestCloneEnv(t *testing.T) { ReplicationUsername: replicationUserName, }, }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) for _, tt := range tests { envs := cluster.generateCloneEnvironment(tt.cloneOpts) @@ -502,7 +506,7 @@ func TestGetPgVersion(t *testing.T) { ReplicationUsername: replicationUserName, }, }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) for _, tt := range tests { pgVersion, err := cluster.getNewPgVersion(tt.pgContainer, tt.newPgVersion) @@ -678,7 +682,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) { ConnectionPoolerDefaultMemoryLimit: "100Mi", }, }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) var clusterNoDefaultRes = New( Config{ @@ -690,7 +694,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) { }, ConnectionPooler: config.ConnectionPooler{}, }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil } @@ -803,7 +807,7 @@ func TestConnectionPoolerDeploymentSpec(t *testing.T) { ConnectionPoolerDefaultMemoryLimit: "100Mi", }, }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) cluster.Statefulset = &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: "test-sts", @@ -904,7 +908,7 @@ func TestConnectionPoolerServiceSpec(t *testing.T) { ConnectionPoolerDefaultMemoryLimit: "100Mi", }, }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) cluster.Statefulset = &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: "test-sts", @@ -961,6 +965,7 @@ func TestTLS(t *testing.T) { var spec acidv1.PostgresSpec var cluster *Cluster var spiloFSGroup = int64(103) + var additionalVolumes = spec.AdditionalVolumes makeSpec := func(tls acidv1.TLSDescription) acidv1.PostgresSpec { return acidv1.PostgresSpec{ @@ -989,7 +994,7 @@ func TestTLS(t *testing.T) { SpiloFSGroup: &spiloFSGroup, }, }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) spec = makeSpec(acidv1.TLSDescription{SecretName: "my-secret", CAFile: "ca.crt"}) s, err := 
cluster.generateStatefulSet(&spec) if err != nil { @@ -1000,8 +1005,20 @@ func TestTLS(t *testing.T) { assert.Equal(t, &fsGroup, s.Spec.Template.Spec.SecurityContext.FSGroup, "has a default FSGroup assigned") defaultMode := int32(0640) + mountPath := "/tls" + additionalVolumes = append(additionalVolumes, acidv1.AdditionalVolume{ + Name: spec.TLS.SecretName, + MountPath: mountPath, + VolumeSource: v1.VolumeSource{ + Secret: &v1.SecretVolumeSource{ + SecretName: spec.TLS.SecretName, + DefaultMode: &defaultMode, + }, + }, + }) + volume := v1.Volume{ - Name: "tls-secret", + Name: "my-secret", VolumeSource: v1.VolumeSource{ Secret: &v1.SecretVolumeSource{ SecretName: "my-secret", @@ -1013,11 +1030,179 @@ func TestTLS(t *testing.T) { assert.Contains(t, s.Spec.Template.Spec.Containers[0].VolumeMounts, v1.VolumeMount{ MountPath: "/tls", - Name: "tls-secret", - ReadOnly: true, + Name: "my-secret", }, "the volume gets mounted in /tls") assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_CERTIFICATE_FILE", Value: "/tls/tls.crt"}) assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_PRIVATE_KEY_FILE", Value: "/tls/tls.key"}) assert.Contains(t, s.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "SSL_CA_FILE", Value: "/tls/ca.crt"}) } + +func TestAdditionalVolume(t *testing.T) { + testName := "TestAdditionalVolume" + tests := []struct { + subTest string + podSpec *v1.PodSpec + volumePos int + }{ + { + subTest: "empty PodSpec", + podSpec: &v1.PodSpec{ + Volumes: []v1.Volume{}, + Containers: []v1.Container{ + { + VolumeMounts: []v1.VolumeMount{}, + }, + }, + }, + volumePos: 0, + }, + { + subTest: "non empty PodSpec", + podSpec: &v1.PodSpec{ + Volumes: []v1.Volume{{}}, + Containers: []v1.Container{ + { + Name: "postgres", + VolumeMounts: []v1.VolumeMount{ + { + Name: "data", + ReadOnly: false, + MountPath: "/data", + }, + }, + }, + }, + }, + volumePos: 1, + }, + { + subTest: "non empty PodSpec with sidecar", + podSpec: &v1.PodSpec{ + Volumes: []v1.Volume{{}}, + Containers: []v1.Container{ + { + Name: "postgres", + VolumeMounts: []v1.VolumeMount{ + { + Name: "data", + ReadOnly: false, + MountPath: "/data", + }, + }, + }, + { + Name: "sidecar", + VolumeMounts: []v1.VolumeMount{ + { + Name: "data", + ReadOnly: false, + MountPath: "/data", + }, + }, + }, + }, + }, + volumePos: 1, + }, + } + + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) + + for _, tt := range tests { + // Test with additional volume mounted in all containers + additionalVolumeMount := []acidv1.AdditionalVolume{ + { + Name: "test", + MountPath: "/test", + TargetContainers: []string{"all"}, + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + }, + } + + numMounts := len(tt.podSpec.Containers[0].VolumeMounts) + + cluster.addAdditionalVolumes(tt.podSpec, additionalVolumeMount) + volumeName := tt.podSpec.Volumes[tt.volumePos].Name + + if volumeName != additionalVolumeMount[0].Name { + t.Errorf("%s %s: Expected volume %v was not created, have %s instead", + testName, tt.subTest, additionalVolumeMount, volumeName) + } + + for i := range tt.podSpec.Containers { + volumeMountName := tt.podSpec.Containers[i].VolumeMounts[tt.volumePos].Name + + if volumeMountName != additionalVolumeMount[0].Name { + t.Errorf("%s %s: Expected mount %v was not 
created, have %s instead", + testName, tt.subTest, additionalVolumeMount, volumeMountName) + } + + } + + numMountsCheck := len(tt.podSpec.Containers[0].VolumeMounts) + + if numMountsCheck != numMounts+1 { + t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v", + numMountsCheck, numMounts+1) + } + } + + for _, tt := range tests { + // Test with additional volume mounted only in first container + additionalVolumeMount := []acidv1.AdditionalVolume{ + { + Name: "test", + MountPath: "/test", + TargetContainers: []string{"postgres"}, + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + }, + } + + numMounts := len(tt.podSpec.Containers[0].VolumeMounts) + + cluster.addAdditionalVolumes(tt.podSpec, additionalVolumeMount) + volumeName := tt.podSpec.Volumes[tt.volumePos].Name + + if volumeName != additionalVolumeMount[0].Name { + t.Errorf("%s %s: Expected volume %v was not created, have %s instead", + testName, tt.subTest, additionalVolumeMount, volumeName) + } + + for _, container := range tt.podSpec.Containers { + if container.Name == "postgres" { + volumeMountName := container.VolumeMounts[tt.volumePos].Name + + if volumeMountName != additionalVolumeMount[0].Name { + t.Errorf("%s %s: Expected mount %v was not created, have %s instead", + testName, tt.subTest, additionalVolumeMount, volumeMountName) + } + + numMountsCheck := len(container.VolumeMounts) + if numMountsCheck != numMounts+1 { + t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v", + numMountsCheck, numMounts+1) + } + } else { + numMountsCheck := len(container.VolumeMounts) + if numMountsCheck == numMounts+1 { + t.Errorf("Unexpected number of VolumeMounts: got %v instead of %v", + numMountsCheck, numMounts) + } + } + } + } +} diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 9991621cc..a734e4835 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -294,6 +294,27 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { return pod, nil } +func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool { + + /* + Operator should not re-create pods if there is at least one replica being bootstrapped + because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag). 
+ + XXX operator cannot forbid replica re-init, so we might still fail if re-init is started + after this check succeeds but before a pod is re-created + */ + + for _, pod := range pods.Items { + state, err := c.patroni.GetPatroniMemberState(&pod) + if err != nil || state == "creating replica" { + c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name) + return false + } + + } + return true +} + func (c *Cluster) recreatePods() error { c.setProcessName("starting to recreate pods") ls := c.labelsSet(false) @@ -309,6 +330,10 @@ func (c *Cluster) recreatePods() error { } c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items)) + if !c.isSafeToRecreatePods(pods) { + return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized") + } + var ( masterPod, newMasterPod, newPod *v1.Pod ) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 4c341aefe..b38458af8 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -105,7 +105,12 @@ func (c *Cluster) createConnectionPooler(lookup InstallFunction) (*ConnectionPoo var msg string c.setProcessName("creating connection pooler") + if c.ConnectionPooler == nil { + c.ConnectionPooler = &ConnectionPoolerObjects{} + } + schema := c.Spec.ConnectionPooler.Schema + if schema == "" { schema = c.OpConfig.ConnectionPooler.Schema } diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go index 2db807b38..9739cc354 100644 --- a/pkg/cluster/resources_test.go +++ b/pkg/cluster/resources_test.go @@ -36,7 +36,7 @@ func TestConnectionPoolerCreationAndDeletion(t *testing.T) { ConnectionPoolerDefaultMemoryLimit: "100Mi", }, }, - }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder) cluster.Statefulset = &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ @@ -85,7 +85,7 @@ func TestNeedConnectionPooler(t *testing.T) { ConnectionPoolerDefaultMemoryLimit: "100Mi", }, }, - }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder) cluster.Spec = acidv1.PostgresSpec{ ConnectionPooler: &acidv1.ConnectionPooler{}, diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 9a946bc34..0eb02631c 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -111,7 +111,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } // sync connection pooler - if err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil { + if _, err = c.syncConnectionPooler(&oldSpec, newSpec, c.installLookupFunction); err != nil { return fmt.Errorf("could not sync connection pooler: %v", err) } @@ -122,10 +122,11 @@ func (c *Cluster) syncServices() error { for _, role := range []PostgresRole{Master, Replica} { c.logger.Debugf("syncing %s service", role) - if err := c.syncEndpoint(role); err != nil { - return fmt.Errorf("could not sync %s endpoint: %v", role, err) + if !c.patroniKubernetesUseConfigMaps() { + if err := c.syncEndpoint(role); err != nil { + return fmt.Errorf("could not sync %s endpoint: %v", role, err) + } } - if err := c.syncService(role); err != nil { return fmt.Errorf("could not sync %s service: %v", role, err) } @@ -377,10 +378,12 @@ func (c *Cluster) syncStatefulSet() error { // statefulset or those that got their configuration from the outdated statefulset) if podsRollingUpdateRequired {
c.logger.Debugln("performing rolling update") + c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update") if err := c.recreatePods(); err != nil { return fmt.Errorf("could not recreate pods: %v", err) } c.logger.Infof("pods have been recreated") + c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated") if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil { c.logger.Warningf("could not clear rolling update for the statefulset: %v", err) } @@ -655,7 +658,13 @@ func (c *Cluster) syncLogicalBackupJob() error { return nil } -func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, lookup InstallFunction) error { +func (c *Cluster) syncConnectionPooler(oldSpec, + newSpec *acidv1.Postgresql, + lookup InstallFunction) (SyncReason, error) { + + var reason SyncReason + var err error + if c.ConnectionPooler == nil { c.ConnectionPooler = &ConnectionPoolerObjects{} } @@ -692,20 +701,20 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, look specUser, c.OpConfig.ConnectionPooler.User) - if err := lookup(schema, user); err != nil { - return err + if err = lookup(schema, user); err != nil { + return NoSync, err } } - if err := c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil { + if reason, err = c.syncConnectionPoolerWorker(oldSpec, newSpec); err != nil { c.logger.Errorf("could not sync connection pooler: %v", err) - return err + return reason, err } } if oldNeedConnectionPooler && !newNeedConnectionPooler { // delete and cleanup resources - if err := c.deleteConnectionPooler(); err != nil { + if err = c.deleteConnectionPooler(); err != nil { c.logger.Warningf("could not remove connection pooler: %v", err) } } @@ -716,20 +725,22 @@ func (c *Cluster) syncConnectionPooler(oldSpec, newSpec *acidv1.Postgresql, look (c.ConnectionPooler.Deployment != nil || c.ConnectionPooler.Service != nil) { - if err := c.deleteConnectionPooler(); err != nil { + if err = c.deleteConnectionPooler(); err != nil { c.logger.Warningf("could not remove connection pooler: %v", err) } } } - return nil + return reason, nil } // Synchronize connection pooler resources. Effectively we're interested only in // synchronizing the corresponding deployment, but in case of deployment or // service is missing, create it. After checking, also remember an object for // the future references. -func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) error { +func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql) ( + SyncReason, error) { + deployment, err := c.KubeClient. Deployments(c.Namespace). Get(context.TODO(), c.connectionPoolerName(), metav1.GetOptions{}) @@ -741,7 +752,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql deploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec) if err != nil { msg = "could not generate deployment for connection pooler: %v" - return fmt.Errorf(msg, err) + return NoSync, fmt.Errorf(msg, err) } deployment, err := c.KubeClient. 
@@ -749,18 +760,35 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql Create(context.TODO(), deploymentSpec, metav1.CreateOptions{}) if err != nil { - return err + return NoSync, err } c.ConnectionPooler.Deployment = deployment } else if err != nil { - return fmt.Errorf("could not get connection pooler deployment to sync: %v", err) + msg := "could not get connection pooler deployment to sync: %v" + return NoSync, fmt.Errorf(msg, err) } else { c.ConnectionPooler.Deployment = deployment // actual synchronization oldConnectionPooler := oldSpec.Spec.ConnectionPooler newConnectionPooler := newSpec.Spec.ConnectionPooler + + // sync implementation below assumes that both old and new specs are + // not nil, but it can happen. To avoid any confusion like updating a + // deployment because the specification changed from nil to an empty + // struct (that was initialized somewhere before) replace any nil with + // an empty spec. + if oldConnectionPooler == nil { + oldConnectionPooler = &acidv1.ConnectionPooler{} + } + + if newConnectionPooler == nil { + newConnectionPooler = &acidv1.ConnectionPooler{} + } + + c.logger.Infof("Old: %+v, New %+v", oldConnectionPooler, newConnectionPooler) + specSync, specReason := c.needSyncConnectionPoolerSpecs(oldConnectionPooler, newConnectionPooler) defaultsSync, defaultsReason := c.needSyncConnectionPoolerDefaults(newConnectionPooler, deployment) reason := append(specReason, defaultsReason...) @@ -771,7 +799,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql newDeploymentSpec, err := c.generateConnectionPoolerDeployment(&newSpec.Spec) if err != nil { msg := "could not generate deployment for connection pooler: %v" - return fmt.Errorf(msg, err) + return reason, fmt.Errorf(msg, err) } oldDeploymentSpec := c.ConnectionPooler.Deployment @@ -781,11 +809,11 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql newDeploymentSpec) if err != nil { - return err + return reason, err } c.ConnectionPooler.Deployment = deployment - return nil + return reason, nil } } @@ -803,16 +831,17 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) if err != nil { - return err + return NoSync, err } c.ConnectionPooler.Service = service } else if err != nil { - return fmt.Errorf("could not get connection pooler service to sync: %v", err) + msg := "could not get connection pooler service to sync: %v" + return NoSync, fmt.Errorf(msg, err) } else { // Service updates are not supported and probably not that useful anyway c.ConnectionPooler.Service = service } - return nil + return NoSync, nil } diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index 45355cca3..3a7317938 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -2,6 +2,7 @@ package cluster import ( "fmt" + "strings" "testing" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" @@ -17,7 +18,7 @@ func int32ToPointer(value int32) *int32 { return &value } -func deploymentUpdated(cluster *Cluster, err error) error { +func deploymentUpdated(cluster *Cluster, err error, reason SyncReason) error { if cluster.ConnectionPooler.Deployment.Spec.Replicas == nil || *cluster.ConnectionPooler.Deployment.Spec.Replicas != 2 { return fmt.Errorf("Wrong nubmer of instances") @@ -26,7 +27,7 @@ func deploymentUpdated(cluster *Cluster, err error) error { return nil } -func objectsAreSaved(cluster *Cluster, err error) 
error { +func objectsAreSaved(cluster *Cluster, err error, reason SyncReason) error { if cluster.ConnectionPooler == nil { return fmt.Errorf("Connection pooler resources are empty") } @@ -42,7 +43,7 @@ func objectsAreSaved(cluster *Cluster, err error) error { return nil } -func objectsAreDeleted(cluster *Cluster, err error) error { +func objectsAreDeleted(cluster *Cluster, err error, reason SyncReason) error { if cluster.ConnectionPooler != nil { return fmt.Errorf("Connection pooler was not deleted") } @@ -50,6 +51,16 @@ func objectsAreDeleted(cluster *Cluster, err error) error { return nil } +func noEmptySync(cluster *Cluster, err error, reason SyncReason) error { + for _, msg := range reason { + if strings.HasPrefix(msg, "update [] from '' to '") { + return fmt.Errorf("There is an empty reason, %s", msg) + } + } + + return nil +} + func TestConnectionPoolerSynchronization(t *testing.T) { testName := "Test connection pooler synchronization" var cluster = New( @@ -68,7 +79,7 @@ func TestConnectionPoolerSynchronization(t *testing.T) { NumberOfInstances: int32ToPointer(1), }, }, - }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder) cluster.Statefulset = &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ @@ -91,15 +102,15 @@ func TestConnectionPoolerSynchronization(t *testing.T) { clusterNewDefaultsMock := *cluster clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient() - cluster.OpConfig.ConnectionPooler.Image = "pooler:2.0" - cluster.OpConfig.ConnectionPooler.NumberOfInstances = int32ToPointer(2) tests := []struct { - subTest string - oldSpec *acidv1.Postgresql - newSpec *acidv1.Postgresql - cluster *Cluster - check func(cluster *Cluster, err error) error + subTest string + oldSpec *acidv1.Postgresql + newSpec *acidv1.Postgresql + cluster *Cluster + defaultImage string + defaultInstances int32 + check func(cluster *Cluster, err error, reason SyncReason) error }{ { subTest: "create if doesn't exist", @@ -113,8 +124,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) { ConnectionPooler: &acidv1.ConnectionPooler{}, }, }, - cluster: &clusterMissingObjects, - check: objectsAreSaved, + cluster: &clusterMissingObjects, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreSaved, }, { subTest: "create if doesn't exist with a flag", @@ -126,8 +139,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) { EnableConnectionPooler: boolToPointer(true), }, }, - cluster: &clusterMissingObjects, - check: objectsAreSaved, + cluster: &clusterMissingObjects, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreSaved, }, { subTest: "create from scratch", @@ -139,8 +154,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) { ConnectionPooler: &acidv1.ConnectionPooler{}, }, }, - cluster: &clusterMissingObjects, - check: objectsAreSaved, + cluster: &clusterMissingObjects, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreSaved, }, { subTest: "delete if not needed", @@ -152,8 +169,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) { newSpec: &acidv1.Postgresql{ Spec: acidv1.PostgresSpec{}, }, - cluster: &clusterMock, - check: objectsAreDeleted, + cluster: &clusterMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreDeleted, }, { subTest: "cleanup if still there", @@ -163,8 +182,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) { newSpec: &acidv1.Postgresql{ Spec: acidv1.PostgresSpec{}, }, - 
cluster: &clusterDirtyMock, - check: objectsAreDeleted, + cluster: &clusterDirtyMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: objectsAreDeleted, }, { subTest: "update deployment", @@ -182,8 +203,10 @@ func TestConnectionPoolerSynchronization(t *testing.T) { }, }, }, - cluster: &clusterMock, - check: deploymentUpdated, + cluster: &clusterMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: deploymentUpdated, }, { subTest: "update image from changed defaults", @@ -197,14 +220,40 @@ func TestConnectionPoolerSynchronization(t *testing.T) { ConnectionPooler: &acidv1.ConnectionPooler{}, }, }, - cluster: &clusterNewDefaultsMock, - check: deploymentUpdated, + cluster: &clusterNewDefaultsMock, + defaultImage: "pooler:2.0", + defaultInstances: 2, + check: deploymentUpdated, + }, + { + subTest: "there is no sync from nil to an empty spec", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + ConnectionPooler: nil, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPooler: boolToPointer(true), + ConnectionPooler: &acidv1.ConnectionPooler{}, + }, + }, + cluster: &clusterMock, + defaultImage: "pooler:1.0", + defaultInstances: 1, + check: noEmptySync, }, } for _, tt := range tests { - err := tt.cluster.syncConnectionPooler(tt.oldSpec, tt.newSpec, mockInstallLookupFunction) + tt.cluster.OpConfig.ConnectionPooler.Image = tt.defaultImage + tt.cluster.OpConfig.ConnectionPooler.NumberOfInstances = + int32ToPointer(tt.defaultInstances) - if err := tt.check(tt.cluster, err); err != nil { + reason, err := tt.cluster.syncConnectionPooler(tt.oldSpec, + tt.newSpec, mockInstallLookupFunction) + + if err := tt.check(tt.cluster, err, reason); err != nil { t.Errorf("%s [%s]: Could not synchronize, %+v", testName, tt.subTest, err) } diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 04d00cb58..199914ccc 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -73,3 +73,8 @@ type ClusterStatus struct { type TemplateParams map[string]interface{} type InstallFunction func(schema string, user string) error + +type SyncReason []string + +// no sync happened, empty value +var NoSync SyncReason = []string{} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 9c48b7ef2..4e4685379 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -7,24 +7,24 @@ import ( "sync" "github.com/sirupsen/logrus" - v1 "k8s.io/api/core/v1" - rbacv1 "k8s.io/api/rbac/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/cache" - acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/apiserver" "github.com/zalando/postgres-operator/pkg/cluster" + acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" "github.com/zalando/postgres-operator/pkg/util/ringlog" - - acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1" + v1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + 
"k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" ) // Controller represents operator controller @@ -36,6 +36,9 @@ type Controller struct { KubeClient k8sutil.KubernetesClient apiserver *apiserver.Server + eventRecorder record.EventRecorder + eventBroadcaster record.EventBroadcaster + stopCh chan struct{} controllerID string @@ -67,10 +70,21 @@ type Controller struct { func NewController(controllerConfig *spec.ControllerConfig, controllerId string) *Controller { logger := logrus.New() + var myComponentName = "postgres-operator" + if controllerId != "" { + myComponentName += "/" + controllerId + } + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(logger.Debugf) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: myComponentName}) + c := &Controller{ config: *controllerConfig, opConfig: &config.Config{}, logger: logger.WithField("pkg", "controller"), + eventRecorder: recorder, + eventBroadcaster: eventBroadcaster, controllerID: controllerId, curWorkerCluster: sync.Map{}, clusterWorkers: make(map[spec.NamespacedName]uint32), @@ -93,6 +107,11 @@ func (c *Controller) initClients() { if err != nil { c.logger.Fatalf("could not create kubernetes clients: %v", err) } + c.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.KubeClient.EventsGetter.Events("")}) + if err != nil { + c.logger.Fatalf("could not setup kubernetes event sink: %v", err) + } + } func (c *Controller) initOperatorConfig() { diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index c91c34604..af30b42f0 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -170,6 +170,11 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur fromCRD.ConnectionPooler.User, constants.ConnectionPoolerUserName) + if result.ConnectionPooler.User == result.SuperUsername { + msg := "Connection pool user is not allowed to be the same as super user, username: %s" + panic(fmt.Errorf(msg, result.ConnectionPooler.User)) + } + result.ConnectionPooler.Image = util.Coalesce( fromCRD.ConnectionPooler.Image, "registry.opensource.zalan.do/acid/pgbouncer") diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index e81671c7d..2a9e1b650 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" @@ -157,7 +158,7 @@ func (c *Controller) acquireInitialListOfClusters() error { } func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) *cluster.Cluster { - cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg) + cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder) cl.Run(c.stopCh) teamName := strings.ToLower(cl.Spec.TeamID) @@ -236,6 +237,7 @@ func (c *Controller) processEvent(event ClusterEvent) { if err := cl.Create(); err != nil { cl.Error = fmt.Sprintf("could not create cluster: %v", err) lg.Error(cl.Error) + c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error) return } @@ -274,6 +276,8 @@ func (c *Controller) processEvent(event ClusterEvent) { c.curWorkerCluster.Store(event.WorkerID, cl) 
cl.Delete() + // Fixme - no error handling for delete ? + // c.eventRecorder.Eventf(cl.GetReference, v1.EventTypeWarning, "Delete", "%v", cl.Error) func() { defer c.clustersMu.Unlock() @@ -304,6 +308,7 @@ func (c *Controller) processEvent(event ClusterEvent) { c.curWorkerCluster.Store(event.WorkerID, cl) if err := cl.Sync(event.NewSpec); err != nil { cl.Error = fmt.Sprintf("could not sync cluster: %v", err) + c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error) lg.Error(cl.Error) return } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 249ef7d78..a56d6bfec 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -219,5 +219,11 @@ func validate(cfg *Config) (err error) { msg := "number of connection pooler instances should be higher than %d" err = fmt.Errorf(msg, constants.ConnectionPoolerMinInstances) } + + if cfg.ConnectionPooler.User == cfg.SuperUsername { + msg := "Connection pool user is not allowed to be the same as super user, username: %s" + err = fmt.Errorf(msg, cfg.ConnectionPooler.User) + } + return } diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 3a397af7d..d7be2f48a 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -45,6 +45,7 @@ type KubernetesClient struct { corev1.NodesGetter corev1.NamespacesGetter corev1.ServiceAccountsGetter + corev1.EventsGetter appsv1.StatefulSetsGetter appsv1.DeploymentsGetter rbacv1.RoleBindingsGetter @@ -142,6 +143,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { kubeClient.RESTClient = client.CoreV1().RESTClient() kubeClient.RoleBindingsGetter = client.RbacV1() kubeClient.CronJobsGetter = client.BatchV1beta1() + kubeClient.EventsGetter = client.CoreV1() apiextClient, err := apiextclient.NewForConfig(cfg) if err != nil { diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index bdd96f048..53065e599 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -3,6 +3,7 @@ package patroni import ( "bytes" "encoding/json" + "errors" "fmt" "io/ioutil" "net" @@ -11,7 +12,7 @@ import ( "time" "github.com/sirupsen/logrus" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" ) const ( @@ -25,6 +26,7 @@ const ( type Interface interface { Switchover(master *v1.Pod, candidate string) error SetPostgresParameters(server *v1.Pod, options map[string]string) error + GetPatroniMemberState(pod *v1.Pod) (string, error) } // Patroni API client @@ -123,3 +125,36 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st } return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf) } + +//GetPatroniMemberState returns a state of member of a Patroni cluster +func (p *Patroni) GetPatroniMemberState(server *v1.Pod) (string, error) { + + apiURLString, err := apiURL(server) + if err != nil { + return "", err + } + response, err := p.httpClient.Get(apiURLString) + if err != nil { + return "", fmt.Errorf("could not perform Get request: %v", err) + } + defer response.Body.Close() + + body, err := ioutil.ReadAll(response.Body) + if err != nil { + return "", fmt.Errorf("could not read response: %v", err) + } + + data := make(map[string]interface{}) + err = json.Unmarshal(body, &data) + if err != nil { + return "", err + } + + state, ok := data["state"].(string) + if !ok { + return "", errors.New("Patroni Get call response contains wrong type for 'state' field") + } + + return state, nil + +}
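GetPatroniMemberState performs a plain GET against the pod's Patroni API and extracts the "state" field from the JSON response. As a minimal sketch (not part of this patch) of how it could be consumed inside the cluster package, assuming the cluster keeps its Patroni client in a c.patroni field of type patroni.Interface (an assumption here), a guard like the following could refuse to act on pods that are not healthy:

package cluster

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// ensureMemberRunning is a hypothetical helper that rejects any action on a pod
// whose Patroni member state is anything other than "running", for example
// before recreating it as part of a rolling update.
func (c *Cluster) ensureMemberRunning(pod *v1.Pod) error {
	state, err := c.patroni.GetPatroniMemberState(pod)
	if err != nil {
		return fmt.Errorf("could not get Patroni state for pod %s/%s: %v", pod.Namespace, pod.Name, err)
	}
	if state != "running" {
		return fmt.Errorf("pod %s/%s reports Patroni state %q", pod.Namespace, pod.Name, state)
	}
	return nil
}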