Merge branch 'master' into propogate_annotations

This commit is contained in:
Felix Kunde 2020-04-28 14:27:54 +02:00 committed by GitHub
commit d60f56b440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 639 additions and 178 deletions

1
.gitignore vendored
View File

@ -28,6 +28,7 @@ _testmain.go
/vendor/
/build/
/docker/build/
/github.com/
.idea
scm-source.json

View File

@ -84,6 +84,12 @@ spec:
type: object
additionalProperties:
type: string
sidecars:
type: array
nullable: true
items:
type: object
additionalProperties: true
workers:
type: integer
minimum: 1

View File

@ -42,6 +42,13 @@ rules:
- configmaps
verbs:
- get
# to send events to the CRs
- apiGroups:
- ""
resources:
- events
verbs:
- create
# to manage endpoints which are also used by Patroni
- apiGroups:
- ""

View File

@ -507,6 +507,33 @@ A secret can be pre-provisioned in different ways:
* Automatically provisioned via a custom K8s controller like
[kube-aws-iam-controller](https://github.com/mikkeloscar/kube-aws-iam-controller)
## Sidecars for Postgres clusters
A list of sidecars is added to each cluster created by the
operator. The default is empty list.
```yaml
kind: OperatorConfiguration
configuration:
sidecars:
- image: image:123
name: global-sidecar
ports:
- containerPort: 80
volumeMounts:
- mountPath: /custom-pgdata-mountpoint
name: pgdata
- ...
```
In addition to any environment variables you specify, the following environment variables are always passed to sidecars:
- `POD_NAME` - field reference to `metadata.name`
- `POD_NAMESPACE` - field reference to `metadata.namespace`
- `POSTGRES_USER` - the superuser that can be used to connect to the database
- `POSTGRES_PASSWORD` - the password for the superuser
## Setting up the Postgres Operator UI
Since the v1.2 release the Postgres Operator is shipped with a browser-based

View File

@ -93,9 +93,17 @@ Those are top-level keys, containing both leaf keys and groups.
repository](https://github.com/zalando/spilo).
* **sidecar_docker_images**
a map of sidecar names to Docker images to run with Spilo. In case of the name
conflict with the definition in the cluster manifest the cluster-specific one
is preferred.
*deprecated*: use **sidecars** instead. A map of sidecar names to Docker images to
run with Spilo. In case of the name conflict with the definition in the cluster
manifest the cluster-specific one is preferred.
* **sidecars**
a list of sidecars to run with Spilo, for any cluster (i.e. globally defined sidecars).
Each item in the list is of type
[Container](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#container-v1-core).
Globally defined sidecars can be overwritten by specifying a sidecar in the custom resource with
the same name. Note: This field is not part of the schema validation. If the container specification
is invalid, then the operator fails to create the statefulset.
* **enable_shm_volume**
Instruct operator to start any new database pod without limitations on shm
@ -133,8 +141,9 @@ Those are top-level keys, containing both leaf keys and groups.
at the cost of overprovisioning memory and potential scheduling problems for
containers with high memory limits due to the lack of memory on Kubernetes
cluster nodes. This affects all containers created by the operator (Postgres,
Scalyr sidecar, and other sidecars); to set resources for the operator's own
container, change the [operator deployment manually](../../manifests/postgres-operator.yaml#L20).
Scalyr sidecar, and other sidecars except **sidecars** defined in the operator
configuration); to set resources for the operator's own container, change the
[operator deployment manually](../../manifests/postgres-operator.yaml#L20).
The default is `false`.
## Postgres users
@ -206,12 +215,12 @@ configuration they are grouped under the `kubernetes` key.
Default is true.
* **enable_init_containers**
global option to allow for creating init containers to run actions before
Spilo is started. Default is true.
global option to allow for creating init containers in the cluster manifest to
run actions before Spilo is started. Default is true.
* **enable_sidecars**
global option to allow for creating sidecar containers to run alongside Spilo
on the same pod. Default is true.
global option to allow for creating sidecar containers in the cluster manifest
to run alongside Spilo on the same pod. Globally defined sidecars are always enabled. Default is true.
* **secret_name_template**
a template for the name of the database user secrets generated by the

View File

@ -442,6 +442,8 @@ The PostgreSQL volume is shared with sidecars and is mounted at
specified but globally disabled in the configuration. The `enable_sidecars`
option must be set to `true`.
If you want to add a sidecar to every cluster managed by the operator, you can specify it in the [operator configuration](administrator.md#sidecars-for-postgres-clusters) instead.
## InitContainers Support
Each cluster can specify arbitrary init containers to run. These containers can

View File

@ -344,7 +344,6 @@ class EndToEndTestCase(unittest.TestCase):
'''
k8s = self.k8s
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
labels = 'spilo-role=master,' + cluster_label
readiness_label = 'lifecycle-status'
readiness_value = 'ready'
@ -709,14 +708,16 @@ class K8s:
def wait_for_logical_backup_job_creation(self):
self.wait_for_logical_backup_job(expected_num_of_jobs=1)
def update_config(self, config_map_patch):
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
def delete_operator_pod(self):
operator_pod = self.api.core_v1.list_namespaced_pod(
'default', label_selector="name=postgres-operator").items[0].metadata.name
self.api.core_v1.delete_namespaced_pod(operator_pod, "default") # restart reloads the conf
self.wait_for_operator_pod_start()
def update_config(self, config_map_patch):
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
self.delete_operator_pod()
def create_with_kubectl(self, path):
return subprocess.run(
["kubectl", "create", "-f", path],

View File

@ -31,28 +31,28 @@ spec:
size: 1Gi
# storageClass: my-sc
additionalVolumes:
- name: data
mountPath: /home/postgres/pgdata/partitions
targetContainers:
- postgres
volumeSource:
PersistentVolumeClaim:
claimName: pvc-postgresql-data-partitions
readyOnly: false
- name: conf
mountPath: /etc/telegraf
subPath: telegraf.conf
targetContainers:
- telegraf-sidecar
volumeSource:
configMap:
name: my-config-map
- name: empty
mountPath: /opt/empty
targetContainers:
- all
volumeSource:
emptyDir: {}
# - name: data
# mountPath: /home/postgres/pgdata/partitions
# targetContainers:
# - postgres
# volumeSource:
# PersistentVolumeClaim:
# claimName: pvc-postgresql-data-partitions
# readyOnly: false
# - name: conf
# mountPath: /etc/telegraf
# subPath: telegraf.conf
# targetContainers:
# - telegraf-sidecar
# volumeSource:
# configMap:
# name: my-config-map
enableShmVolume: true
# spiloFSGroup: 103

View File

@ -43,6 +43,13 @@ rules:
- configmaps
verbs:
- get
# to send events to the CRs
- apiGroups:
- ""
resources:
- events
verbs:
- create
# to manage endpoints which are also used by Patroni
- apiGroups:
- ""

View File

@ -60,6 +60,12 @@ spec:
type: object
additionalProperties:
type: string
sidecars:
type: array
nullable: true
items:
type: object
additionalProperties: true
workers:
type: integer
minimum: 1

View File

@ -13,8 +13,11 @@ configuration:
resync_period: 30m
repair_period: 5m
# set_memory_request_to_limit: false
# sidecar_docker_images:
# example: "exampleimage:exampletag"
# sidecars:
# - image: image:123
# name: global-sidecar-1
# ports:
# - containerPort: 80
workers: 4
users:
replication_username: standby

View File

@ -797,6 +797,17 @@ var OperatorConfigCRDResourceValidation = apiextv1beta1.CustomResourceValidation
},
},
},
"sidecars": {
Type: "array",
Items: &apiextv1beta1.JSONSchemaPropsOrArray{
Schema: &apiextv1beta1.JSONSchemaProps{
Type: "object",
AdditionalProperties: &apiextv1beta1.JSONSchemaPropsOrBool{
Allows: true,
},
},
},
},
"workers": {
Type: "integer",
Minimum: &min1,

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/zalando/postgres-operator/pkg/spec"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -194,6 +195,7 @@ type OperatorConfigurationData struct {
SetMemoryRequestToLimit bool `json:"set_memory_request_to_limit,omitempty"`
ShmVolume *bool `json:"enable_shm_volume,omitempty"`
Sidecars map[string]string `json:"sidecar_docker_images,omitempty"`
SidecarContainers []v1.Container `json:"sidecars,omitempty"`
PostgresUsersConfiguration PostgresUsersConfiguration `json:"users"`
Kubernetes KubernetesMetaConfiguration `json:"kubernetes"`
PostgresPodResources PostgresPodResourcesDefaults `json:"postgres_pod_resources"`

View File

@ -104,7 +104,7 @@ type AdditionalVolume struct {
MountPath string `json:"mountPath"`
SubPath string `json:"subPath"`
TargetContainers []string `json:"targetContainers"`
VolumeSource v1.VolumeSource `json:"volume"`
VolumeSource v1.VolumeSource `json:"volumeSource"`
}
// PostgresqlParam describes PostgreSQL version and pairs of configuration parameter name - values.

View File

@ -317,13 +317,20 @@ func (in *OperatorConfigurationData) DeepCopyInto(out *OperatorConfigurationData
*out = new(bool)
**out = **in
}
if in.Sidecars != nil {
in, out := &in.Sidecars, &out.Sidecars
if in.SidecarImages != nil {
in, out := &in.SidecarImages, &out.SidecarImages
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
if in.SidecarContainers != nil {
in, out := &in.SidecarContainers, &out.SidecarContainers
*out = make([]corev1.Container, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
out.PostgresUsersConfiguration = in.PostgresUsersConfiguration
in.Kubernetes.DeepCopyInto(&out.Kubernetes)
out.PostgresPodResources = in.PostgresPodResources

View File

@ -21,8 +21,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
@ -81,6 +84,7 @@ type Cluster struct {
acidv1.Postgresql
Config
logger *logrus.Entry
eventRecorder record.EventRecorder
patroni patroni.Interface
pgUsers map[string]spec.PgUser
systemUsers map[string]spec.PgUser
@ -109,7 +113,7 @@ type compareStatefulsetResult struct {
}
// New creates a new cluster. This function should be called from a controller.
func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry) *Cluster {
func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgresql, logger *logrus.Entry, eventRecorder record.EventRecorder) *Cluster {
deletePropagationPolicy := metav1.DeletePropagationOrphan
podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) {
@ -140,7 +144,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
cluster.oauthTokenGetter = newSecretOauthTokenGetter(&kubeClient, cfg.OpConfig.OAuthTokenSecretName)
cluster.patroni = patroni.New(cluster.logger)
cluster.eventRecorder = eventRecorder
return cluster
}
@ -166,6 +170,16 @@ func (c *Cluster) setProcessName(procName string, args ...interface{}) {
}
}
// GetReference of Postgres CR object
// i.e. required to emit events to this resource
func (c *Cluster) GetReference() *v1.ObjectReference {
ref, err := reference.GetReference(scheme.Scheme, &c.Postgresql)
if err != nil {
c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", c.Postgresql.Namespace, c.Postgresql.Name, err)
}
return ref
}
// SetStatus of Postgres cluster
// TODO: eventually switch to updateStatus() for kubernetes 1.11 and above
func (c *Cluster) setStatus(status string) {
@ -245,6 +259,7 @@ func (c *Cluster) Create() error {
}()
c.setStatus(acidv1.ClusterStatusCreating)
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
if err = c.enforceMinResourceLimits(&c.Spec); err != nil {
return fmt.Errorf("could not enforce minimum resource limits: %v", err)
@ -263,6 +278,7 @@ func (c *Cluster) Create() error {
return fmt.Errorf("could not create %s endpoint: %v", role, err)
}
c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Endpoints", "Endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
}
if c.Services[role] != nil {
@ -273,6 +289,7 @@ func (c *Cluster) Create() error {
return fmt.Errorf("could not create %s service: %v", role, err)
}
c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Services", "The service %q for role %s has been successfully created", util.NameFromMeta(service.ObjectMeta), role)
}
if err = c.initUsers(); err != nil {
@ -284,6 +301,7 @@ func (c *Cluster) Create() error {
return fmt.Errorf("could not create secrets: %v", err)
}
c.logger.Infof("secrets have been successfully created")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created")
if c.PodDisruptionBudget != nil {
return fmt.Errorf("pod disruption budget already exists in the cluster")
@ -302,6 +320,7 @@ func (c *Cluster) Create() error {
return fmt.Errorf("could not create statefulset: %v", err)
}
c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
c.logger.Info("waiting for the cluster being ready")
@ -310,6 +329,7 @@ func (c *Cluster) Create() error {
return err
}
c.logger.Infof("pods are ready")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")
// create database objects unless we are running without pods or disabled
// that feature explicitly
@ -555,6 +575,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
}
if isSmaller {
c.logger.Warningf("defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined CPU limit %s is below required minimum %s and will be set to it", cpuLimit, minCPULimit)
spec.Resources.ResourceLimits.CPU = minCPULimit
}
}
@ -567,6 +588,7 @@ func (c *Cluster) enforceMinResourceLimits(spec *acidv1.PostgresSpec) error {
}
if isSmaller {
c.logger.Warningf("defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "ResourceLimits", "defined memory limit %s is below required minimum %s and will be set to it", memoryLimit, minMemoryLimit)
spec.Resources.ResourceLimits.Memory = minMemoryLimit
}
}
@ -598,6 +620,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
if oldSpec.Spec.PostgresqlParam.PgVersion != newSpec.Spec.PostgresqlParam.PgVersion { // PG versions comparison
c.logger.Warningf("postgresql version change(%q -> %q) has no effect",
oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "PostgreSQL", "postgresql version change(%q -> %q) has no effect",
oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
//we need that hack to generate statefulset with the old version
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
}
@ -757,6 +781,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
func (c *Cluster) Delete() {
c.mu.Lock()
defer c.mu.Unlock()
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of new cluster resources")
// delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods
// deleting the cron job also removes pods and batch jobs it created
@ -1095,6 +1120,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
var err error
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switching over from %q to %q", curMaster.Name, candidate)
var wg sync.WaitGroup
@ -1121,6 +1147,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
if err = <-podLabelErr; err != nil {
err = fmt.Errorf("could not get master pod label: %v", err)
}
@ -1136,6 +1163,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
// close the label waiting channel no sooner than the waiting goroutine terminates.
close(podLabelErr)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Switchover from %q to %q FAILED: %v", curMaster.Name, candidate, err)
return err
}

View File

@ -13,6 +13,7 @@ import (
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/teams"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
)
const (
@ -21,6 +22,8 @@ const (
)
var logger = logrus.New().WithField("test", "cluster")
var eventRecorder = record.NewFakeRecorder(1)
var cl = New(
Config{
OpConfig: config.Config{
@ -34,6 +37,7 @@ var cl = New(
k8sutil.NewMockKubernetesClient(),
acidv1.Postgresql{},
logger,
eventRecorder,
)
func TestInitRobotUsers(t *testing.T) {

View File

@ -463,8 +463,7 @@ func generateContainer(
}
func generateSidecarContainers(sidecars []acidv1.Sidecar,
volumeMounts []v1.VolumeMount, defaultResources acidv1.Resources,
superUserName string, credentialsSecretName string, logger *logrus.Entry) ([]v1.Container, error) {
defaultResources acidv1.Resources, startIndex int, logger *logrus.Entry) ([]v1.Container, error) {
if len(sidecars) > 0 {
result := make([]v1.Container, 0)
@ -483,7 +482,7 @@ func generateSidecarContainers(sidecars []acidv1.Sidecar,
return nil, err
}
sc := getSidecarContainer(sidecar, index, volumeMounts, resources, superUserName, credentialsSecretName, logger)
sc := getSidecarContainer(sidecar, startIndex+index, resources)
result = append(result, *sc)
}
return result, nil
@ -491,6 +490,55 @@ func generateSidecarContainers(sidecars []acidv1.Sidecar,
return nil, nil
}
// adds common fields to sidecars
func patchSidecarContainers(in []v1.Container, volumeMounts []v1.VolumeMount, superUserName string, credentialsSecretName string, logger *logrus.Entry) []v1.Container {
result := []v1.Container{}
for _, container := range in {
container.VolumeMounts = append(container.VolumeMounts, volumeMounts...)
env := []v1.EnvVar{
{
Name: "POD_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.name",
},
},
},
{
Name: "POD_NAMESPACE",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.namespace",
},
},
},
{
Name: "POSTGRES_USER",
Value: superUserName,
},
{
Name: "POSTGRES_PASSWORD",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: credentialsSecretName,
},
Key: "password",
},
},
},
}
mergedEnv := append(container.Env, env...)
container.Env = deduplicateEnvVars(mergedEnv, container.Name, logger)
result = append(result, container)
}
return result
}
// Check whether or not we're requested to mount an shm volume,
// taking into account that PostgreSQL manifest has precedence.
func mountShmVolumeNeeded(opConfig config.Config, spec *acidv1.PostgresSpec) *bool {
@ -725,58 +773,18 @@ func deduplicateEnvVars(input []v1.EnvVar, containerName string, logger *logrus.
return result
}
func getSidecarContainer(sidecar acidv1.Sidecar, index int, volumeMounts []v1.VolumeMount,
resources *v1.ResourceRequirements, superUserName string, credentialsSecretName string, logger *logrus.Entry) *v1.Container {
func getSidecarContainer(sidecar acidv1.Sidecar, index int, resources *v1.ResourceRequirements) *v1.Container {
name := sidecar.Name
if name == "" {
name = fmt.Sprintf("sidecar-%d", index)
}
env := []v1.EnvVar{
{
Name: "POD_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.name",
},
},
},
{
Name: "POD_NAMESPACE",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.namespace",
},
},
},
{
Name: "POSTGRES_USER",
Value: superUserName,
},
{
Name: "POSTGRES_PASSWORD",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: credentialsSecretName,
},
Key: "password",
},
},
},
}
if len(sidecar.Env) > 0 {
env = append(env, sidecar.Env...)
}
return &v1.Container{
Name: name,
Image: sidecar.DockerImage,
ImagePullPolicy: v1.PullIfNotPresent,
Resources: *resources,
VolumeMounts: volumeMounts,
Env: deduplicateEnvVars(env, name, logger),
Env: sidecar.Env,
Ports: sidecar.Ports,
}
}
@ -1066,37 +1074,63 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef
c.OpConfig.Resources.SpiloPrivileged,
)
// resolve conflicts between operator-global and per-cluster sidecars
sideCars := c.mergeSidecars(spec.Sidecars)
// generate container specs for sidecars specified in the cluster manifest
clusterSpecificSidecars := []v1.Container{}
if spec.Sidecars != nil && len(spec.Sidecars) > 0 {
// warn if sidecars are defined, but globally disabled (does not apply to globally defined sidecars)
if c.OpConfig.EnableSidecars != nil && !(*c.OpConfig.EnableSidecars) {
c.logger.Warningf("sidecars specified but disabled in configuration - next statefulset creation would fail")
}
resourceRequirementsScalyrSidecar := makeResources(
c.OpConfig.ScalyrCPURequest,
c.OpConfig.ScalyrMemoryRequest,
c.OpConfig.ScalyrCPULimit,
c.OpConfig.ScalyrMemoryLimit,
)
if clusterSpecificSidecars, err = generateSidecarContainers(spec.Sidecars, defaultResources, 0, c.logger); err != nil {
return nil, fmt.Errorf("could not generate sidecar containers: %v", err)
}
}
// decrapted way of providing global sidecars
var globalSidecarContainersByDockerImage []v1.Container
var globalSidecarsByDockerImage []acidv1.Sidecar
for name, dockerImage := range c.OpConfig.SidecarImages {
globalSidecarsByDockerImage = append(globalSidecarsByDockerImage, acidv1.Sidecar{Name: name, DockerImage: dockerImage})
}
if globalSidecarContainersByDockerImage, err = generateSidecarContainers(globalSidecarsByDockerImage, defaultResources, len(clusterSpecificSidecars), c.logger); err != nil {
return nil, fmt.Errorf("could not generate sidecar containers: %v", err)
}
// make the resulting list reproducible
// c.OpConfig.SidecarImages is unsorted by Golang definition
// .Name is unique
sort.Slice(globalSidecarContainersByDockerImage, func(i, j int) bool {
return globalSidecarContainersByDockerImage[i].Name < globalSidecarContainersByDockerImage[j].Name
})
// generate scalyr sidecar container
if scalyrSidecar :=
var scalyrSidecars []v1.Container
if scalyrSidecar, err :=
generateScalyrSidecarSpec(c.Name,
c.OpConfig.ScalyrAPIKey,
c.OpConfig.ScalyrServerURL,
c.OpConfig.ScalyrImage,
&resourceRequirementsScalyrSidecar, c.logger); scalyrSidecar != nil {
sideCars = append(sideCars, *scalyrSidecar)
c.OpConfig.ScalyrCPURequest,
c.OpConfig.ScalyrMemoryRequest,
c.OpConfig.ScalyrCPULimit,
c.OpConfig.ScalyrMemoryLimit,
defaultResources,
c.logger); err != nil {
return nil, fmt.Errorf("could not generate Scalyr sidecar: %v", err)
} else {
if scalyrSidecar != nil {
scalyrSidecars = append(scalyrSidecars, *scalyrSidecar)
}
}
// generate sidecar containers
if sideCars != nil && len(sideCars) > 0 {
if c.OpConfig.EnableSidecars != nil && !(*c.OpConfig.EnableSidecars) {
c.logger.Warningf("sidecars specified but disabled in configuration - next statefulset creation would fail")
}
if sidecarContainers, err = generateSidecarContainers(sideCars, volumeMounts, defaultResources,
c.OpConfig.SuperUsername, c.credentialSecretName(c.OpConfig.SuperUsername), c.logger); err != nil {
return nil, fmt.Errorf("could not generate sidecar containers: %v", err)
}
sidecarContainers, conflicts := mergeContainers(clusterSpecificSidecars, c.Config.OpConfig.SidecarContainers, globalSidecarContainersByDockerImage, scalyrSidecars)
for containerName := range conflicts {
c.logger.Warningf("a sidecar is specified twice. Ignoring sidecar %q in favor of %q with high a precendence",
containerName, containerName)
}
sidecarContainers = patchSidecarContainers(sidecarContainers, volumeMounts, c.OpConfig.SuperUsername, c.credentialSecretName(c.OpConfig.SuperUsername), c.logger)
tolerationSpec := tolerations(&spec.Tolerations, c.OpConfig.PodToleration)
effectivePodPriorityClassName := util.Coalesce(spec.PodPriorityClassName, c.OpConfig.PodPriorityClassName)
@ -1192,57 +1226,44 @@ func (c *Cluster) generatePodAnnotations(spec *acidv1.PostgresSpec) map[string]s
}
func generateScalyrSidecarSpec(clusterName, APIKey, serverURL, dockerImage string,
containerResources *acidv1.Resources, logger *logrus.Entry) *acidv1.Sidecar {
scalyrCPURequest string, scalyrMemoryRequest string, scalyrCPULimit string, scalyrMemoryLimit string,
defaultResources acidv1.Resources, logger *logrus.Entry) (*v1.Container, error) {
if APIKey == "" || dockerImage == "" {
if APIKey == "" && dockerImage != "" {
logger.Warning("Not running Scalyr sidecar: SCALYR_API_KEY must be defined")
}
return nil
return nil, nil
}
scalarSpec := &acidv1.Sidecar{
Name: "scalyr-sidecar",
DockerImage: dockerImage,
Env: []v1.EnvVar{
{
Name: "SCALYR_API_KEY",
Value: APIKey,
},
{
Name: "SCALYR_SERVER_HOST",
Value: clusterName,
},
resourcesScalyrSidecar := makeResources(
scalyrCPURequest,
scalyrMemoryRequest,
scalyrCPULimit,
scalyrMemoryLimit,
)
resourceRequirementsScalyrSidecar, err := generateResourceRequirements(resourcesScalyrSidecar, defaultResources)
if err != nil {
return nil, fmt.Errorf("invalid resources for Scalyr sidecar: %v", err)
}
env := []v1.EnvVar{
{
Name: "SCALYR_API_KEY",
Value: APIKey,
},
{
Name: "SCALYR_SERVER_HOST",
Value: clusterName,
},
Resources: *containerResources,
}
if serverURL != "" {
scalarSpec.Env = append(scalarSpec.Env, v1.EnvVar{Name: "SCALYR_SERVER_URL", Value: serverURL})
env = append(env, v1.EnvVar{Name: "SCALYR_SERVER_URL", Value: serverURL})
}
return scalarSpec
}
// mergeSidecar merges globally-defined sidecars with those defined in the cluster manifest
func (c *Cluster) mergeSidecars(sidecars []acidv1.Sidecar) []acidv1.Sidecar {
globalSidecarsToSkip := map[string]bool{}
result := make([]acidv1.Sidecar, 0)
for i, sidecar := range sidecars {
dockerImage, ok := c.OpConfig.Sidecars[sidecar.Name]
if ok {
if dockerImage != sidecar.DockerImage {
c.logger.Warningf("merging definitions for sidecar %q: "+
"ignoring %q in the global scope in favor of %q defined in the cluster",
sidecar.Name, dockerImage, sidecar.DockerImage)
}
globalSidecarsToSkip[sidecar.Name] = true
}
result = append(result, sidecars[i])
}
for name, dockerImage := range c.OpConfig.Sidecars {
if !globalSidecarsToSkip[name] {
result = append(result, acidv1.Sidecar{Name: name, DockerImage: dockerImage})
}
}
return result
return &v1.Container{
Name: "scalyr-sidecar",
Image: dockerImage,
Env: env,
ImagePullPolicy: v1.PullIfNotPresent,
Resources: *resourceRequirementsScalyrSidecar,
}, nil
}
func (c *Cluster) getNumberOfInstances(spec *acidv1.PostgresSpec) int32 {

View File

@ -18,6 +18,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
@ -37,7 +38,7 @@ func TestGenerateSpiloJSONConfiguration(t *testing.T) {
ReplicationUsername: replicationUserName,
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
testName := "TestGenerateSpiloConfig"
tests := []struct {
@ -102,7 +103,7 @@ func TestCreateLoadBalancerLogic(t *testing.T) {
ReplicationUsername: replicationUserName,
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
testName := "TestCreateLoadBalancerLogic"
tests := []struct {
@ -164,7 +165,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger),
logger,
eventRecorder),
policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: "postgres-myapp-database-pdb",
@ -187,7 +189,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}},
logger),
logger,
eventRecorder),
policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: "postgres-myapp-database-pdb",
@ -210,7 +213,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger),
logger,
eventRecorder),
policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: "postgres-myapp-database-pdb",
@ -233,7 +237,8 @@ func TestGeneratePodDisruptionBudget(t *testing.T) {
acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"},
Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}},
logger),
logger,
eventRecorder),
policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: "postgres-myapp-database-databass-budget",
@ -368,7 +373,7 @@ func TestCloneEnv(t *testing.T) {
ReplicationUsername: replicationUserName,
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
for _, tt := range tests {
envs := cluster.generateCloneEnvironment(tt.cloneOpts)
@ -502,7 +507,7 @@ func TestGetPgVersion(t *testing.T) {
ReplicationUsername: replicationUserName,
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
for _, tt := range tests {
pgVersion, err := cluster.getNewPgVersion(tt.pgContainer, tt.newPgVersion)
@ -678,7 +683,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
ConnectionPoolerDefaultMemoryLimit: "100Mi",
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
var clusterNoDefaultRes = New(
Config{
@ -690,7 +695,7 @@ func TestConnectionPoolerPodSpec(t *testing.T) {
},
ConnectionPooler: config.ConnectionPooler{},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil }
@ -803,7 +808,7 @@ func TestConnectionPoolerDeploymentSpec(t *testing.T) {
ConnectionPoolerDefaultMemoryLimit: "100Mi",
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sts",
@ -904,7 +909,7 @@ func TestConnectionPoolerServiceSpec(t *testing.T) {
ConnectionPoolerDefaultMemoryLimit: "100Mi",
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sts",
@ -990,7 +995,7 @@ func TestTLS(t *testing.T) {
SpiloFSGroup: &spiloFSGroup,
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
spec = makeSpec(acidv1.TLSDescription{SecretName: "my-secret", CAFile: "ca.crt"})
s, err := cluster.generateStatefulSet(&spec)
if err != nil {
@ -1112,7 +1117,7 @@ func TestAdditionalVolume(t *testing.T) {
ReplicationUsername: replicationUserName,
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
for _, tt := range tests {
// Test with additional volume mounted in all containers
@ -1202,3 +1207,201 @@ func TestAdditionalVolume(t *testing.T) {
}
}
}
// inject sidecars through all available mechanisms and check the resulting container specs
func TestSidecars(t *testing.T) {
var err error
var spec acidv1.PostgresSpec
var cluster *Cluster
generateKubernetesResources := func(cpuRequest string, cpuLimit string, memoryRequest string, memoryLimit string) v1.ResourceRequirements {
parsedCPURequest, err := resource.ParseQuantity(cpuRequest)
assert.NoError(t, err)
parsedCPULimit, err := resource.ParseQuantity(cpuLimit)
assert.NoError(t, err)
parsedMemoryRequest, err := resource.ParseQuantity(memoryRequest)
assert.NoError(t, err)
parsedMemoryLimit, err := resource.ParseQuantity(memoryLimit)
assert.NoError(t, err)
return v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: parsedCPURequest,
v1.ResourceMemory: parsedMemoryRequest,
},
Limits: v1.ResourceList{
v1.ResourceCPU: parsedCPULimit,
v1.ResourceMemory: parsedMemoryLimit,
},
}
}
spec = acidv1.PostgresSpec{
TeamID: "myapp", NumberOfInstances: 1,
Resources: acidv1.Resources{
ResourceRequests: acidv1.ResourceDescription{CPU: "1", Memory: "10"},
ResourceLimits: acidv1.ResourceDescription{CPU: "1", Memory: "10"},
},
Volume: acidv1.Volume{
Size: "1G",
},
Sidecars: []acidv1.Sidecar{
acidv1.Sidecar{
Name: "cluster-specific-sidecar",
},
acidv1.Sidecar{
Name: "cluster-specific-sidecar-with-resources",
Resources: acidv1.Resources{
ResourceRequests: acidv1.ResourceDescription{CPU: "210m", Memory: "0.8Gi"},
ResourceLimits: acidv1.ResourceDescription{CPU: "510m", Memory: "1.4Gi"},
},
},
acidv1.Sidecar{
Name: "replace-sidecar",
DockerImage: "overwrite-image",
},
},
}
cluster = New(
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
ProtectedRoles: []string{"admin"},
Auth: config.Auth{
SuperUsername: superUserName,
ReplicationUsername: replicationUserName,
},
Resources: config.Resources{
DefaultCPURequest: "200m",
DefaultCPULimit: "500m",
DefaultMemoryRequest: "0.7Gi",
DefaultMemoryLimit: "1.3Gi",
},
SidecarImages: map[string]string{
"deprecated-global-sidecar": "image:123",
},
SidecarContainers: []v1.Container{
v1.Container{
Name: "global-sidecar",
},
// will be replaced by a cluster specific sidecar with the same name
v1.Container{
Name: "replace-sidecar",
Image: "replaced-image",
},
},
Scalyr: config.Scalyr{
ScalyrAPIKey: "abc",
ScalyrImage: "scalyr-image",
ScalyrCPURequest: "220m",
ScalyrCPULimit: "520m",
ScalyrMemoryRequest: "0.9Gi",
// ise default memory limit
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
s, err := cluster.generateStatefulSet(&spec)
assert.NoError(t, err)
env := []v1.EnvVar{
{
Name: "POD_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.name",
},
},
},
{
Name: "POD_NAMESPACE",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.namespace",
},
},
},
{
Name: "POSTGRES_USER",
Value: superUserName,
},
{
Name: "POSTGRES_PASSWORD",
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: "",
},
Key: "password",
},
},
},
}
mounts := []v1.VolumeMount{
v1.VolumeMount{
Name: "pgdata",
MountPath: "/home/postgres/pgdata",
},
}
// deduplicated sidecars and Patroni
assert.Equal(t, 7, len(s.Spec.Template.Spec.Containers), "wrong number of containers")
// cluster specific sidecar
assert.Contains(t, s.Spec.Template.Spec.Containers, v1.Container{
Name: "cluster-specific-sidecar",
Env: env,
Resources: generateKubernetesResources("200m", "500m", "0.7Gi", "1.3Gi"),
ImagePullPolicy: v1.PullIfNotPresent,
VolumeMounts: mounts,
})
// container specific resources
expectedResources := generateKubernetesResources("210m", "510m", "0.8Gi", "1.4Gi")
assert.Equal(t, expectedResources.Requests[v1.ResourceCPU], s.Spec.Template.Spec.Containers[2].Resources.Requests[v1.ResourceCPU])
assert.Equal(t, expectedResources.Limits[v1.ResourceCPU], s.Spec.Template.Spec.Containers[2].Resources.Limits[v1.ResourceCPU])
assert.Equal(t, expectedResources.Requests[v1.ResourceMemory], s.Spec.Template.Spec.Containers[2].Resources.Requests[v1.ResourceMemory])
assert.Equal(t, expectedResources.Limits[v1.ResourceMemory], s.Spec.Template.Spec.Containers[2].Resources.Limits[v1.ResourceMemory])
// deprecated global sidecar
assert.Contains(t, s.Spec.Template.Spec.Containers, v1.Container{
Name: "deprecated-global-sidecar",
Image: "image:123",
Env: env,
Resources: generateKubernetesResources("200m", "500m", "0.7Gi", "1.3Gi"),
ImagePullPolicy: v1.PullIfNotPresent,
VolumeMounts: mounts,
})
// global sidecar
assert.Contains(t, s.Spec.Template.Spec.Containers, v1.Container{
Name: "global-sidecar",
Env: env,
VolumeMounts: mounts,
})
// replaced sidecar
assert.Contains(t, s.Spec.Template.Spec.Containers, v1.Container{
Name: "replace-sidecar",
Image: "overwrite-image",
Resources: generateKubernetesResources("200m", "500m", "0.7Gi", "1.3Gi"),
ImagePullPolicy: v1.PullIfNotPresent,
Env: env,
VolumeMounts: mounts,
})
// replaced sidecar
// the order in env is important
scalyrEnv := append([]v1.EnvVar{v1.EnvVar{Name: "SCALYR_API_KEY", Value: "abc"}, v1.EnvVar{Name: "SCALYR_SERVER_HOST", Value: ""}}, env...)
assert.Contains(t, s.Spec.Template.Spec.Containers, v1.Container{
Name: "scalyr-sidecar",
Image: "scalyr-image",
Resources: generateKubernetesResources("220m", "520m", "0.9Gi", "1.3Gi"),
ImagePullPolicy: v1.PullIfNotPresent,
Env: scalyrEnv,
VolumeMounts: mounts,
})
}

View File

@ -294,6 +294,27 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
return pod, nil
}
func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
/*
Operator should not re-create pods if there is at least one replica being bootstrapped
because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag).
XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
after this check succeeds but before a pod is re-created
*/
for _, pod := range pods.Items {
state, err := c.patroni.GetPatroniMemberState(&pod)
if err != nil || state == "creating replica" {
c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
return false
}
}
return true
}
func (c *Cluster) recreatePods() error {
c.setProcessName("starting to recreate pods")
ls := c.labelsSet(false)
@ -309,6 +330,10 @@ func (c *Cluster) recreatePods() error {
}
c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))
if !c.isSafeToRecreatePods(pods) {
return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initilalized")
}
var (
masterPod, newMasterPod, newPod *v1.Pod
)

View File

@ -36,7 +36,7 @@ func TestConnectionPoolerCreationAndDeletion(t *testing.T) {
ConnectionPoolerDefaultMemoryLimit: "100Mi",
},
},
}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)
cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
@ -85,7 +85,7 @@ func TestNeedConnectionPooler(t *testing.T) {
ConnectionPoolerDefaultMemoryLimit: "100Mi",
},
},
}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger)
}, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger, eventRecorder)
cluster.Spec = acidv1.PostgresSpec{
ConnectionPooler: &acidv1.ConnectionPooler{},

View File

@ -346,10 +346,12 @@ func (c *Cluster) syncStatefulSet() error {
// statefulset or those that got their configuration from the outdated statefulset)
if podsRollingUpdateRequired {
c.logger.Debugln("performing rolling update")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
if err := c.recreatePods(); err != nil {
return fmt.Errorf("could not recreate pods: %v", err)
}
c.logger.Infof("pods have been recreated")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Rolling update done - pods have been recreated")
if err := c.applyRollingUpdateFlagforStatefulSet(false); err != nil {
c.logger.Warningf("could not clear rolling update for the statefulset: %v", err)
}

View File

@ -79,7 +79,7 @@ func TestConnectionPoolerSynchronization(t *testing.T) {
NumberOfInstances: int32ToPointer(1),
},
},
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger)
}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)
cluster.Statefulset = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{

View File

@ -530,3 +530,22 @@ func (c *Cluster) needConnectionPoolerWorker(spec *acidv1.PostgresSpec) bool {
func (c *Cluster) needConnectionPooler() bool {
return c.needConnectionPoolerWorker(&c.Spec)
}
// Earlier arguments take priority
func mergeContainers(containers ...[]v1.Container) ([]v1.Container, []string) {
containerNameTaken := map[string]bool{}
result := make([]v1.Container, 0)
conflicts := make([]string, 0)
for _, containerArray := range containers {
for _, container := range containerArray {
if _, taken := containerNameTaken[container.Name]; taken {
conflicts = append(conflicts, container.Name)
} else {
containerNameTaken[container.Name] = true
result = append(result, container)
}
}
}
return result, conflicts
}

View File

@ -7,24 +7,24 @@ import (
"sync"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/apiserver"
"github.com/zalando/postgres-operator/pkg/cluster"
acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/ringlog"
acidv1informer "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do/v1"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
)
// Controller represents operator controller
@ -36,6 +36,9 @@ type Controller struct {
KubeClient k8sutil.KubernetesClient
apiserver *apiserver.Server
eventRecorder record.EventRecorder
eventBroadcaster record.EventBroadcaster
stopCh chan struct{}
controllerID string
@ -67,10 +70,21 @@ type Controller struct {
func NewController(controllerConfig *spec.ControllerConfig, controllerId string) *Controller {
logger := logrus.New()
var myComponentName = "postgres-operator"
if controllerId != "" {
myComponentName += "/" + controllerId
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logger.Debugf)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: myComponentName})
c := &Controller{
config: *controllerConfig,
opConfig: &config.Config{},
logger: logger.WithField("pkg", "controller"),
eventRecorder: recorder,
eventBroadcaster: eventBroadcaster,
controllerID: controllerId,
curWorkerCluster: sync.Map{},
clusterWorkers: make(map[spec.NamespacedName]uint32),
@ -93,6 +107,11 @@ func (c *Controller) initClients() {
if err != nil {
c.logger.Fatalf("could not create kubernetes clients: %v", err)
}
c.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.KubeClient.EventsGetter.Events("")})
if err != nil {
c.logger.Fatalf("could not setup kubernetes event sink: %v", err)
}
}
func (c *Controller) initOperatorConfig() {
@ -159,6 +178,11 @@ func (c *Controller) warnOnDeprecatedOperatorParameters() {
c.logger.Warningf("Operator configuration parameter 'enable_load_balancer' is deprecated and takes no effect. " +
"Consider using the 'enable_master_load_balancer' or 'enable_replica_load_balancer' instead.")
}
if len(c.opConfig.SidecarImages) > 0 {
c.logger.Warningf("Operator configuration parameter 'sidecar_docker_images' is deprecated. " +
"Consider using 'sidecars' instead.")
}
}
func (c *Controller) initPodServiceAccount() {

View File

@ -44,7 +44,8 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur
result.RepairPeriod = time.Duration(fromCRD.RepairPeriod)
result.SetMemoryRequestToLimit = fromCRD.SetMemoryRequestToLimit
result.ShmVolume = fromCRD.ShmVolume
result.Sidecars = fromCRD.Sidecars
result.SidecarImages = fromCRD.SidecarImages
result.SidecarContainers = fromCRD.SidecarContainers
result.StatefulsetPropagateAnnotations = fromCRD.StatefulsetPropagateAnnotations

View File

@ -11,6 +11,7 @@ import (
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
@ -157,7 +158,7 @@ func (c *Controller) acquireInitialListOfClusters() error {
}
func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedName, pgSpec *acidv1.Postgresql) *cluster.Cluster {
cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg)
cl := cluster.New(c.makeClusterConfig(), c.KubeClient, *pgSpec, lg, c.eventRecorder)
cl.Run(c.stopCh)
teamName := strings.ToLower(cl.Spec.TeamID)
@ -236,6 +237,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
if err := cl.Create(); err != nil {
cl.Error = fmt.Sprintf("could not create cluster: %v", err)
lg.Error(cl.Error)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Create", "%v", cl.Error)
return
}
@ -274,6 +276,8 @@ func (c *Controller) processEvent(event ClusterEvent) {
c.curWorkerCluster.Store(event.WorkerID, cl)
cl.Delete()
// Fixme - no error handling for delete ?
// c.eventRecorder.Eventf(cl.GetReference, v1.EventTypeWarning, "Delete", "%v", cl.Error)
func() {
defer c.clustersMu.Unlock()
@ -304,6 +308,7 @@ func (c *Controller) processEvent(event ClusterEvent) {
c.curWorkerCluster.Store(event.WorkerID, cl)
if err := cl.Sync(event.NewSpec); err != nil {
cl.Error = fmt.Sprintf("could not sync cluster: %v", err)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Sync", "%v", cl.Error)
lg.Error(cl.Error)
return
}

View File

@ -9,6 +9,7 @@ import (
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util/constants"
v1 "k8s.io/api/core/v1"
)
// CRD describes CustomResourceDefinition specific configuration parameters
@ -107,12 +108,14 @@ type Config struct {
LogicalBackup
ConnectionPooler
WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to'
KubernetesUseConfigMaps bool `name:"kubernetes_use_configmaps" default:"false"`
EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS
DockerImage string `name:"docker_image" default:"registry.opensource.zalan.do/acid/spilo-12:1.6-p2"`
Sidecars map[string]string `name:"sidecar_docker_images"`
PodServiceAccountName string `name:"pod_service_account_name" default:"postgres-pod"`
WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to'
KubernetesUseConfigMaps bool `name:"kubernetes_use_configmaps" default:"false"`
EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS
DockerImage string `name:"docker_image" default:"registry.opensource.zalan.do/acid/spilo-12:1.6-p2"`
// deprecated in favour of SidecarContainers
SidecarImages map[string]string `name:"sidecar_docker_images"`
SidecarContainers []v1.Container `name:"sidecars"`
PodServiceAccountName string `name:"pod_service_account_name" default:"postgres-pod"`
// value of this string must be valid JSON or YAML; see initPodServiceAccount
PodServiceAccountDefinition string `name:"pod_service_account_definition" default:""`
PodServiceAccountRoleBindingDefinition string `name:"pod_service_account_role_binding_definition" default:""`

View File

@ -45,6 +45,7 @@ type KubernetesClient struct {
corev1.NodesGetter
corev1.NamespacesGetter
corev1.ServiceAccountsGetter
corev1.EventsGetter
appsv1.StatefulSetsGetter
appsv1.DeploymentsGetter
rbacv1.RoleBindingsGetter
@ -142,6 +143,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
kubeClient.RESTClient = client.CoreV1().RESTClient()
kubeClient.RoleBindingsGetter = client.RbacV1()
kubeClient.CronJobsGetter = client.BatchV1beta1()
kubeClient.EventsGetter = client.CoreV1()
apiextClient, err := apiextclient.NewForConfig(cfg)
if err != nil {

View File

@ -3,6 +3,7 @@ package patroni
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
@ -11,7 +12,7 @@ import (
"time"
"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)
const (
@ -25,6 +26,7 @@ const (
type Interface interface {
Switchover(master *v1.Pod, candidate string) error
SetPostgresParameters(server *v1.Pod, options map[string]string) error
GetPatroniMemberState(pod *v1.Pod) (string, error)
}
// Patroni API client
@ -123,3 +125,36 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
}
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
}
//GetPatroniMemberState returns a state of member of a Patroni cluster
func (p *Patroni) GetPatroniMemberState(server *v1.Pod) (string, error) {
apiURLString, err := apiURL(server)
if err != nil {
return "", err
}
response, err := p.httpClient.Get(apiURLString)
if err != nil {
return "", fmt.Errorf("could not perform Get request: %v", err)
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", fmt.Errorf("could not read response: %v", err)
}
data := make(map[string]interface{})
err = json.Unmarshal(body, &data)
if err != nil {
return "", err
}
state, ok := data["state"].(string)
if !ok {
return "", errors.New("Patroni Get call response contains wrong type for 'state' field")
}
return state, nil
}