diff --git a/.gitignore b/.gitignore
index b407c62f1..0fdb50756 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,6 +28,7 @@ _testmain.go
 /vendor/
 /build/
 /docker/build/
+/github.com/
 
 .idea
 scm-source.json
diff --git a/charts/postgres-operator/crds/operatorconfigurations.yaml b/charts/postgres-operator/crds/operatorconfigurations.yaml
index 7e20c8fea..285d99b40 100644
--- a/charts/postgres-operator/crds/operatorconfigurations.yaml
+++ b/charts/postgres-operator/crds/operatorconfigurations.yaml
@@ -84,6 +84,12 @@ spec:
               type: object
               additionalProperties:
                 type: string
+            sidecars:
+              type: array
+              nullable: true
+              items:
+                type: object
+                additionalProperties: true
             workers:
               type: integer
               minimum: 1
diff --git a/charts/postgres-operator/templates/clusterrole.yaml b/charts/postgres-operator/templates/clusterrole.yaml
index 38ce85e7a..0defcab41 100644
--- a/charts/postgres-operator/templates/clusterrole.yaml
+++ b/charts/postgres-operator/templates/clusterrole.yaml
@@ -42,6 +42,13 @@ rules:
   - configmaps
   verbs:
   - get
+# to send events to the CRs
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - create
 # to manage endpoints which are also used by Patroni
 - apiGroups:
   - ""
diff --git a/docs/administrator.md b/docs/administrator.md
index 93adf2eb1..158b733ad 100644
--- a/docs/administrator.md
+++ b/docs/administrator.md
@@ -507,6 +507,33 @@ A secret can be pre-provisioned in different ways:
 * Automatically provisioned via a custom K8s controller like
   [kube-aws-iam-controller](https://github.com/mikkeloscar/kube-aws-iam-controller)
 
+## Sidecars for Postgres clusters
+
+A list of sidecars is added to each cluster created by the
+operator. The default is an empty list.
+
+```yaml
+kind: OperatorConfiguration
+configuration:
+  sidecars:
+  - image: image:123
+    name: global-sidecar
+    ports:
+    - containerPort: 80
+    volumeMounts:
+    - mountPath: /custom-pgdata-mountpoint
+      name: pgdata
+  - ...
+```
+
+In addition to any environment variables you specify, the following environment
+variables are always passed to sidecars:
+
+  - `POD_NAME` - field reference to `metadata.name`
+  - `POD_NAMESPACE` - field reference to `metadata.namespace`
+  - `POSTGRES_USER` - the superuser that can be used to connect to the database
+  - `POSTGRES_PASSWORD` - the password for the superuser
+
 ## Setting up the Postgres Operator UI
 
 Since the v1.2 release the Postgres Operator is shipped with a browser-based
diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md
index 3d31abab4..259f04527 100644
--- a/docs/reference/operator_parameters.md
+++ b/docs/reference/operator_parameters.md
@@ -93,9 +93,17 @@ Those are top-level keys, containing both leaf keys and groups.
   repository](https://github.com/zalando/spilo).
 
 * **sidecar_docker_images**
-  a map of sidecar names to Docker images to run with Spilo. In case of the name
-  conflict with the definition in the cluster manifest the cluster-specific one
-  is preferred.
+  *deprecated*: use **sidecars** instead. A map of sidecar names to Docker images
+  to run with Spilo. In case of a name conflict with the definition in the
+  cluster manifest, the cluster-specific one is preferred.
+
+* **sidecars**
+  a list of sidecars to run with Spilo, for any cluster (i.e. globally defined
+  sidecars). Each item in the list is of type
+  [Container](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#container-v1-core).
+  Globally defined sidecars can be overwritten by specifying a sidecar in the
+  custom resource with the same name.
+  Note: This field is not part of the schema validation. If the container
+  specification is invalid, then the operator fails to create the statefulset.
 
 * **enable_shm_volume**
   Instruct operator to start any new database pod without limitations on shm
@@ -133,8 +141,9 @@ Those are top-level keys, containing both leaf keys and groups.
   at the cost of overprovisioning memory and potential scheduling problems for
   containers with high memory limits due to the lack of memory on Kubernetes
   cluster nodes. This affects all containers created by the operator (Postgres,
-  Scalyr sidecar, and other sidecars); to set resources for the operator's own
-  container, change the [operator deployment manually](../../manifests/postgres-operator.yaml#L20).
+  Scalyr sidecar, and other sidecars except **sidecars** defined in the operator
+  configuration); to set resources for the operator's own container, change the
+  [operator deployment manually](../../manifests/postgres-operator.yaml#L20).
   The default is `false`.
 
 ## Postgres users
@@ -206,12 +215,12 @@ configuration they are grouped under the `kubernetes` key.
   Default is true.
 
 * **enable_init_containers**
-  global option to allow for creating init containers to run actions before
-  Spilo is started. Default is true.
+  global option to allow for creating init containers in the cluster manifest to
+  run actions before Spilo is started. Default is true.
 
 * **enable_sidecars**
-  global option to allow for creating sidecar containers to run alongside Spilo
-  on the same pod. Default is true.
+  global option to allow for creating sidecar containers in the cluster manifest
+  to run alongside Spilo on the same pod. Globally defined sidecars are always enabled. Default is true.
 
 * **secret_name_template**
   a template for the name of the database user secrets generated by the
diff --git a/docs/user.md b/docs/user.md
index 2c1c4fd1f..d7e6add0a 100644
--- a/docs/user.md
+++ b/docs/user.md
@@ -442,6 +442,8 @@ The PostgreSQL volume is shared with sidecars and is mounted at
 specified but globally disabled in the configuration. The `enable_sidecars`
 option must be set to `true`.
 
+If you want to add a sidecar to every cluster managed by the operator, you can specify it in the [operator configuration](administrator.md#sidecars-for-postgres-clusters) instead.
+
 ## InitContainers Support
 
 Each cluster can specify arbitrary init containers to run. These containers can
diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py
index 445067d61..f46c0577e 100644
--- a/e2e/tests/test_e2e.py
+++ b/e2e/tests/test_e2e.py
@@ -344,7 +344,6 @@ class EndToEndTestCase(unittest.TestCase):
         '''
         k8s = self.k8s
         cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
-        labels = 'spilo-role=master,' + cluster_label
         readiness_label = 'lifecycle-status'
         readiness_value = 'ready'
 
@@ -709,14 +708,16 @@ class K8s:
     def wait_for_logical_backup_job_creation(self):
         self.wait_for_logical_backup_job(expected_num_of_jobs=1)
 
-    def update_config(self, config_map_patch):
-        self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
-
+    def delete_operator_pod(self):
         operator_pod = self.api.core_v1.list_namespaced_pod(
             'default', label_selector="name=postgres-operator").items[0].metadata.name
         self.api.core_v1.delete_namespaced_pod(operator_pod, "default")  # restart reloads the conf
         self.wait_for_operator_pod_start()
 
+    def update_config(self, config_map_patch):
+        self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
+        self.delete_operator_pod()
+
     def create_with_kubectl(self, path):
         return subprocess.run(
             ["kubectl", "create", "-f", path],
diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml
index b469a7564..e701fdfaa 100644
--- a/manifests/complete-postgres-manifest.yaml
+++ b/manifests/complete-postgres-manifest.yaml
@@ -31,28 +31,28 @@ spec:
     size: 1Gi
 #   storageClass: my-sc
   additionalVolumes:
-    - name: data
-      mountPath: /home/postgres/pgdata/partitions
-      targetContainers:
-        - postgres
-      volumeSource:
-        PersistentVolumeClaim:
-          claimName: pvc-postgresql-data-partitions
-          readyOnly: false
-    - name: conf
-      mountPath: /etc/telegraf
-      subPath: telegraf.conf
-      targetContainers:
-        - telegraf-sidecar
-      volumeSource:
-        configMap:
-          name: my-config-map
     - name: empty
       mountPath: /opt/empty
       targetContainers:
         - all
       volumeSource:
         emptyDir: {}
+#   - name: data
+#     mountPath: /home/postgres/pgdata/partitions
+#     targetContainers:
+#       - postgres
+#     volumeSource:
+#       PersistentVolumeClaim:
+#         claimName: pvc-postgresql-data-partitions
+#         readyOnly: false
+#   - name: conf
+#     mountPath: /etc/telegraf
+#     subPath: telegraf.conf
+#     targetContainers:
+#       - telegraf-sidecar
+#     volumeSource:
+#       configMap:
+#         name: my-config-map
 
   enableShmVolume: true
 #  spiloFSGroup: 103
diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml
index 83cd721e7..667941a24 100644
--- a/manifests/operator-service-account-rbac.yaml
+++ b/manifests/operator-service-account-rbac.yaml
@@ -43,6 +43,13 @@ rules:
   - configmaps
   verbs:
   - get
+# to send events to the CRs
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - create
 # to manage endpoints which are also used by Patroni
 - apiGroups:
   - ""
diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml
index 86051e43b..b2496c9c9 100644
--- a/manifests/operatorconfiguration.crd.yaml
+++ b/manifests/operatorconfiguration.crd.yaml
@@ -60,6 +60,12 @@ spec:
               type: object
               additionalProperties:
                 type: string
+            sidecars:
+              type: array
+              nullable: true
+              items:
+                type: object
+                additionalProperties: true
             workers:
               type: integer
               minimum: 1
diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml
index 209e2684b..e80bfa846 100644
--- a/manifests/postgresql-operator-default-configuration.yaml
+++ b/manifests/postgresql-operator-default-configuration.yaml
@@ -13,8 +13,11 @@ configuration:
   resync_period: 30m
   repair_period: 5m
   # set_memory_request_to_limit: false
-  # sidecar_docker_images:
-  #   example: "exampleimage:exampletag"
+  # sidecars:
+  # - image: image:123
+  #   name: global-sidecar-1
+  #   ports:
+  #   - containerPort: 80
   workers: 4
   users:
     replication_username: standby
diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go
index 090b83821..fe18392cc 100644
--- a/pkg/apis/acid.zalan.do/v1/crds.go
+++ b/pkg/apis/acid.zalan.do/v1/crds.go
@@ -797,6 +797,17 @@ var OperatorConfigCRDResourceValidation = apiextv1beta1.CustomResourceValidation{
 					},
 				},
 			},
+			"sidecars": {
+				Type: "array",
+				Items: &apiextv1beta1.JSONSchemaPropsOrArray{
+					Schema: &apiextv1beta1.JSONSchemaProps{
+						Type: "object",
+						AdditionalProperties: &apiextv1beta1.JSONSchemaPropsOrBool{
+							Allows: true,
+						},
+					},
+				},
+			},
 			"workers": {
 				Type:    "integer",
 				Minimum: &min1,
diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
index 587e8152d..5411199ca 100644
--- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
+++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/zalando/postgres-operator/pkg/spec"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
@@ -194,6 +195,7 @@ type OperatorConfigurationData struct {
 	SetMemoryRequestToLimit    bool                         `json:"set_memory_request_to_limit,omitempty"`
 	ShmVolume                  *bool                        `json:"enable_shm_volume,omitempty"`
-	Sidecars                   map[string]string            `json:"sidecar_docker_images,omitempty"`
+	SidecarImages              map[string]string            `json:"sidecar_docker_images,omitempty"`
+	SidecarContainers          []v1.Container               `json:"sidecars,omitempty"`
 	PostgresUsersConfiguration PostgresUsersConfiguration   `json:"users"`
 	Kubernetes                 KubernetesMetaConfiguration  `json:"kubernetes"`
 	PostgresPodResources       PostgresPodResourcesDefaults `json:"postgres_pod_resources"`
diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
index 961051c8d..e36009208 100644
--- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go
+++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go
@@ -104,7 +104,7 @@ type AdditionalVolume struct {
 	MountPath        string          `json:"mountPath"`
 	SubPath          string          `json:"subPath"`
 	TargetContainers []string        `json:"targetContainers"`
-	VolumeSource     v1.VolumeSource `json:"volume"`
+	VolumeSource     v1.VolumeSource `json:"volumeSource"`
 }
 
 // PostgresqlParam describes PostgreSQL version and pairs of configuration parameter name - values.
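Reviewer note, not part of the diff: the new `sidecars` option is deserialized straight into the `SidecarContainers` field added above, so any valid `Container` spec is accepted as-is. Below is a minimal sketch of that mapping, assuming `sigs.k8s.io/yaml` for the YAML-to-struct conversion and a hypothetical `main` package purely for illustration:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/yaml"
)

// configExcerpt mirrors the two sidecar-related fields of
// OperatorConfigurationData; the json tags match the ones added above.
type configExcerpt struct {
	SidecarImages     map[string]string `json:"sidecar_docker_images,omitempty"` // deprecated
	SidecarContainers []v1.Container    `json:"sidecars,omitempty"`
}

func main() {
	// same shape as the commented example in
	// postgresql-operator-default-configuration.yaml
	raw := []byte(`
sidecars:
- name: global-sidecar-1
  image: image:123
  ports:
  - containerPort: 80
`)
	var cfg configExcerpt
	// sigs.k8s.io/yaml converts YAML to JSON first, so the json tags apply
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.SidecarContainers[0].Name, cfg.SidecarContainers[0].Image)
	// Output: global-sidecar-1 image:123
}
```

Because the CRD schema above only enforces `type: object` with `additionalProperties: true`, a typo inside a container spec surfaces only when the statefulset is created — exactly the caveat the docs change spells out.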
diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go
index 2e18a5b1f..b3424bf7f 100644
--- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go
@@ -317,13 +317,20 @@ func (in *OperatorConfigurationData) DeepCopyInto(out *OperatorConfigurationData) {
 		*out = new(bool)
 		**out = **in
 	}
-	if in.Sidecars != nil {
-		in, out := &in.Sidecars, &out.Sidecars
+	if in.SidecarImages != nil {
+		in, out := &in.SidecarImages, &out.SidecarImages
 		*out = make(map[string]string, len(*in))
 		for key, val := range *in {
 			(*out)[key] = val
 		}
 	}
+	if in.SidecarContainers != nil {
+		in, out := &in.SidecarContainers, &out.SidecarContainers
+		*out = make([]corev1.Container, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
 	out.PostgresUsersConfiguration = in.PostgresUsersConfiguration
 	in.Kubernetes.DeepCopyInto(&out.Kubernetes)
 	out.PostgresPodResources = in.PostgresPodResources
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 74cf6e61d..244916074 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -21,8 +21,11 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/tools/reference"
 
 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
+	"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
 	"github.com/zalando/postgres-operator/pkg/util/config"
@@ -81,6 +84,7 @@ type Cluster struct {
 	acidv1.Postgresql
 	Config
 	logger           *logrus.Entry
+	eventRecorder    record.EventRecorder
 	patroni          patroni.Interface
 	pgUsers          map[string]spec.PgUser
 	systemUsers      map[string]spec.PgUser
@@ -109,7 +113,7 @@ type compareStatefulsetResult struct {
 }
 
 // New creates a new cluster. This function should be called from a controller.
-func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry) *Cluster {
+func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
 	deletePropagationPolicy := metav1.DeletePropagationOrphan
 
 	podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) {
@@ -140,7 +144,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
 	cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
 	cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName)
 	cluster.patroni = patroni.New(cluster.logger)
-
+	cluster.eventRecorder = eventRecorder
 	return cluster
 }
 
@@ -166,6 +170,16 @@ func (c *Cluster) setProcessName(procName string, args ...interface{}) {
 	}
 }
 
+// GetReference of the Postgres CR object,
+// required to emit events to this resource
+func (c *Cluster) GetReference() *v1.ObjectReference {
+	ref, err := reference.GetReference(scheme.Scheme, &c.Postgresql)
+	if err != nil {
+		c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", c.Postgresql.Namespace, c.Postgresql.Name, err)
+	}
+	return ref
+}
+
 // SetStatus of Postgres cluster
 // TODO: eventually switch to updateStatus() for kubernetes 1.11 and above
 func (c *Cluster) setStatus(status string) {
@@ -245,6 +259,7 @@ func (c *Cluster) Create() error {
 	}()
 
 	c.setStatus(acidv1.ClusterStatusCreating)
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
 
 	if err = c.enforceMinResourceLimits(&c.Spec); err != nil {
 		return fmt.Errorf("could not enforce minimum resource limits: %v", err)
@@ -263,6 +278,7 @@ func (c *Cluster) Create() error {
 			return fmt.Errorf("could not create %s endpoint: %v", role, err)
 		}
 		c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Endpoints", "Endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
 	}
 
 	if c.Services[role] != nil {
@@ -273,6 +289,7 @@ func (c *Cluster) Create() error {
 			return fmt.Errorf("could not create %s service: %v", role, err)
 		}
 		c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Services", "The service %q for role %s has been successfully created", util.NameFromMeta(service.ObjectMeta), role)
 	}
 
 	if err = c.initUsers(); err != nil {
@@ -284,6 +301,7 @@ func (c *Cluster) Create() error {
 		return fmt.Errorf("could not create secrets: %v", err)
 	}
 	c.logger.Infof("secrets have been successfully created")
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")
 
 	if c.PodDisruptionBudget != nil {
 		return fmt.Errorf("pod disruption budget already exists in the cluster")
@@ -302,6 +320,7 @@ func (c *Cluster) Create() error {
 		return fmt.Errorf("could not create statefulset: %v", err)
 	}
 	c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
+	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
 
 	c.logger.Info("waiting for the cluster being ready")
 
@@ -310,6 +329,7 @@ func (c *Cluster) Create() error {
 		return err
 	}
 	c.logger.Infof("pods are ready")
+	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")
 
 	// create database objects unless we are running without pods or disabled
 	// that feature explicitly
@@ -555,6 +575,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
 		}
 		if isSmaller {
 			c.logger.Warningf("defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
+			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
 			spec.Resources.ResourceLimits.CPU = minCPULimit
 		}
 	}
v1.EventTypeWarning, "ResourceLimits", "defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit) spec.Resources.ResourceLimits.Memory = minMemoryLimit } } @@ -598,6 +620,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { if oldSpec.Spec.PostgresqlParam.PgVersion != newSpec.Spec.PostgresqlParam.PgVersion { // PG versions comparison c.logger.Warningf("postgresql version change(%q -> %q) has no effect", oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "PostgreSQL", "postgresql version change(%q -> %q) has no effect", + oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion) //we need that hack to generate statefulset with the old version newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion } @@ -757,6 +781,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { func (c *Cluster) Delete() { c.mu.Lock() defer c.mu.Unlock() + c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of new cluster resources") // delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods // deleting the cron job also removes pods and batch jobs it created @@ -1095,6 +1120,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e var err error c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate) var wg sync.WaitGroup @@ -1121,6 +1147,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil { c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate) if err = <-podLabelErr; err != nil { err = fmt.Errorf("could not get master pod label: %v", err) } @@ -1136,6 +1163,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e // close the label waiting channel no sooner than the waiting goroutine terminates. 
@@ -1136,6 +1163,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error {
 	// close the label waiting channel no sooner than the waiting goroutine terminates.
 	close(podLabelErr)
+	c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
 
 	return err
 }
diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go
index 432f53132..84ec04e3e 100644
--- a/pkg/cluster/cluster_test.go
+++ b/pkg/cluster/cluster_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
 	"github.com/zalando/postgres-operator/pkg/util/teams"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/record"
 )
 
 const (
@@ -21,6 +22,8 @@ const (
 )
 
 var logger = logrus.New().WithField("test", "cluster")
+var eventRecorder = record.NewFakeRecorder(1)
+
 var cl = New(
 	Config{
 		OpConfig: config.Config{
@@ -34,6 +37,7 @@ var cl = New(
 	k8sutil.NewMockKubernetesClient(),
 	acidv1.Postgresql{},
 	logger,
+	eventRecorder,
 )
 
 func TestInitRobotUsers(t *testing.T) {
diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go
index 31bc744ea..3ce4196ad 100644
--- a/pkg/cluster/k8sres.go
+++ b/pkg/cluster/k8sres.go
@@ -463,8 +463,7 @@ func generateContainer(
 }
 
 func generateSidecarContainers(sidecars []acidv1.Sidecar,
-	volumeMounts []v1.VolumeMount, defaultResources acidv1.Resources,
-	superUserName string, credentialsSecretName string, logger *logrus.Entry) ([]v1.Container, error) {
+	defaultResources acidv1.Resources, startIndex int, logger *logrus.Entry) ([]v1.Container, error) {
 
 	if len(sidecars) > 0 {
 		result := make([]v1.Container, 0)
@@ -483,7 +482,7 @@ func generateSidecarContainers(sidecars []acidv1.Sidecar,
 				return nil, err
 			}
 
-			sc := getSidecarContainer(sidecar, index, volumeMounts, resources, superUserName, credentialsSecretName, logger)
+			sc := getSidecarContainer(sidecar, startIndex+index, resources)
 			result = append(result, *sc)
 		}
 		return result, nil
@@ -491,6 +490,55 @@ func generateSidecarContainers(sidecars []acidv1.Sidecar,
 	return nil, nil
 }
 
+// adds common fields to sidecars
+func patchSidecarContainers(in []v1.Container, volumeMounts []v1.VolumeMount, superUserName string, credentialsSecretName string, logger *logrus.Entry) []v1.Container {
+	result := []v1.Container{}
+
+	for _, container := range in {
+		container.VolumeMounts = append(container.VolumeMounts, volumeMounts...)
+		env := []v1.EnvVar{
+			{
+				Name: "POD_NAME",
+				ValueFrom: &v1.EnvVarSource{
+					FieldRef: &v1.ObjectFieldSelector{
+						APIVersion: "v1",
+						FieldPath:  "metadata.name",
+					},
+				},
+			},
+			{
+				Name: "POD_NAMESPACE",
+				ValueFrom: &v1.EnvVarSource{
+					FieldRef: &v1.ObjectFieldSelector{
+						APIVersion: "v1",
+						FieldPath:  "metadata.namespace",
+					},
+				},
+			},
+			{
+				Name:  "POSTGRES_USER",
+				Value: superUserName,
+			},
+			{
+				Name: "POSTGRES_PASSWORD",
+				ValueFrom: &v1.EnvVarSource{
+					SecretKeyRef: &v1.SecretKeySelector{
+						LocalObjectReference: v1.LocalObjectReference{
+							Name: credentialsSecretName,
+						},
+						Key: "password",
+					},
+				},
+			},
+		}
+		mergedEnv := append(container.Env, env...)
+		container.Env = deduplicateEnvVars(mergedEnv, container.Name, logger)
+		result = append(result, container)
+	}
+
+	return result
+}
+
 // Check whether or not we're requested to mount an shm volume,
 // taking into account that PostgreSQL manifest has precedence.
 func mountShmVolumeNeeded(opConfig config.Config, spec *acidv1.PostgresSpec) *bool {
@@ -725,58 +773,18 @@ func deduplicateEnvVars(input []v1.EnvVar, containerName string, logger *logrus.Entry) []v1.EnvVar {
 	return result
 }
 
-func getSidecarContainer(sidecar acidv1.Sidecar, index int, volumeMounts []v1.VolumeMount,
-	resources *v1.ResourceRequirements, superUserName string, credentialsSecretName string, logger *logrus.Entry) *v1.Container {
+func getSidecarContainer(sidecar acidv1.Sidecar, index int, resources *v1.ResourceRequirements) *v1.Container {
 	name := sidecar.Name
 	if name == "" {
 		name = fmt.Sprintf("sidecar-%d", index)
 	}
 
-	env := []v1.EnvVar{
-		{
-			Name: "POD_NAME",
-			ValueFrom: &v1.EnvVarSource{
-				FieldRef: &v1.ObjectFieldSelector{
-					APIVersion: "v1",
-					FieldPath:  "metadata.name",
-				},
-			},
-		},
-		{
-			Name: "POD_NAMESPACE",
-			ValueFrom: &v1.EnvVarSource{
-				FieldRef: &v1.ObjectFieldSelector{
-					APIVersion: "v1",
-					FieldPath:  "metadata.namespace",
-				},
-			},
-		},
-		{
-			Name:  "POSTGRES_USER",
-			Value: superUserName,
-		},
-		{
-			Name: "POSTGRES_PASSWORD",
-			ValueFrom: &v1.EnvVarSource{
-				SecretKeyRef: &v1.SecretKeySelector{
-					LocalObjectReference: v1.LocalObjectReference{
-						Name: credentialsSecretName,
-					},
-					Key: "password",
-				},
-			},
-		},
-	}
-	if len(sidecar.Env) > 0 {
-		env = append(env, sidecar.Env...)
-	}
 	return &v1.Container{
 		Name:            name,
 		Image:           sidecar.DockerImage,
 		ImagePullPolicy: v1.PullIfNotPresent,
 		Resources:       *resources,
-		VolumeMounts:    volumeMounts,
-		Env:             deduplicateEnvVars(env, name, logger),
+		Env:             sidecar.Env,
 		Ports:           sidecar.Ports,
 	}
 }
@@ -1066,37 +1074,63 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.StatefulSet, error) {
 		c.OpConfig.Resources.SpiloPrivileged,
 	)
 
-	// resolve conflicts between operator-global and per-cluster sidecars
-	sideCars := c.mergeSidecars(spec.Sidecars)
+	// generate container specs for sidecars specified in the cluster manifest
+	clusterSpecificSidecars := []v1.Container{}
+	if spec.Sidecars != nil && len(spec.Sidecars) > 0 {
+		// warn if sidecars are defined, but globally disabled (does not apply to globally defined sidecars)
+		if c.OpConfig.EnableSidecars != nil && !(*c.OpConfig.EnableSidecars) {
+			c.logger.Warningf("sidecars specified but disabled in configuration - next statefulset creation would fail")
+		}
 
-	resourceRequirementsScalyrSidecar := makeResources(
-		c.OpConfig.ScalyrCPURequest,
-		c.OpConfig.ScalyrMemoryRequest,
-		c.OpConfig.ScalyrCPULimit,
-		c.OpConfig.ScalyrMemoryLimit,
-	)
+		if clusterSpecificSidecars, err = generateSidecarContainers(spec.Sidecars, defaultResources, 0, c.logger); err != nil {
+			return nil, fmt.Errorf("could not generate sidecar containers: %v", err)
+		}
+	}
+
+	// deprecated way of providing global sidecars
+	var globalSidecarContainersByDockerImage []v1.Container
+	var globalSidecarsByDockerImage []acidv1.Sidecar
+	for name, dockerImage := range c.OpConfig.SidecarImages {
+		globalSidecarsByDockerImage = append(globalSidecarsByDockerImage, acidv1.Sidecar{Name: name, DockerImage: dockerImage})
+	}
+	if globalSidecarContainersByDockerImage, err = generateSidecarContainers(globalSidecarsByDockerImage, defaultResources, len(clusterSpecificSidecars), c.logger); err != nil {
+		return nil, fmt.Errorf("could not generate sidecar containers: %v", err)
+	}
+	// make the resulting list reproducible
+	// c.OpConfig.SidecarImages is a map and thus unordered in Go
+	// .Name is unique
+	sort.Slice(globalSidecarContainersByDockerImage, func(i, j int) bool {
+		return globalSidecarContainersByDockerImage[i].Name < globalSidecarContainersByDockerImage[j].Name
+	})
 
 	// generate scalyr sidecar container
-	if scalyrSidecar :=
+	var scalyrSidecars []v1.Container
+	if scalyrSidecar, err :=
 		generateScalyrSidecarSpec(c.Name,
 			c.OpConfig.ScalyrAPIKey,
 			c.OpConfig.ScalyrServerURL,
 			c.OpConfig.ScalyrImage,
-			&resourceRequirementsScalyrSidecar, c.logger); scalyrSidecar != nil {
-		sideCars = append(sideCars, *scalyrSidecar)
+			c.OpConfig.ScalyrCPURequest,
+			c.OpConfig.ScalyrMemoryRequest,
+			c.OpConfig.ScalyrCPULimit,
+			c.OpConfig.ScalyrMemoryLimit,
+			defaultResources,
+			c.logger); err != nil {
+		return nil, fmt.Errorf("could not generate Scalyr sidecar: %v", err)
+	} else {
+		if scalyrSidecar != nil {
+			scalyrSidecars = append(scalyrSidecars, *scalyrSidecar)
+		}
 	}
 
-	// generate sidecar containers
-	if sideCars != nil && len(sideCars) > 0 {
-		if c.OpConfig.EnableSidecars != nil && !(*c.OpConfig.EnableSidecars) {
-			c.logger.Warningf("sidecars specified but disabled in configuration - next statefulset creation would fail")
-		}
-		if sidecarContainers, err = generateSidecarContainers(sideCars, volumeMounts, defaultResources,
-			c.OpConfig.SuperUsername, c.credentialSecretName(c.OpConfig.SuperUsername), c.logger); err != nil {
-			return nil, fmt.Errorf("could not generate sidecar containers: %v", err)
-		}
+	sidecarContainers, conflicts := mergeContainers(clusterSpecificSidecars, c.Config.OpConfig.SidecarContainers, globalSidecarContainersByDockerImage, scalyrSidecars)
+	for _, containerName := range conflicts {
+		c.logger.Warningf("a sidecar is specified twice. Ignoring sidecar %q in favor of %q with a higher precedence",
+			containerName, containerName)
 	}
 
+	sidecarContainers = patchSidecarContainers(sidecarContainers, volumeMounts, c.OpConfig.SuperUsername, c.credentialSecretName(c.OpConfig.SuperUsername), c.logger)
+
 	tolerationSpec := tolerations(&spec.Tolerations, c.OpConfig.PodToleration)
 	effectivePodPriorityClassName := util.Coalesce(spec.PodPriorityClassName, c.OpConfig.PodPriorityClassName)
 
@@ -1192,57 +1226,44 @@ func (c *Cluster) generatePodAnnotations(spec *acidv1.PostgresSpec) map[string]string {
 }
 
 func generateScalyrSidecarSpec(clusterName, APIKey, serverURL, dockerImage string,
-	containerResources *acidv1.Resources, logger *logrus.Entry) *acidv1.Sidecar {
+	scalyrCPURequest string, scalyrMemoryRequest string, scalyrCPULimit string, scalyrMemoryLimit string,
+	defaultResources acidv1.Resources, logger *logrus.Entry) (*v1.Container, error) {
 	if APIKey == "" || dockerImage == "" {
 		if APIKey == "" && dockerImage != "" {
 			logger.Warning("Not running Scalyr sidecar: SCALYR_API_KEY must be defined")
 		}
-		return nil
+		return nil, nil
 	}
-	scalarSpec := &acidv1.Sidecar{
-		Name:        "scalyr-sidecar",
-		DockerImage: dockerImage,
-		Env: []v1.EnvVar{
-			{
-				Name:  "SCALYR_API_KEY",
-				Value: APIKey,
-			},
-			{
-				Name:  "SCALYR_SERVER_HOST",
-				Value: clusterName,
-			},
+	resourcesScalyrSidecar := makeResources(
+		scalyrCPURequest,
+		scalyrMemoryRequest,
+		scalyrCPULimit,
+		scalyrMemoryLimit,
+	)
+	resourceRequirementsScalyrSidecar, err := generateResourceRequirements(resourcesScalyrSidecar, defaultResources)
+	if err != nil {
+		return nil, fmt.Errorf("invalid resources for Scalyr sidecar: %v", err)
+	}
+	env := []v1.EnvVar{
+		{
+			Name:  "SCALYR_API_KEY",
+			Value: APIKey,
+		},
+		{
+			Name:  "SCALYR_SERVER_HOST",
+			Value: clusterName,
 		},
-		Resources: *containerResources,
 	}
 	if serverURL != "" {
-		scalarSpec.Env = append(scalarSpec.Env, v1.EnvVar{Name: "SCALYR_SERVER_URL", Value: serverURL})
+		env = append(env, v1.EnvVar{Name: "SCALYR_SERVER_URL", Value: serverURL})
 	}
-	return scalarSpec
-}
-
-// mergeSidecar merges globally-defined sidecars with those defined in the cluster manifest
-func (c *Cluster) mergeSidecars(sidecars []acidv1.Sidecar) []acidv1.Sidecar {
-	globalSidecarsToSkip := map[string]bool{}
-	result := make([]acidv1.Sidecar, 0)
-
-	for i, sidecar := range sidecars {
-		dockerImage, ok := c.OpConfig.Sidecars[sidecar.Name]
-		if ok {
-			if dockerImage != sidecar.DockerImage {
-				c.logger.Warningf("merging definitions for sidecar %q: "+
-					"ignoring %q in the global scope in favor of %q defined in the cluster",
-					sidecar.Name, dockerImage, sidecar.DockerImage)
-			}
-			globalSidecarsToSkip[sidecar.Name] = true
-		}
-		result = append(result, sidecars[i])
-	}
-	for name, dockerImage := range c.OpConfig.Sidecars {
-		if !globalSidecarsToSkip[name] {
-			result = append(result, acidv1.Sidecar{Name: name, DockerImage: dockerImage})
-		}
-	}
-	return result
+	return &v1.Container{
+		Name:            "scalyr-sidecar",
+		Image:           dockerImage,
+		Env:             env,
+		ImagePullPolicy: v1.PullIfNotPresent,
+		Resources:       *resourceRequirementsScalyrSidecar,
+	}, nil
 }
 
 func (c *Cluster) getNumberOfInstances(spec *acidv1.PostgresSpec) int32 {
diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go
index 6e4587627..d09a2c0aa 100644
--- a/pkg/cluster/k8sres_test.go
+++ b/pkg/cluster/k8sres_test.go
@@ -18,6 +18,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	policyv1beta1 "k8s.io/api/policy/v1beta1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 )
@@ -37,7 +38,7 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	testName := "TestGenerateSpiloConfig"
 	tests := []struct {
@@ -102,7 +103,7 @@ func TestCreateLoadBalancerLogic(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	testName := "TestCreateLoadBalancerLogic"
 	tests := []struct {
@@ -164,7 +165,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-pdb",
@@ -187,7 +189,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-pdb",
@@ -210,7 +213,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-pdb",
@@ -233,7 +237,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
 			acidv1.Postgresql{
 				ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
 				Spec:       acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
-			logger),
+			logger,
+			eventRecorder),
 		policyv1beta1.PodDisruptionBudget{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "postgres-myapp-database-databass-budget",
@@ -368,7 +373,7 @@ func TestCloneEnv(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	for _, tt := range tests {
 		envs := cluster.generateCloneEnvironment(tt.cloneOpts)
@@ -502,7 +507,7 @@ func TestGetPgVersion(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	for _, tt := range tests {
 		pgVersion, err := cluster.getNewPgVersion(tt.pgContainer, tt.newPgVersion)
@@ -678,7 +683,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	var clusterNoDefaultRes = New(
 		Config{
@@ -690,7 +695,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
 			},
 			ConnectionPooler: config.ConnectionPooler{},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil }
 
@@ -803,7 +808,7 @@ func TestConnectionPoolerDeploymentSpec(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-sts",
@@ -904,7 +909,7 @@ func TestConnectionPoolerServiceSpec(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-sts",
@@ -990,7 +995,7 @@ func TestTLS(t *testing.T) {
 				SpiloFSGroup: &spiloFSGroup,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 	spec = makeSpec(acidv1.TLSDescription{SecretName: "my-secret", CAFile: "ca.crt"})
 	s, err := cluster.generateStatefulSet(&spec)
 	if err != nil {
@@ -1112,7 +1117,7 @@ func TestAdditionalVolume(t *testing.T) {
 				ReplicationUsername: replicationUserName,
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	for _, tt := range tests {
 		// Test with additional volume mounted in all containers
@@ -1202,3 +1207,201 @@ func TestAdditionalVolume(t *testing.T) {
 		}
 	}
 }
+
+// inject sidecars through all available mechanisms and check the resulting container specs
+func TestSidecars(t *testing.T) {
+	var err error
+	var spec acidv1.PostgresSpec
+	var cluster *Cluster
+
+	generateKubernetesResources := func(cpuRequest string, cpuLimit string, memoryRequest string, memoryLimit string) v1.ResourceRequirements {
+		parsedCPURequest, err := resource.ParseQuantity(cpuRequest)
+		assert.NoError(t, err)
+		parsedCPULimit, err := resource.ParseQuantity(cpuLimit)
+		assert.NoError(t, err)
+		parsedMemoryRequest, err := resource.ParseQuantity(memoryRequest)
+		assert.NoError(t, err)
+		parsedMemoryLimit, err := resource.ParseQuantity(memoryLimit)
+		assert.NoError(t, err)
+		return v1.ResourceRequirements{
+			Requests: v1.ResourceList{
+				v1.ResourceCPU:    parsedCPURequest,
+				v1.ResourceMemory: parsedMemoryRequest,
+			},
+			Limits: v1.ResourceList{
+				v1.ResourceCPU:    parsedCPULimit,
+				v1.ResourceMemory: parsedMemoryLimit,
+			},
+		}
+	}
+
+	spec = acidv1.PostgresSpec{
+		TeamID: "myapp", NumberOfInstances: 1,
+		Resources: acidv1.Resources{
+			ResourceRequests: acidv1.ResourceDescription{CPU: "1", Memory: "10"},
+			ResourceLimits:   acidv1.ResourceDescription{CPU: "1", Memory: "10"},
+		},
+		Volume: acidv1.Volume{
+			Size: "1G",
+		},
+		Sidecars: []acidv1.Sidecar{
+			acidv1.Sidecar{
+				Name: "cluster-specific-sidecar",
+			},
+			acidv1.Sidecar{
+				Name: "cluster-specific-sidecar-with-resources",
+				Resources: acidv1.Resources{
+					ResourceRequests: acidv1.ResourceDescription{CPU: "210m", Memory: "0.8Gi"},
+					ResourceLimits:   acidv1.ResourceDescription{CPU: "510m", Memory: "1.4Gi"},
+				},
+			},
+			acidv1.Sidecar{
+				Name:        "replace-sidecar",
+				DockerImage: "overwrite-image",
+			},
+		},
+	}
+
+	cluster = New(
+		Config{
+			OpConfig: config.Config{
+				PodManagementPolicy: "ordered_ready",
+				ProtectedRoles:      []string{"admin"},
+				Auth: config.Auth{
+					SuperUsername:       superUserName,
+					ReplicationUsername: replicationUserName,
+				},
+				Resources: config.Resources{
+					DefaultCPURequest:    "200m",
+					DefaultCPULimit:      "500m",
+					DefaultMemoryRequest: "0.7Gi",
+					DefaultMemoryLimit:   "1.3Gi",
+				},
+				SidecarImages: map[string]string{
+					"deprecated-global-sidecar": "image:123",
+				},
+				SidecarContainers: []v1.Container{
+					v1.Container{
+						Name: "global-sidecar",
+					},
+					// will be replaced by a cluster specific sidecar with the same name
+					v1.Container{
+						Name:  "replace-sidecar",
+						Image: "replaced-image",
+					},
+				},
+				Scalyr: config.Scalyr{
+					ScalyrAPIKey:        "abc",
+					ScalyrImage:         "scalyr-image",
+					ScalyrCPURequest:    "220m",
+					ScalyrCPULimit:      "520m",
+					ScalyrMemoryRequest: "0.9Gi",
+					// use default memory limit
+				},
+			},
+		}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
+
+	s, err := cluster.generateStatefulSet(&spec)
+	assert.NoError(t, err)
+
+	env := []v1.EnvVar{
+		{
+			Name: "POD_NAME",
+			ValueFrom: &v1.EnvVarSource{
+				FieldRef: &v1.ObjectFieldSelector{
+					APIVersion: "v1",
+					FieldPath:  "metadata.name",
+				},
+			},
+		},
+		{
+			Name: "POD_NAMESPACE",
+			ValueFrom: &v1.EnvVarSource{
+				FieldRef: &v1.ObjectFieldSelector{
+					APIVersion: "v1",
+					FieldPath:  "metadata.namespace",
+				},
+			},
+		},
+		{
+			Name:  "POSTGRES_USER",
+			Value: superUserName,
+		},
+		{
+			Name: "POSTGRES_PASSWORD",
+			ValueFrom: &v1.EnvVarSource{
+				SecretKeyRef: &v1.SecretKeySelector{
+					LocalObjectReference: v1.LocalObjectReference{
+						Name: "",
+					},
+					Key: "password",
+				},
+			},
+		},
+	}
+	mounts := []v1.VolumeMount{
+		v1.VolumeMount{
+			Name:      "pgdata",
+			MountPath: "/home/postgres/pgdata",
+		},
+	}
+
+	// the Patroni container plus the deduplicated sidecars
+	assert.Equal(t, 7, len(s.Spec.Template.Spec.Containers), "wrong number of containers")
+
+	// cluster specific sidecar
+	assert.Contains(t, s.Spec.Template.Spec.Containers, v1.Container{
+		Name:            "cluster-specific-sidecar",
+		Env:             env,
+		Resources:       generateKubernetesResources("200m", "500m", "0.7Gi", "1.3Gi"),
+		ImagePullPolicy: v1.PullIfNotPresent,
+		VolumeMounts:    mounts,
+	})
+
+	// container specific resources
+	expectedResources := generateKubernetesResources("210m", "510m", "0.8Gi", "1.4Gi")
+	assert.Equal(t, expectedResources.Requests[v1.ResourceCPU], s.Spec.Template.Spec.Containers[2].Resources.Requests[v1.ResourceCPU])
+	assert.Equal(t, expectedResources.Limits[v1.ResourceCPU], s.Spec.Template.Spec.Containers[2].Resources.Limits[v1.ResourceCPU])
+	assert.Equal(t, expectedResources.Requests[v1.ResourceMemory], s.Spec.Template.Spec.Containers[2].Resources.Requests[v1.ResourceMemory])
+	assert.Equal(t, expectedResources.Limits[v1.ResourceMemory], s.Spec.Template.Spec.Containers[2].Resources.Limits[v1.ResourceMemory])
+
+	// deprecated global sidecar
+	assert.Contains(t, s.Spec.Template.Spec.Containers, v1.Container{
+		Name:            "deprecated-global-sidecar",
+		Image:           "image:123",
+		Env:             env,
+		Resources:       generateKubernetesResources("200m", "500m", "0.7Gi", "1.3Gi"),
+		ImagePullPolicy: v1.PullIfNotPresent,
+		VolumeMounts:    mounts,
+	})
+
+	// global sidecar
+	assert.Contains(t, s.Spec.Template.Spec.Containers, v1.Container{
+		Name:         "global-sidecar",
+		Env:          env,
+		VolumeMounts: mounts,
+	})
+
+	// replaced sidecar
+	assert.Contains(t, s.Spec.Template.Spec.Containers, v1.Container{
+		Name:            "replace-sidecar",
+		Image:           "overwrite-image",
+		Resources:       generateKubernetesResources("200m", "500m", "0.7Gi", "1.3Gi"),
+		ImagePullPolicy: v1.PullIfNotPresent,
+		Env:             env,
+		VolumeMounts:    mounts,
+	})
+
+	// scalyr sidecar
+	// the order in env is important
+	scalyrEnv := append([]v1.EnvVar{v1.EnvVar{Name: "SCALYR_API_KEY", Value: "abc"}, v1.EnvVar{Name: "SCALYR_SERVER_HOST", Value: ""}}, env...)
+	assert.Contains(t, s.Spec.Template.Spec.Containers, v1.Container{
+		Name:            "scalyr-sidecar",
+		Image:           "scalyr-image",
+		Resources:       generateKubernetesResources("220m", "520m", "0.9Gi", "1.3Gi"),
+		ImagePullPolicy: v1.PullIfNotPresent,
+		Env:             scalyrEnv,
+		VolumeMounts:    mounts,
+	})
+
+}
diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index 9991621cc..a734e4835 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -294,6 +294,27 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 	return pod, nil
 }
 
+func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
+
+	/*
+	 Operator should not re-create pods if there is at least one replica being bootstrapped,
+	 because Patroni might use other replicas to take a basebackup from (see Patroni's "clonefrom" tag).
+
+	 XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
+	 after this check succeeds but before a pod is re-created
+	*/
+
+	for _, pod := range pods.Items {
+		state, err := c.patroni.GetPatroniMemberState(&pod)
+		if err != nil || state == "creating replica" {
+			c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
+			return false
+		}
+
+	}
+	return true
+}
+
 func (c *Cluster) recreatePods() error {
 	c.setProcessName("starting to recreate pods")
 	ls := c.labelsSet(false)
@@ -309,6 +330,10 @@ func (c *Cluster) recreatePods() error {
 	}
 	c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))
 
+	if !c.isSafeToRecreatePods(pods) {
+		return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initialized")
+	}
+
 	var (
 		masterPod, newMasterPod, newPod *v1.Pod
 	)
diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go
index 2db807b38..9739cc354 100644
--- a/pkg/cluster/resources_test.go
+++ b/pkg/cluster/resources_test.go
@@ -36,7 +36,7 @@ func TestConnectionPoolerCreationAndDeletion(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
+	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)
 
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
@@ -85,7 +85,7 @@ func TestNeedConnectionPooler(t *testing.T) {
 				ConnectionPoolerDefaultMemoryLimit: "100Mi",
 			},
 		},
-	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
+	}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)
 
 	cluster.Spec = acidv1.PostgresSpec{
 		ConnectionPooler: &acidv1.ConnectionPooler{},
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 2ce0c7e88..e5a223324 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -346,10 +346,12 @@ func (c *Cluster) syncStatefulSet() error {
 	// statefulset or those that got their configuration from the outdated statefulset)
 	if podsRollingUpdateRequired {
 		c.logger.Debugln("performing rolling update")
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
 		if err := c.recreatePods(); err != nil {
 			return fmt.Errorf("could not recreate pods: %v", err)
 		}
 		c.logger.Infof("pods have been recreated")
+		c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
 		if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
 			c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
 		}
diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go
index 50b5cfaa8..3a7317938 100644
--- a/pkg/cluster/sync_test.go
+++ b/pkg/cluster/sync_test.go
@@ -79,7 +79,7 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
 				NumberOfInstances: int32ToPointer(1),
 			},
 		},
-	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
+	}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
 
 	cluster.Statefulset = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go
index 4dcdfb28a..7559ce3d4 100644
--- a/pkg/cluster/util.go
+++ b/pkg/cluster/util.go
@@ -530,3 +530,22 @@ func (c *Cluster) needConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
 func (c *Cluster) needConnectionPooler() bool {
 	return c.needConnectionPoolerWorker(&c.Spec)
 }
+
+// Earlier arguments take priority
+func mergeContainers(containers ...[]v1.Container) ([]v1.Container, []string) {
+	containerNameTaken := map[string]bool{}
+	result := make([]v1.Container, 0)
+	conflicts := make([]string, 0)
+
+	for _, containerArray := range containers {
+		for _, container := range containerArray {
+			if _, taken := containerNameTaken[container.Name]; taken {
+				conflicts = append(conflicts, container.Name)
+			} else {
+				containerNameTaken[container.Name] = true
+				result = append(result, container)
+			}
+		}
+	}
+	return result, conflicts
+}
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 9c48b7ef2..0b3fde5d9 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -7,24 +7,24 @@ import (
 	"sync"
 
 	"github.com/sirupsen/logrus"
-	v1 "k8s.io/api/core/v1"
-	rbacv1 "k8s.io/api/rbac/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/client-go/tools/cache"
-
 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
 	"github.com/zalando/postgres-operator/pkg/apiserver"
 	"github.com/zalando/postgres-operator/pkg/cluster"
+	acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util"
 	"github.com/zalando/postgres-operator/pkg/util/config"
 	"github.com/zalando/postgres-operator/pkg/util/constants"
 	"github.com/zalando/postgres-operator/pkg/util/k8sutil"
 	"github.com/zalando/postgres-operator/pkg/util/ringlog"
-
-	acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
+	v1 "k8s.io/api/core/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes/scheme"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
 )
 
 // Controller represents operator controller
@@ -36,6 +36,9 @@ type Controller struct {
 	KubeClient k8sutil.KubernetesClient
 	apiserver  *apiserver.Server
 
+	eventRecorder    record.EventRecorder
+	eventBroadcaster record.EventBroadcaster
+
 	stopCh chan struct{}
 
 	controllerID string
@@ -67,10 +70,21 @@ type Controller struct {
 func NewController(controllerConfig *spec.ControllerConfig, controllerId string) *Controller {
 	logger := logrus.New()
 
+	var myComponentName = "postgres-operator"
+	if controllerId != "" {
+		myComponentName += "/" + controllerId
+	}
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(logger.Debugf)
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: myComponentName})
+
 	c := &Controller{
 		config:           *controllerConfig,
 		opConfig:         &config.Config{},
 		logger:           logger.WithField("pkg", "controller"),
+		eventRecorder:    recorder,
+		eventBroadcaster: eventBroadcaster,
 		controllerID:     controllerId,
 		curWorkerCluster: sync.Map{},
 		clusterWorkers:   make(map[spec.NamespacedName]uint32),
@@ -93,6 +107,8 @@ func (c *Controller) initClients() {
 	if err != nil {
 		c.logger.Fatalf("could not create kubernetes clients: %v", err)
 	}
+	c.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.KubeClient.EventsGetter.Events("")})
+
 }
 
 func (c *Controller) initOperatorConfig() {
@@ -159,6 +178,11 @@ func (c *Controller) warnOnDeprecatedOperatorParameters() {
c.logger.Warningf("Operator configuration parameter 'enable_load_balancer' is deprecated and takes no effect. " + "Consider using the 'enable_master_load_balancer' or 'enable_replica_load_balancer' instead.") } + + if len(c.opConfig.SidecarImages) > 0 { + c.logger.Warningf("Operator configuration parameter 'sidecar_docker_images' is deprecated. " + + "Consider using 'sidecars' instead.") + } } func (c *Controller) initPodServiceAccount() { diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index a94fe7ecc..801dd1210 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -44,7 +44,8 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur result.RepairPeriod = time.Duration(fromCRD.RepairPeriod) result.SetMemoryRequestToLimit = fromCRD.SetMemoryRequestToLimit result.ShmVolume = fromCRD.ShmVolume - result.Sidecars = fromCRD.Sidecars + result.SidecarImages = fromCRD.SidecarImages + result.SidecarContainers = fromCRD.SidecarContainers result.StatefulsetPropagateAnnotations = fromCRD.StatefulsetPropagateAnnotations diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index e81671c7d..2a9e1b650 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" @@ -157,7 +158,7 @@ func (c *Controller) acquireInitialListOfClusters() error { } func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) *cluster.Cluster { - cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg) + cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder) cl.Run(c.stopCh) teamName := strings.ToLower(cl.Spec.TeamID) @@ -236,6 +237,7 @@ func (c *Controller) processEvent(event ClusterEvent) { if err := cl.Create(); err != nil { cl.Error = fmt.Sprintf("could not create cluster: %v", err) lg.Error(cl.Error) + c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error) return } @@ -274,6 +276,8 @@ func (c *Controller) processEvent(event ClusterEvent) { c.curWorkerCluster.Store(event.WorkerID, cl) cl.Delete() + // Fixme - no error handling for delete ? 
+		// c.eventRecorder.Eventf(cl.GetReference, v1.EventTypeWarning, "Delete", "%v", cl.Error)
 
 		func() {
 			defer c.clustersMu.Unlock()
@@ -304,6 +308,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
 		c.curWorkerCluster.Store(event.WorkerID, cl)
 		if err := cl.Sync(event.NewSpec); err != nil {
 			cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
+			c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
 			lg.Error(cl.Error)
 			return
 		}
diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go
index 3222e2f55..4fe5763ea 100644
--- a/pkg/util/config/config.go
+++ b/pkg/util/config/config.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/zalando/postgres-operator/pkg/spec"
 	"github.com/zalando/postgres-operator/pkg/util/constants"
+	v1 "k8s.io/api/core/v1"
 )
 
 // CRD describes CustomResourceDefinition specific configuration parameters
@@ -107,12 +108,14 @@ type Config struct {
 	LogicalBackup
 	ConnectionPooler
 
-	WatchedNamespace        string            `name:"watched_namespace"`    // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to'
-	KubernetesUseConfigMaps bool              `name:"kubernetes_use_configmaps" default:"false"`
-	EtcdHost                string            `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS
-	DockerImage             string            `name:"docker_image" default:"registry.opensource.zalan.do/acid/spilo-12:1.6-p2"`
-	Sidecars                map[string]string `name:"sidecar_docker_images"`
-	PodServiceAccountName   string            `name:"pod_service_account_name" default:"postgres-pod"`
+	WatchedNamespace        string            `name:"watched_namespace"`    // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to'
+	KubernetesUseConfigMaps bool              `name:"kubernetes_use_configmaps" default:"false"`
+	EtcdHost                string            `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS
+	DockerImage             string            `name:"docker_image" default:"registry.opensource.zalan.do/acid/spilo-12:1.6-p2"`
+	// deprecated in favour of SidecarContainers
+	SidecarImages         map[string]string `name:"sidecar_docker_images"`
+	SidecarContainers     []v1.Container    `name:"sidecars"`
+	PodServiceAccountName string            `name:"pod_service_account_name" default:"postgres-pod"`
 	// value of this string must be valid JSON or YAML; see initPodServiceAccount
 	PodServiceAccountDefinition            string `name:"pod_service_account_definition" default:""`
 	PodServiceAccountRoleBindingDefinition string `name:"pod_service_account_role_binding_definition" default:""`
diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go
index 3a397af7d..d7be2f48a 100644
--- a/pkg/util/k8sutil/k8sutil.go
+++ b/pkg/util/k8sutil/k8sutil.go
@@ -45,6 +45,7 @@ type KubernetesClient struct {
 	corev1.NodesGetter
 	corev1.NamespacesGetter
 	corev1.ServiceAccountsGetter
+	corev1.EventsGetter
 	appsv1.StatefulSetsGetter
 	appsv1.DeploymentsGetter
 	rbacv1.RoleBindingsGetter
@@ -142,6 +143,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
 	kubeClient.RESTClient = client.CoreV1().RESTClient()
 	kubeClient.RoleBindingsGetter = client.RbacV1()
 	kubeClient.CronJobsGetter = client.BatchV1beta1()
+	kubeClient.EventsGetter = client.CoreV1()
 
 	apiextClient, err := apiextclient.NewForConfig(cfg)
 	if err != nil {
diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go
index bdd96f048..53065e599 100644
--- a/pkg/util/patroni/patroni.go
+++ b/pkg/util/patroni/patroni.go
@@ -3,6 +3,7 @@ package patroni
 import (
 	"bytes"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"net"
@@ -11,7 +12,7 @@ import (
 	"time"
 
 	"github.com/sirupsen/logrus"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 )
 
 const (
@@ -25,6 +26,7 @@ const (
 
 type Interface interface {
 	Switchover(master *v1.Pod, candidate string) error
 	SetPostgresParameters(server *v1.Pod, options map[string]string) error
+	GetPatroniMemberState(pod *v1.Pod) (string, error)
 }
 
 // Patroni API client
@@ -123,3 +125,36 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error {
 	}
 	return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
 }
+
+// GetPatroniMemberState returns the state of a Patroni cluster member
+func (p *Patroni) GetPatroniMemberState(server *v1.Pod) (string, error) {
+
+	apiURLString, err := apiURL(server)
+	if err != nil {
+		return "", err
+	}
+	response, err := p.httpClient.Get(apiURLString)
+	if err != nil {
+		return "", fmt.Errorf("could not perform Get request: %v", err)
+	}
+	defer response.Body.Close()
+
+	body, err := ioutil.ReadAll(response.Body)
+	if err != nil {
+		return "", fmt.Errorf("could not read response: %v", err)
+	}
+
+	data := make(map[string]interface{})
+	err = json.Unmarshal(body, &data)
+	if err != nil {
+		return "", err
+	}
+
+	state, ok := data["state"].(string)
+	if !ok {
+		return "", errors.New("Patroni Get call response contains wrong type for 'state' field")
+	}
+
+	return state, nil
+}
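Reviewer note, not part of the diff: the precedence rules implemented by `mergeContainers` (earlier argument lists win) can be exercised in isolation. Below is a minimal, runnable sketch; the helper body is copied verbatim from `pkg/cluster/util.go` above, and the container names are borrowed from `TestSidecars`:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// mergeContainers is copied verbatim from pkg/cluster/util.go:
// earlier arguments take priority, duplicates are reported as conflicts.
func mergeContainers(containers ...[]v1.Container) ([]v1.Container, []string) {
	containerNameTaken := map[string]bool{}
	result := make([]v1.Container, 0)
	conflicts := make([]string, 0)

	for _, containerArray := range containers {
		for _, container := range containerArray {
			if _, taken := containerNameTaken[container.Name]; taken {
				conflicts = append(conflicts, container.Name)
			} else {
				containerNameTaken[container.Name] = true
				result = append(result, container)
			}
		}
	}
	return result, conflicts
}

func main() {
	// cluster-specific sidecars come first and therefore win
	clusterSpecific := []v1.Container{{Name: "replace-sidecar", Image: "overwrite-image"}}
	// operator-wide sidecars lose on a name clash
	global := []v1.Container{
		{Name: "replace-sidecar", Image: "replaced-image"},
		{Name: "global-sidecar", Image: "image:123"},
	}

	merged, conflicts := mergeContainers(clusterSpecific, global)
	for _, c := range merged {
		fmt.Println(c.Name, c.Image)
	}
	// prints:
	//   replace-sidecar overwrite-image
	//   global-sidecar image:123
	fmt.Println("conflicts:", conflicts) // conflicts: [replace-sidecar]
}
```

In `generateStatefulSet` the same call site passes four lists — cluster-specific sidecars, the operator-defined `sidecars`, the deprecated `sidecar_docker_images` map, and the Scalyr sidecar — in that priority order, and `patchSidecarContainers` then injects the shared volume mounts and `POD_NAME`/`POD_NAMESPACE`/`POSTGRES_USER`/`POSTGRES_PASSWORD` environment variables into whatever survives the merge.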