diff --git a/charts/postgres-operator/templates/clusterrole.yaml b/charts/postgres-operator/templates/clusterrole.yaml index cdd58f5ba..25b19b89c 100644 --- a/charts/postgres-operator/templates/clusterrole.yaml +++ b/charts/postgres-operator/templates/clusterrole.yaml @@ -13,6 +13,7 @@ rules: - acid.zalan.do resources: - postgresqls + - postgresqls/status - operatorconfigurations verbs: - "*" @@ -23,6 +24,8 @@ rules: verbs: - create - get + - patch + - update - apiGroups: - "" resources: @@ -138,4 +141,15 @@ rules: - bind resourceNames: - {{ template "postgres-operator.fullname" . }} +- apiGroups: + - batch + resources: + - cronjobs # enables logical backups + verbs: + - create + - delete + - get + - list + - patch + - update {{ end }} diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml index 21813fbe9..c5349b55f 100644 --- a/charts/postgres-operator/values.yaml +++ b/charts/postgres-operator/values.yaml @@ -62,6 +62,9 @@ config: pod_management_policy: "ordered_ready" enable_pod_antiaffinity: "false" pod_antiaffinity_topology_key: "kubernetes.io/hostname" + logical_backup_schedule: "30 00 * * *" + logical_backup_docker_image: "registry.opensource.zalan.do/acid/logical-backup" + logical_backup_s3_bucket: "" rbac: # Specifies whether RBAC resources should be created create: true diff --git a/docs/administrator.md b/docs/administrator.md index 32a749e36..f6f37aafb 100644 --- a/docs/administrator.md +++ b/docs/administrator.md @@ -340,9 +340,18 @@ Postgres database cluster: ## Understanding rolling update of Spilo pods -The operator logs reasons for a rolling update with the `info` level and -a diff between the old and new StatefulSet specs with the `debug` level. -To read the latter log entry with the escaped characters rendered, view it -in CLI with `echo -e`. Note that the resultant message will contain some -noise because the `PodTemplate` used by the operator is yet to be updated -with the default values used internally in Kubernetes. +The operator logs reasons for a rolling update with the `info` level and a diff between the old and new StatefulSet specs with the `debug` level. To read the latter log entry with the escaped characters rendered, view it in the CLI with `echo -e`. Note that the resultant message will contain some noise because the `PodTemplate` used by the operator is yet to be updated with the default values used internally in Kubernetes. + +## Logical backups + +The operator can manage k8s cron jobs to run logical backups of Postgres clusters. The cron job periodically spawns a batch job that runs a single pod. The backup script within this pod's container connects to the DB and takes a logical backup. The operator updates cron jobs during Sync if the job schedule changes; the job name acts as the job identifier. These jobs are to be enabled for each individual Postgres cluster by setting `enableLogicalBackup: true` in its manifest. Notes: + +1. The provided `registry.opensource.zalan.do/acid/logical-backup` image implements the backup via `pg_dumpall` and the upload of (compressed) results to an S3 bucket; `pg_dumpall` requires `superuser` access to a DB and runs on the replica when possible. + +2. Due to the [limitation of Kubernetes cron jobs](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-job-limitations), it is highly advisable to set up additional monitoring for this feature; such monitoring is outside of the scope of operator responsibilities. + +3. The operator does not remove old backups. + +4. 
You may use your own image by overwriting the relevant field in the operator configuration. Any such image must ensure the logical backup is able to finish [in the presence of pod restarts](https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/#handling-pod-and-container-failures) and [simultaneous invocations](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-job-limitations) of the backup cron job. + +5. For that feature to work, your RBAC policy must enable operations on the `cronjobs` resource from the `batch` API group for the operator service account. See [example RBAC](../manifests/operator-service-account-rbac.yaml). \ No newline at end of file diff --git a/docs/developer.md b/docs/developer.md index 561b437e6..cbbf63dc1 100644 --- a/docs/developer.md +++ b/docs/developer.md @@ -203,7 +203,7 @@ localhost:8080 by doing: The inner 'query' gets the name of the postgres operator pod, and the outer enables port forwarding. Afterwards, you can access the operator API with: - $ curl http://127.0.0.1:8080/$endpoint| jq . + $ curl --location http://127.0.0.1:8080/$endpoint | jq . The available endpoints are listed below. Note that the worker ID is an integer from 0 up to 'workers' - 1 (value configured in the operator configuration and @@ -331,6 +331,9 @@ be updated. As explained [here](reference/operator_parameters.md), it's possible to configure the operator either with a ConfigMap or CRD, but currently we aim to synchronize parameters everywhere. +When choosing a parameter name for a new option in a PG manifest, keep in mind +the naming conventions there. The `snake_case` variables come from the Patroni/Postgres world, while the `camelCase` ones come from the k8s world. + Note: If one option is defined in the operator configuration and in the cluster [manifest](../manifests/complete-postgres-manifest.yaml), the latter takes precedence. diff --git a/docs/quickstart.md b/docs/quickstart.md index 244f45b54..09072ea50 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -32,16 +32,19 @@ kubectl create -f manifests/postgres-operator.yaml # deployment ## Helm chart -Another possibility is using a provided [Helm](https://helm.sh/) chart which -saves you these steps. Therefore, you would need to install the helm CLI on your -machine. After initializing helm (and its server component Tiller) in your local -cluster you can install the operator chart. +Alternatively, the operator can be installed by using the provided [Helm](https://helm.sh/) +chart which saves you the manual steps. For that, you need to install +the helm CLI on your machine. After initializing helm (and its server +component Tiller) in your local cluster you can install the operator chart. +You can define a release name that is prepended to the operator resources' +names. Use `--name zalando` to match the default service account name, +as older operator versions do not support custom names for service accounts. ```bash # 1) initialize helm helm init # 2) install postgres-operator chart -helm install --name postgres-operator ./charts/postgres-operator +helm install --name zalando ./charts/postgres-operator ``` ## Create a Postgres cluster diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index f1491525d..842b50cf9 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -14,6 +14,8 @@ measurements. 
Please, refer to the [Kubernetes documentation](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/) for the possible values of those. +:exclamation: If both operator configmap/CRD and a Postgres cluster manifest define the same parameter, the value from the Postgres cluster manifest is applied. + ## Manifest structure A postgres manifest is a `YAML` document. On the top level both individual @@ -45,7 +47,7 @@ Those parameters are grouped under the `metadata` top-level key. ## Top-level parameters -Those are parameters grouped directly under the `spec` key in the manifest. +These parameters are grouped directly under the `spec` key in the manifest. * **teamId** name of the team the cluster belongs to. Changing it after the cluster @@ -117,6 +119,12 @@ Those are parameters grouped directly under the `spec` key in the manifest. is `false`, then no volume will be mounted no matter how operator was configured (so you can override the operator configuration). +* **enableLogicalBackup** + Determines if the logical backup of this cluster should be taken and uploaded to S3. Default: false. + +* **logicalBackupSchedule** + Schedule for the logical backup k8s cron job. Please take [the reference schedule format](https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/#schedule) into account. Default: "30 00 \* \* \*" + ## Postgres parameters Those parameters are grouped under the `postgresql` top-level key. diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md index 11fee3846..3e06cf31d 100644 --- a/docs/reference/operator_parameters.md +++ b/docs/reference/operator_parameters.md @@ -51,6 +51,8 @@ parameters, those parameters have no effect and are replaced by the `CRD_READY_WAIT_INTERVAL` and `CRD_READY_WAIT_TIMEOUT` environment variables. They will be deprecated and removed in the future. +For the configmap operator configuration, the [default parameter values](https://github.com/zalando-incubator/postgres-operator/blob/master/pkg/util/config/config.go#L14) mentioned here are likely to be overwritten in your local operator installation via your local version of the operator configmap. In the case you use the operator CRD, all the CRD defaults are provided in the [operator's default configuration manifest](https://github.com/zalando-incubator/postgres-operator/blob/master/manifests/postgresql-operator-default-configuration.yaml) + Variable names are underscore-separated words. @@ -476,4 +478,16 @@ scalyr sidecar. In the CRD-based configuration they are grouped under the Memory limit value for the Scalyr sidecar. The default is `1Gi`. -For the configmap operator configuration, the [default parameter values](https://github.com/zalando/postgres-operator/blob/master/pkg/util/config/config.go#L14) mentioned here are likely to be overwritten in your local operator installation via your local version of the operator configmap. In the case you use the operator CRD, all the CRD defaults are provided in the [operator's default configuration manifest](https://github.com/zalando/postgres-operator/blob/master/manifests/postgresql-operator-default-configuration.yaml) +## Logical backup + + These parameters configure a k8s cron job managed by the operator to produce Postgres logical backups. + In the CRD-based configuration those parameters are grouped under the `logical_backup` key. + + * **logical_backup_schedule** + Backup schedule in the cron format. 
Please take [the reference schedule format](https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/#schedule) into account. Default: "30 00 \* \* \*" + + * **logical_backup_docker_image** + Docker image for the pods of the cron job. Must implement backup logic and correctly handle pod and job restarts. The default image runs `pg_dumpall` (on a replica if possible) and uploads compressed results to an S3 bucket under the key `/spilo/pg_cluster_name/cluster_k8s_uuid/logical_backups` Default: "registry.opensource.zalan.do/acid/logical-backup" + + * **logical_backup_s3_bucket** + S3 bucket to store backup results. The bucket has to be present and accessible by Postgres pods. Default: empty. diff --git a/docs/user.md b/docs/user.md index bab09f8c9..1942bab16 100644 --- a/docs/user.md +++ b/docs/user.md @@ -347,3 +347,11 @@ every 6 hours. Note that if the statefulset is scaled down before resizing the size changes are only applied to the volumes attached to the running pods. The size of the volumes that correspond to the previously running pods is not changed. + +## Logical backups + +If you add +``` + enableLogicalBackup: true +``` +to the cluster manifest, the operator will create and sync a k8s cron job to do periodic logical backups of this particular Postgres cluster. Due to the [limitation of Kubernetes cron jobs](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-job-limitations) it is highly advisable to set up additional monitoring for this feature; such monitoring is outside of the scope of operator responsibilities. See [configuration reference](reference/cluster_manifest.md) and [administrator documentation](administrator.md) for details on how backups are executed. diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 4a4c6078a..b2ebe948e 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -63,6 +63,11 @@ spec: # uid: "efd12e58-5786-11e8-b5a7-06148230260c" # cluster: "acid-batman" # timestamp: "2017-12-19T12:40:33+01:00" # timezone required (offset relative to UTC, see RFC 3339 section 5.6) + # s3_wal_path: "s3://custom/path/to/bucket" + + # run periodic backups with k8s cron jobs + # enableLogicalBackup: true + # logicalBackupSchedule: "30 00 * * *" maintenanceWindows: - 01:00-06:00 #UTC - Sat:00:00-04:00 diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index 17830c41f..bd7d11c6a 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -54,3 +54,7 @@ data: resource_check_interval: 3s resource_check_timeout: 10m resync_period: 5m + + # logical_backup_schedule: "30 00 * * *" + # logical_backup_docker_image: "registry.opensource.zalan.do/acid/logical-backup" + # logical_backup_s3_bucket: "" diff --git a/manifests/minimal-postgres-manifest.yaml b/manifests/minimal-postgres-manifest.yaml index 37d772567..e952df374 100644 --- a/manifests/minimal-postgres-manifest.yaml +++ b/manifests/minimal-postgres-manifest.yaml @@ -17,7 +17,6 @@ spec: # role for application foo foo_user: [] - #databases: name->owner databases: foo: zalando diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index 7bd539ac5..2057c414f 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -14,6 +14,7 @@ rules: - acid.zalan.do resources: - postgresqls + - postgresqls/status - operatorconfigurations verbs: - "*" @@ -24,6 +25,8 @@ rules: verbs: 
- create - get + - patch + - update - apiGroups: - "" resources: @@ -137,9 +140,19 @@ rules: - clusterroles verbs: - bind - resourceNames: + resourceNames: - zalando-postgres-operator - +- apiGroups: + - batch + resources: + - cronjobs # enables logical backups + verbs: + - create + - delete + - get + - list + - patch + - update --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml index 5ea5ba87c..fa27c6956 100644 --- a/manifests/postgresql-operator-default-configuration.yaml +++ b/manifests/postgresql-operator-default-configuration.yaml @@ -91,4 +91,7 @@ configuration: # scalyr_api_key: "" # scalyr_image: "" # scalyr_server_url: "" - + logical_backup: + logical_backup_schedule: "30 00 * * *" + logical_backup_docker_image: "registry.opensource.zalan.do/acid/logical-backup" + logical_backup_s3_bucket: "" diff --git a/pkg/apis/acid.zalan.do/v1/const.go b/pkg/apis/acid.zalan.do/v1/const.go index 59d6c1406..3cb1c1ade 100644 --- a/pkg/apis/acid.zalan.do/v1/const.go +++ b/pkg/apis/acid.zalan.do/v1/const.go @@ -2,14 +2,14 @@ package v1 // ClusterStatusUnknown etc : status of a Postgres cluster known to the operator const ( - ClusterStatusUnknown PostgresStatus = "" - ClusterStatusCreating PostgresStatus = "Creating" - ClusterStatusUpdating PostgresStatus = "Updating" - ClusterStatusUpdateFailed PostgresStatus = "UpdateFailed" - ClusterStatusSyncFailed PostgresStatus = "SyncFailed" - ClusterStatusAddFailed PostgresStatus = "CreateFailed" - ClusterStatusRunning PostgresStatus = "Running" - ClusterStatusInvalid PostgresStatus = "Invalid" + ClusterStatusUnknown = "" + ClusterStatusCreating = "Creating" + ClusterStatusUpdating = "Updating" + ClusterStatusUpdateFailed = "UpdateFailed" + ClusterStatusSyncFailed = "SyncFailed" + ClusterStatusAddFailed = "CreateFailed" + ClusterStatusRunning = "Running" + ClusterStatusInvalid = "Invalid" ) const ( diff --git a/pkg/apis/acid.zalan.do/v1/marshal.go b/pkg/apis/acid.zalan.do/v1/marshal.go index 823ff0ef2..d180f784c 100644 --- a/pkg/apis/acid.zalan.do/v1/marshal.go +++ b/pkg/apis/acid.zalan.do/v1/marshal.go @@ -8,6 +8,7 @@ import ( ) type postgresqlCopy Postgresql +type postgresStatusCopy PostgresStatus // MarshalJSON converts a maintenance window definition to JSON. func (m *MaintenanceWindow) MarshalJSON() ([]byte, error) { @@ -69,6 +70,26 @@ func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error { return nil } +// UnmarshalJSON converts a JSON to the status subresource definition. +func (ps *PostgresStatus) UnmarshalJSON(data []byte) error { + var ( + tmp postgresStatusCopy + status string + ) + + err := json.Unmarshal(data, &tmp) + if err != nil { + metaErr := json.Unmarshal(data, &status) + if metaErr != nil { + return fmt.Errorf("Could not parse status: %v; err %v", string(data), metaErr) + } + tmp.PostgresClusterStatus = status + } + *ps = PostgresStatus(tmp) + + return nil +} + // UnmarshalJSON converts a JSON into the PostgreSQL object. 
func (p *Postgresql) UnmarshalJSON(data []byte) error { var tmp postgresqlCopy @@ -81,7 +102,7 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error { } tmp.Error = err.Error() - tmp.Status = ClusterStatusInvalid + tmp.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid} *p = Postgresql(tmp) @@ -91,10 +112,10 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error { if clusterName, err := extractClusterName(tmp2.ObjectMeta.Name, tmp2.Spec.TeamID); err != nil { tmp2.Error = err.Error() - tmp2.Status = ClusterStatusInvalid + tmp2.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid} } else if err := validateCloneClusterDescription(&tmp2.Spec.Clone); err != nil { tmp2.Error = err.Error() - tmp2.Status = ClusterStatusInvalid + tmp2.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid} } else { tmp2.Spec.ClusterName = clusterName } diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go index d4ea04e15..c6e87d8ea 100644 --- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go +++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go @@ -143,25 +143,26 @@ type ScalyrConfiguration struct { // OperatorConfigurationData defines the operation config type OperatorConfigurationData struct { - EtcdHost string `json:"etcd_host,omitempty"` - DockerImage string `json:"docker_image,omitempty"` - Workers uint32 `json:"workers,omitempty"` - MinInstances int32 `json:"min_instances,omitempty"` - MaxInstances int32 `json:"max_instances,omitempty"` - ResyncPeriod Duration `json:"resync_period,omitempty"` - RepairPeriod Duration `json:"repair_period,omitempty"` - Sidecars map[string]string `json:"sidecar_docker_images,omitempty"` - PostgresUsersConfiguration PostgresUsersConfiguration `json:"users"` - Kubernetes KubernetesMetaConfiguration `json:"kubernetes"` - PostgresPodResources PostgresPodResourcesDefaults `json:"postgres_pod_resources"` - SetMemoryRequestToLimit bool `json:"set_memory_request_to_limit,omitempty"` - Timeouts OperatorTimeouts `json:"timeouts"` - LoadBalancer LoadBalancerConfiguration `json:"load_balancer"` - AWSGCP AWSGCPConfiguration `json:"aws_or_gcp"` - OperatorDebug OperatorDebugConfiguration `json:"debug"` - TeamsAPI TeamsAPIConfiguration `json:"teams_api"` - LoggingRESTAPI LoggingRESTAPIConfiguration `json:"logging_rest_api"` - Scalyr ScalyrConfiguration `json:"scalyr"` + EtcdHost string `json:"etcd_host,omitempty"` + DockerImage string `json:"docker_image,omitempty"` + Workers uint32 `json:"workers,omitempty"` + MinInstances int32 `json:"min_instances,omitempty"` + MaxInstances int32 `json:"max_instances,omitempty"` + ResyncPeriod Duration `json:"resync_period,omitempty"` + RepairPeriod Duration `json:"repair_period,omitempty"` + Sidecars map[string]string `json:"sidecar_docker_images,omitempty"` + PostgresUsersConfiguration PostgresUsersConfiguration `json:"users"` + Kubernetes KubernetesMetaConfiguration `json:"kubernetes"` + PostgresPodResources PostgresPodResourcesDefaults `json:"postgres_pod_resources"` + SetMemoryRequestToLimit bool `json:"set_memory_request_to_limit,omitempty"` + Timeouts OperatorTimeouts `json:"timeouts"` + LoadBalancer LoadBalancerConfiguration `json:"load_balancer"` + AWSGCP AWSGCPConfiguration `json:"aws_or_gcp"` + OperatorDebug OperatorDebugConfiguration `json:"debug"` + TeamsAPI TeamsAPIConfiguration `json:"teams_api"` + LoggingRESTAPI LoggingRESTAPIConfiguration `json:"logging_rest_api"` + Scalyr ScalyrConfiguration 
`json:"scalyr"` + LogicalBackup OperatorLogicalBackupConfiguration `json:"logical_backup"` } // OperatorConfigurationUsers defines configration for super user @@ -174,3 +175,9 @@ type OperatorConfigurationUsers struct { //Duration shortens this frequently used name type Duration time.Duration + +type OperatorLogicalBackupConfiguration struct { + Schedule string `json:"logical_backup_schedule,omitempty"` + DockerImage string `json:"logical_backup_docker_image,omitempty"` + S3Bucket string `json:"logical_backup_s3_bucket,omitempty"` +} diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index ccd7fe08c..87a079da9 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -3,7 +3,7 @@ package v1 import ( "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -16,7 +16,7 @@ type Postgresql struct { metav1.ObjectMeta `json:"metadata,omitempty"` Spec PostgresSpec `json:"spec"` - Status PostgresStatus `json:"status,omitempty"` + Status PostgresStatus `json:"status"` Error string `json:"-"` } @@ -43,17 +43,19 @@ type PostgresSpec struct { // load balancers' source ranges are the same for master and replica services AllowedSourceRanges []string `json:"allowedSourceRanges"` - NumberOfInstances int32 `json:"numberOfInstances"` - Users map[string]UserFlags `json:"users"` - MaintenanceWindows []MaintenanceWindow `json:"maintenanceWindows,omitempty"` - Clone CloneDescription `json:"clone"` - ClusterName string `json:"-"` - Databases map[string]string `json:"databases,omitempty"` - Tolerations []v1.Toleration `json:"tolerations,omitempty"` - Sidecars []Sidecar `json:"sidecars,omitempty"` - InitContainers []v1.Container `json:"init_containers,omitempty"` - PodPriorityClassName string `json:"pod_priority_class_name,omitempty"` - ShmVolume *bool `json:"enableShmVolume,omitempty"` + NumberOfInstances int32 `json:"numberOfInstances"` + Users map[string]UserFlags `json:"users"` + MaintenanceWindows []MaintenanceWindow `json:"maintenanceWindows,omitempty"` + Clone CloneDescription `json:"clone"` + ClusterName string `json:"-"` + Databases map[string]string `json:"databases,omitempty"` + Tolerations []v1.Toleration `json:"tolerations,omitempty"` + Sidecars []Sidecar `json:"sidecars,omitempty"` + InitContainers []v1.Container `json:"init_containers,omitempty"` + PodPriorityClassName string `json:"pod_priority_class_name,omitempty"` + ShmVolume *bool `json:"enableShmVolume,omitempty"` + EnableLogicalBackup bool `json:"enableLogicalBackup,omitempty"` + LogicalBackupSchedule string `json:"logicalBackupSchedule,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -114,6 +116,7 @@ type CloneDescription struct { ClusterName string `json:"cluster,omitempty"` UID string `json:"uid,omitempty"` EndTimestamp string `json:"timestamp,omitempty"` + S3WalPath string `json:"s3_wal_path,omitempty"` } // Sidecar defines a container to be run in the same pod as the Postgres container. @@ -129,4 +132,6 @@ type Sidecar struct { type UserFlags []string // PostgresStatus contains status of the PostgreSQL cluster (running, creation failed etc.) 
-type PostgresStatus string +type PostgresStatus struct { + PostgresClusterStatus string `json:"PostgresClusterStatus"` +} diff --git a/pkg/apis/acid.zalan.do/v1/util.go b/pkg/apis/acid.zalan.do/v1/util.go index 0a3267972..db6efcd71 100644 --- a/pkg/apis/acid.zalan.do/v1/util.go +++ b/pkg/apis/acid.zalan.do/v1/util.go @@ -85,12 +85,22 @@ func validateCloneClusterDescription(clone *CloneDescription) error { } // Success of the current Status -func (status PostgresStatus) Success() bool { - return status != ClusterStatusAddFailed && - status != ClusterStatusUpdateFailed && - status != ClusterStatusSyncFailed +func (postgresStatus PostgresStatus) Success() bool { + return postgresStatus.PostgresClusterStatus != ClusterStatusAddFailed && + postgresStatus.PostgresClusterStatus != ClusterStatusUpdateFailed && + postgresStatus.PostgresClusterStatus != ClusterStatusSyncFailed } -func (status PostgresStatus) String() string { - return string(status) +// Running status of cluster +func (postgresStatus PostgresStatus) Running() bool { + return postgresStatus.PostgresClusterStatus == ClusterStatusRunning +} + +// Creating status of cluster +func (postgresStatus PostgresStatus) Creating() bool { + return postgresStatus.PostgresClusterStatus == ClusterStatusCreating +} + +func (postgresStatus PostgresStatus) String() string { + return postgresStatus.PostgresClusterStatus } diff --git a/pkg/apis/acid.zalan.do/v1/util_test.go b/pkg/apis/acid.zalan.do/v1/util_test.go index 01be31e88..537619aaf 100644 --- a/pkg/apis/acid.zalan.do/v1/util_test.go +++ b/pkg/apis/acid.zalan.do/v1/util_test.go @@ -61,12 +61,12 @@ var cloneClusterDescriptions = []struct { in *CloneDescription err error }{ - {&CloneDescription{"foo+bar", "", "NotEmpty"}, nil}, - {&CloneDescription{"foo+bar", "", ""}, + {&CloneDescription{"foo+bar", "", "NotEmpty", ""}, nil}, + {&CloneDescription{"foo+bar", "", "", ""}, errors.New(`clone cluster name must confirm to DNS-1035, regex used for validation is "^[a-z]([-a-z0-9]*[a-z0-9])?$"`)}, - {&CloneDescription{"foobar123456789012345678901234567890123456789012345678901234567890", "", ""}, + {&CloneDescription{"foobar123456789012345678901234567890123456789012345678901234567890", "", "", ""}, errors.New("clone cluster name must be no longer than 63 characters")}, - {&CloneDescription{"foobar", "", ""}, nil}, + {&CloneDescription{"foobar", "", "", ""}, nil}, } var maintenanceWindows = []struct { @@ -111,101 +111,139 @@ var maintenanceWindows = []struct { {[]byte(`"Mon:00:00"`), MaintenanceWindow{}, errors.New("incorrect maintenance window format")}, {[]byte(`"Mon:00:00-00:00:00"`), MaintenanceWindow{}, errors.New("could not parse end time: incorrect time format")}} +var postgresStatus = []struct { + in []byte + out PostgresStatus + err error +}{ + {[]byte(`{"PostgresClusterStatus":"Running"}`), + PostgresStatus{PostgresClusterStatus: ClusterStatusRunning}, nil}, + {[]byte(`{"PostgresClusterStatus":""}`), + PostgresStatus{PostgresClusterStatus: ClusterStatusUnknown}, nil}, + {[]byte(`"Running"`), + PostgresStatus{PostgresClusterStatus: ClusterStatusRunning}, nil}, + {[]byte(`""`), + PostgresStatus{PostgresClusterStatus: ClusterStatusUnknown}, nil}} + var unmarshalCluster = []struct { in []byte out Postgresql marshal []byte err error -}{{ - []byte(`{ - "kind": "Postgresql","apiVersion": "acid.zalan.do/v1", - "metadata": {"name": "acid-testcluster1"}, "spec": {"teamId": 100}}`), - Postgresql{ - TypeMeta: metav1.TypeMeta{ - Kind: "Postgresql", - APIVersion: "acid.zalan.do/v1", +}{ + // example with simple 
status field + { + in: []byte(`{ + "kind": "Postgresql","apiVersion": "acid.zalan.do/v1", + "metadata": {"name": "acid-testcluster1"}, "spec": {"teamId": 100}}`), + out: Postgresql{ + TypeMeta: metav1.TypeMeta{ + Kind: "Postgresql", + APIVersion: "acid.zalan.do/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-testcluster1", + }, + Status: PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid}, + // This error message can vary between Go versions, so compute it for the current version. + Error: json.Unmarshal([]byte(`{"teamId": 0}`), &PostgresSpec{}).Error(), }, - ObjectMeta: metav1.ObjectMeta{ - Name: "acid-testcluster1", + marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{}},"status":"Invalid"}`), + err: nil}, + // example with /status subresource + { + in: []byte(`{ + "kind": "Postgresql","apiVersion": "acid.zalan.do/v1", + "metadata": {"name": "acid-testcluster1"}, "spec": {"teamId": 100}}`), + out: Postgresql{ + TypeMeta: metav1.TypeMeta{ + Kind: "Postgresql", + APIVersion: "acid.zalan.do/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-testcluster1", + }, + Status: PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid}, + // This error message can vary between Go versions, so compute it for the current version. + Error: json.Unmarshal([]byte(`{"teamId": 0}`), &PostgresSpec{}).Error(), }, - Status: ClusterStatusInvalid, - // This error message can vary between Go versions, so compute it for the current version. 
- Error: json.Unmarshal([]byte(`{"teamId": 0}`), &PostgresSpec{}).Error(), - }, - []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{}},"status":"Invalid"}`), nil}, - {[]byte(`{ - "kind": "Postgresql", - "apiVersion": "acid.zalan.do/v1", - "metadata": { - "name": "acid-testcluster1" - }, - "spec": { - "teamId": "ACID", - "volume": { - "size": "5Gi", - "storageClass": "SSD" - }, - "numberOfInstances": 2, - "users": { - "zalando": [ - "superuser", - "createdb" - ] - }, - "allowedSourceRanges": [ - "127.0.0.1/32" - ], - "postgresql": { - "version": "9.6", - "parameters": { - "shared_buffers": "32MB", - "max_connections": "10", - "log_statement": "all" - } - }, - "resources": { - "requests": { - "cpu": "10m", - "memory": "50Mi" - }, - "limits": { - "cpu": "300m", - "memory": "3000Mi" - } - }, - "clone" : { - "cluster": "acid-batman" - }, - "patroni": { - "initdb": { - "encoding": "UTF8", - "locale": "en_US.UTF-8", - "data-checksums": "true" - }, - "pg_hba": [ - "hostssl all all 0.0.0.0/0 md5", - "host all all 0.0.0.0/0 md5" - ], - "ttl": 30, - "loop_wait": 10, - "retry_timeout": 10, - "maximum_lag_on_failover": 33554432, - "slots" : { - "permanent_logical_1" : { - "type" : "logical", - "database" : "foo", - "plugin" : "pgoutput" - } + marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{}},"status":{"PostgresClusterStatus":"Invalid"}}`), + err: nil}, + // example with detailed input manifest + { + in: []byte(`{ + "kind": "Postgresql", + "apiVersion": "acid.zalan.do/v1", + "metadata": { + "name": "acid-testcluster1" + }, + "spec": { + "teamId": "ACID", + "volume": { + "size": "5Gi", + "storageClass": "SSD" + }, + "numberOfInstances": 2, + "users": { + "zalando": [ + "superuser", + "createdb" + ] + }, + "allowedSourceRanges": [ + "127.0.0.1/32" + ], + "postgresql": { + "version": "9.6", + "parameters": { + "shared_buffers": "32MB", + "max_connections": "10", + "log_statement": "all" + } + }, + "resources": { + "requests": { + "cpu": "10m", + "memory": "50Mi" + }, + "limits": { + "cpu": "300m", + "memory": "3000Mi" + } + }, + "clone" : { + "cluster": "acid-batman" + }, + "patroni": { + "initdb": { + "encoding": "UTF8", + "locale": "en_US.UTF-8", + "data-checksums": "true" + }, + "pg_hba": [ + "hostssl all all 0.0.0.0/0 md5", + "host all all 0.0.0.0/0 md5" + ], + "ttl": 30, + "loop_wait": 10, + "retry_timeout": 10, + "maximum_lag_on_failover": 33554432, + "slots" : { + "permanent_logical_1" : { + "type" : "logical", + "database" : "foo", + "plugin" : "pgoutput" + } + } + }, + "maintenanceWindows": [ + "Mon:01:00-06:00", + "Sat:00:00-04:00", + "05:00-05:15" + ] } - }, - "maintenanceWindows": [ - 
"Mon:01:00-06:00", - "Sat:00:00-04:00", - "05:00-05:15" - ] - } -}`), - Postgresql{ + }`), + out: Postgresql{ TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", APIVersion: "acid.zalan.do/v1", @@ -273,10 +311,12 @@ var unmarshalCluster = []struct { }, Error: "", }, - []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"9.6","parameters":{"log_statement":"all","max_connections":"10","shared_buffers":"32MB"}},"volume":{"size":"5Gi","storageClass":"SSD"},"patroni":{"initdb":{"data-checksums":"true","encoding":"UTF8","locale":"en_US.UTF-8"},"pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"],"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}},"resources":{"requests":{"cpu":"10m","memory":"50Mi"},"limits":{"cpu":"300m","memory":"3000Mi"}},"teamId":"ACID","allowedSourceRanges":["127.0.0.1/32"],"numberOfInstances":2,"users":{"zalando":["superuser","createdb"]},"maintenanceWindows":["Mon:01:00-06:00","Sat:00:00-04:00","05:00-05:15"],"clone":{"cluster":"acid-batman"}}}`), nil}, + marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"9.6","parameters":{"log_statement":"all","max_connections":"10","shared_buffers":"32MB"}},"volume":{"size":"5Gi","storageClass":"SSD"},"patroni":{"initdb":{"data-checksums":"true","encoding":"UTF8","locale":"en_US.UTF-8"},"pg_hba":["hostssl all all 0.0.0.0/0 md5","host all all 0.0.0.0/0 md5"],"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432,"slots":{"permanent_logical_1":{"database":"foo","plugin":"pgoutput","type":"logical"}}},"resources":{"requests":{"cpu":"10m","memory":"50Mi"},"limits":{"cpu":"300m","memory":"3000Mi"}},"teamId":"ACID","allowedSourceRanges":["127.0.0.1/32"],"numberOfInstances":2,"users":{"zalando":["superuser","createdb"]},"maintenanceWindows":["Mon:01:00-06:00","Sat:00:00-04:00","05:00-05:15"],"clone":{"cluster":"acid-batman"}},"status":{"PostgresClusterStatus":""}}`), + err: nil}, + // example with teamId set in input { - []byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1","metadata": {"name": "teapot-testcluster1"}, "spec": {"teamId": "acid"}}`), - Postgresql{ + in: []byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1","metadata": {"name": "teapot-testcluster1"}, "spec": {"teamId": "acid"}}`), + out: Postgresql{ TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", APIVersion: "acid.zalan.do/v1", @@ -285,10 +325,12 @@ var unmarshalCluster = []struct { Name: "teapot-testcluster1", }, Spec: PostgresSpec{TeamID: "acid"}, - Status: ClusterStatusInvalid, + Status: PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid}, Error: errors.New("name must match {TEAM}-{NAME} format").Error(), }, - []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"teapot-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{}},"status":"Invalid"}`), nil}, + marshal: 
[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"teapot-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{}},"status":{"PostgresClusterStatus":"Invalid"}}`), + err: nil}, + // clone example { in: []byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1","metadata": {"name": "acid-testcluster1"}, "spec": {"teamId": "acid", "clone": {"cluster": "team-batman"}}}`), out: Postgresql{ @@ -308,22 +350,26 @@ var unmarshalCluster = []struct { }, Error: "", }, - marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{"cluster":"team-batman"}}}`), err: nil}, - {[]byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1"`), - Postgresql{}, - []byte{}, - errors.New("unexpected end of JSON input")}, - {[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster","creationTimestamp":qaz},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{}},"status":"Invalid"}`), - Postgresql{}, - []byte{}, - errors.New("invalid character 'q' looking for beginning of value")}} + marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{"cluster":"team-batman"}},"status":{"PostgresClusterStatus":""}}`), + err: nil}, + // erroneous examples + { + in: []byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1"`), + out: Postgresql{}, + marshal: []byte{}, + err: errors.New("unexpected end of JSON input")}, + { + in: 
[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster","creationTimestamp":qaz},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0,"slots":null},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{}},"status":{"PostgresClusterStatus":"Invalid"}}`), + out: Postgresql{}, + marshal: []byte{}, + err: errors.New("invalid character 'q' looking for beginning of value")}} var postgresqlList = []struct { in []byte out PostgresqlList err error }{ - {[]byte(`{"apiVersion":"v1","items":[{"apiVersion":"acid.zalan.do/v1","kind":"Postgresql","metadata":{"labels":{"team":"acid"},"name":"acid-testcluster42","namespace":"default","resourceVersion":"30446957","selfLink":"/apis/acid.zalan.do/v1/namespaces/default/postgresqls/acid-testcluster42","uid":"857cd208-33dc-11e7-b20a-0699041e4b03"},"spec":{"allowedSourceRanges":["185.85.220.0/22"],"numberOfInstances":1,"postgresql":{"version":"9.6"},"teamId":"acid","volume":{"size":"10Gi"}},"status":"Running"}],"kind":"List","metadata":{},"resourceVersion":"","selfLink":""}`), + {[]byte(`{"apiVersion":"v1","items":[{"apiVersion":"acid.zalan.do/v1","kind":"Postgresql","metadata":{"labels":{"team":"acid"},"name":"acid-testcluster42","namespace":"default","resourceVersion":"30446957","selfLink":"/apis/acid.zalan.do/v1/namespaces/default/postgresqls/acid-testcluster42","uid":"857cd208-33dc-11e7-b20a-0699041e4b03"},"spec":{"allowedSourceRanges":["185.85.220.0/22"],"numberOfInstances":1,"postgresql":{"version":"9.6"},"teamId":"acid","volume":{"size":"10Gi"}},"status":{"PostgresClusterStatus":"Running"}}],"kind":"List","metadata":{},"resourceVersion":"","selfLink":""}`), PostgresqlList{ TypeMeta: metav1.TypeMeta{ Kind: "List", @@ -350,8 +396,10 @@ var postgresqlList = []struct { AllowedSourceRanges: []string{"185.85.220.0/22"}, NumberOfInstances: 1, }, - Status: ClusterStatusRunning, - Error: "", + Status: PostgresStatus{ + PostgresClusterStatus: ClusterStatusRunning, + }, + Error: "", }}, }, nil}, @@ -469,6 +517,25 @@ func TestMarshalMaintenanceWindow(t *testing.T) { } } +func TestUnmarshalPostgresStatus(t *testing.T) { + for _, tt := range postgresStatus { + var ps PostgresStatus + err := ps.UnmarshalJSON(tt.in) + if err != nil { + if tt.err == nil || err.Error() != tt.err.Error() { + t.Errorf("CR status unmarshal expected error: %v, got %v", tt.err, err) + } + continue + //} else if tt.err != nil { + //t.Errorf("Expected error: %v", tt.err) + } + + if !reflect.DeepEqual(ps, tt.out) { + t.Errorf("Expected status: %#v, got: %#v", tt.out, ps) + } + } +} + func TestPostgresUnmarshal(t *testing.T) { for _, tt := range unmarshalCluster { var cluster Postgresql @@ -494,12 +561,26 @@ func TestMarshal(t *testing.T) { continue } + // Unmarshal and marshal example to capture api changes + var cluster Postgresql + err := cluster.UnmarshalJSON(tt.marshal) + if err != nil { + if tt.err == nil || err.Error() != tt.err.Error() { + t.Errorf("Backwards compatibility unmarshal expected error: %v, got: %v", tt.err, err) + } + continue + } + expected, err := json.Marshal(cluster) + if err != nil { + t.Errorf("Backwards compatibility marshal error: %v", err) + } + m, err := json.Marshal(tt.out) if err != nil { t.Errorf("Marshal error: %v", err) } - if !bytes.Equal(m, tt.marshal) { - 
t.Errorf("Marshal Postgresql \nexpected: %q, \ngot: %q", string(tt.marshal), string(m)) + if !bytes.Equal(m, expected) { + t.Errorf("Marshal Postgresql \nexpected: %q, \ngot: %q", string(expected), string(m)) } } } diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 0f5546f0f..7a27bb794 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -211,6 +211,7 @@ func (in *OperatorConfigurationData) DeepCopyInto(out *OperatorConfigurationData in.TeamsAPI.DeepCopyInto(&out.TeamsAPI) out.LoggingRESTAPI = in.LoggingRESTAPI out.Scalyr = in.Scalyr + out.LogicalBackup = in.LogicalBackup return } @@ -301,6 +302,22 @@ func (in *OperatorDebugConfiguration) DeepCopy() *OperatorDebugConfiguration { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OperatorLogicalBackupConfiguration) DeepCopyInto(out *OperatorLogicalBackupConfiguration) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OperatorLogicalBackupConfiguration. +func (in *OperatorLogicalBackupConfiguration) DeepCopy() *OperatorLogicalBackupConfiguration { + if in == nil { + return nil + } + out := new(OperatorLogicalBackupConfiguration) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OperatorTimeouts) DeepCopyInto(out *OperatorTimeouts) { *out = *in @@ -479,6 +496,22 @@ func (in *PostgresSpec) DeepCopy() *PostgresSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PostgresStatus) DeepCopyInto(out *PostgresStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PostgresStatus. +func (in *PostgresStatus) DeepCopy() *PostgresStatus { + if in == nil { + return nil + } + out := new(PostgresStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *PostgresUsersConfiguration) DeepCopyInto(out *PostgresUsersConfiguration) { *out = *in @@ -501,6 +534,7 @@ func (in *Postgresql) DeepCopyInto(out *Postgresql) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status return } diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index c319ab5d1..9cbc46e70 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -4,6 +4,7 @@ package cluster import ( "database/sql" + "encoding/json" "fmt" "reflect" "regexp" @@ -19,8 +20,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "encoding/json" - acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util" @@ -82,6 +81,7 @@ type Cluster struct { currentProcess Process processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex + } type compareStatefulsetResult struct { @@ -149,21 +149,24 @@ func (c *Cluster) setProcessName(procName string, args ...interface{}) { } } -func (c *Cluster) setStatus(status acidv1.PostgresStatus) { - // TODO: eventually switch to updateStatus() for kubernetes 1.11 and above - var ( - err error - b []byte - ) - if b, err = json.Marshal(status); err != nil { +// SetStatus of Postgres cluster +// TODO: eventually switch to updateStatus() for kubernetes 1.11 and above +func (c *Cluster) setStatus(status string) { + var pgStatus acidv1.PostgresStatus + pgStatus.PostgresClusterStatus = status + + patch, err := json.Marshal(struct { + PgStatus interface{} `json:"status"` + }{&pgStatus}) + + if err != nil { c.logger.Errorf("could not marshal status: %v", err) } - patch := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) // we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ), // however, we could do patch without it. In the future, once /status subresource is there (starting Kubernets 1.11) // we should take advantage of it. - newspec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(c.Name, types.MergePatchType, patch) + newspec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(c.Name, types.MergePatchType, patch, "status") if err != nil { c.logger.Errorf("could not update status: %v", err) } @@ -172,7 +175,7 @@ func (c *Cluster) setStatus(status acidv1.PostgresStatus) { } func (c *Cluster) isNewCluster() bool { - return c.Status == acidv1.ClusterStatusCreating + return c.Status.Creating() } // initUsers populates c.systemUsers and c.pgUsers maps. @@ -296,6 +299,13 @@ func (c *Cluster) Create() error { c.logger.Infof("databases have been successfully created") } + if c.Postgresql.Spec.EnableLogicalBackup { + if err := c.createLogicalBackupJob(); err != nil { + return fmt.Errorf("could not create a k8s cron job for logical backups: %v", err) + } + c.logger.Info("a k8s cron job for logical backup has been successfully created") + } + if err := c.listResources(); err != nil { c.logger.Errorf("could not list resources: %v", err) } @@ -479,8 +489,10 @@ func compareResoucesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resource } -// Update changes Kubernetes objects according to the new specification. Unlike the sync case, the missing object. -// (i.e. service) is treated as an error. 
+// Update changes Kubernetes objects according to the new specification. Unlike the sync case, the missing object +// (i.e. service) is treated as an error +// logical backup cron jobs are an exception: a user-initiated Update can enable a logical backup job +// for a cluster that had no such job before. In this case a missing job is not an error. func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed := false @@ -567,6 +579,43 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } }() + // logical backup job + func() { + + // create if it did not exist + if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup { + c.logger.Debugf("creating backup cron job") + if err := c.createLogicalBackupJob(); err != nil { + c.logger.Errorf("could not create a k8s cron job for logical backups: %v", err) + updateFailed = true + return + } + } + + // delete if no longer needed + if oldSpec.Spec.EnableLogicalBackup && !newSpec.Spec.EnableLogicalBackup { + c.logger.Debugf("deleting backup cron job") + if err := c.deleteLogicalBackupJob(); err != nil { + c.logger.Errorf("could not delete a k8s cron job for logical backups: %v", err) + updateFailed = true + return + } + + } + + // apply schedule changes + // this is the only parameter of logical backups a user can overwrite in the cluster manifest + if (oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup) && + (newSpec.Spec.LogicalBackupSchedule != oldSpec.Spec.LogicalBackupSchedule) { + c.logger.Debugf("updating schedule of the backup cron job") + if err := c.syncLogicalBackupJob(); err != nil { + c.logger.Errorf("could not sync logical backup jobs: %v", err) + updateFailed = true + } + } + + }() + // Roles and Databases if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0) { c.logger.Debugf("syncing roles") @@ -595,6 +644,12 @@ func (c *Cluster) Delete() { c.mu.Lock() defer c.mu.Unlock() + // delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods + // deleting the cron job also removes pods and batch jobs it created + if err := c.deleteLogicalBackupJob(); err != nil { + c.logger.Warningf("could not remove the logical backup k8s cron job; %v", err) + } + if err := c.deleteStatefulSet(); err != nil { c.logger.Warningf("could not delete statefulset: %v", err) } @@ -627,6 +682,7 @@ func (c *Cluster) Delete() { if err := c.deletePatroniClusterObjects(); err != nil { c.logger.Warningf("could not remove leftover patroni objects; %v", err) } + } //NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status). 
diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 5ff00c7b3..6f10aae22 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -20,10 +20,20 @@ const ( ) var logger = logrus.New().WithField("test", "cluster") -var cl = New(Config{OpConfig: config.Config{ProtectedRoles: []string{"admin"}, - Auth: config.Auth{SuperUsername: superUserName, - ReplicationUsername: replicationUserName}}}, - k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) +var cl = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + }, + }, + k8sutil.NewMockKubernetesClient(), + acidv1.Postgresql{}, + logger, +) func TestInitRobotUsers(t *testing.T) { testName := "TestInitRobotUsers" diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index a156b89d6..4953fcfe9 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -8,7 +8,7 @@ import ( "github.com/sirupsen/logrus" "k8s.io/api/apps/v1beta1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" policybeta1 "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -20,6 +20,8 @@ import ( "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/constants" + batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" "k8s.io/apimachinery/pkg/labels" ) @@ -352,7 +354,7 @@ func generateVolumeMounts() []v1.VolumeMount { } } -func generateSpiloContainer( +func generateContainer( name string, dockerImage *string, resourceRequirements *v1.ResourceRequirements, @@ -792,7 +794,7 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*v1beta1.State // generate the spilo container c.logger.Debugf("Generating Spilo container, environment variables: %v", spiloEnvVars) - spiloContainer := generateSpiloContainer(c.containerName(), + spiloContainer := generateContainer(c.containerName(), &effectiveDockerImage, resourceRequirements, spiloEnvVars, @@ -1014,6 +1016,7 @@ func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string return nil, fmt.Errorf("could not parse volume size: %v", err) } + volumeMode := v1.PersistentVolumeFilesystem volumeClaim := &v1.PersistentVolumeClaim{ ObjectMeta: metadata, Spec: v1.PersistentVolumeClaimSpec{ @@ -1024,6 +1027,7 @@ func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string }, }, StorageClassName: storageClassName, + VolumeMode: &volumeMode, }, } @@ -1216,10 +1220,37 @@ func (c *Cluster) generateCloneEnvironment(description *acidv1.CloneDescription) }) } else { // cloning with S3, find out the bucket to clone + msg := "Clone from S3 bucket" + c.logger.Info(msg, description.S3WalPath) + + if description.S3WalPath == "" { + msg := "Figure out which S3 bucket to use from env" + c.logger.Info(msg, description.S3WalPath) + + envs := []v1.EnvVar{ + v1.EnvVar{ + Name: "CLONE_WAL_S3_BUCKET", + Value: c.OpConfig.WALES3Bucket, + }, + v1.EnvVar{ + Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX", + Value: getBucketScopeSuffix(description.UID), + }, + } + + result = append(result, envs...) 
+ } else { + msg := "Use custom parsed S3WalPath %s from the manifest" + c.logger.Warningf(msg, description.S3WalPath) + + result = append(result, v1.EnvVar{ + Name: "CLONE_WALE_S3_PREFIX", + Value: description.S3WalPath, + }) + } + result = append(result, v1.EnvVar{Name: "CLONE_METHOD", Value: "CLONE_WITH_WALE"}) - result = append(result, v1.EnvVar{Name: "CLONE_WAL_S3_BUCKET", Value: c.OpConfig.WALES3Bucket}) result = append(result, v1.EnvVar{Name: "CLONE_TARGET_TIME", Value: description.EndTimestamp}) - result = append(result, v1.EnvVar{Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX", Value: getBucketScopeSuffix(description.UID)}) result = append(result, v1.EnvVar{Name: "CLONE_WAL_BUCKET_SCOPE_PREFIX", Value: ""}) } @@ -1252,3 +1283,167 @@ func (c *Cluster) getClusterServiceConnectionParameters(clusterName string) (hos port = "5432" return } + +func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) { + + var ( + err error + podTemplate *v1.PodTemplateSpec + resourceRequirements *v1.ResourceRequirements + ) + + // NB: a cron job creates standard batch jobs according to schedule; these batch jobs manage pods and clean-up + + c.logger.Debug("Generating logical backup pod template") + + // allocate for the backup pod the same amount of resources as for normal DB pods + defaultResources := c.makeDefaultResources() + resourceRequirements, err = generateResourceRequirements(c.Spec.Resources, defaultResources) + if err != nil { + return nil, fmt.Errorf("could not generate resource requirements for logical backup pods: %v", err) + } + + envVars := c.generateLogicalBackupPodEnvVars() + logicalBackupContainer := generateContainer( + "logical-backup", + &c.OpConfig.LogicalBackup.LogicalBackupDockerImage, + resourceRequirements, + envVars, + []v1.VolumeMount{}, + c.OpConfig.SpiloPrivileged, // use same value as for normal DB pods + ) + + labels := map[string]string{ + "version": c.Name, + "application": "spilo-logical-backup", + } + podAffinityTerm := v1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + TopologyKey: "kubernetes.io/hostname", + } + podAffinity := v1.Affinity{ + PodAffinity: &v1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{{ + Weight: 1, + PodAffinityTerm: podAffinityTerm, + }, + }, + }} + + // re-use the method that generates DB pod templates + if podTemplate, err = generatePodTemplate( + c.Namespace, + c.labelsSet(true), + logicalBackupContainer, + []v1.Container{}, + []v1.Container{}, + &[]v1.Toleration{}, + nodeAffinity(c.OpConfig.NodeReadinessLabel), + int64(c.OpConfig.PodTerminateGracePeriod.Seconds()), + c.OpConfig.PodServiceAccountName, + c.OpConfig.KubeIAMRole, + "", + false, + false, + ""); err != nil { + return nil, fmt.Errorf("could not generate pod template for logical backup pod: %v", err) + } + + // overwrite specifc params of logical backups pods + podTemplate.Spec.Affinity = &podAffinity + podTemplate.Spec.RestartPolicy = "Never" // affects containers within a pod + + // configure a batch job + + jobSpec := batchv1.JobSpec{ + Template: *podTemplate, + } + + // configure a cron job + + jobTemplateSpec := batchv1beta1.JobTemplateSpec{ + Spec: jobSpec, + } + + schedule := c.Postgresql.Spec.LogicalBackupSchedule + if schedule == "" { + schedule = c.OpConfig.LogicalBackupSchedule + } + + cronJob := &batchv1beta1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.getLogicalBackupJobName(), + Namespace: c.Namespace, + Labels: c.labelsSet(true), + }, + Spec: 
batchv1beta1.CronJobSpec{ + Schedule: schedule, + JobTemplate: jobTemplateSpec, + ConcurrencyPolicy: batchv1beta1.ForbidConcurrent, + }, + } + + return cronJob, nil +} + +func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar { + + envVars := []v1.EnvVar{ + { + Name: "SCOPE", + Value: c.Name, + }, + // Bucket env vars + { + Name: "LOGICAL_BACKUP_S3_BUCKET", + Value: c.OpConfig.LogicalBackup.LogicalBackupS3Bucket, + }, + { + Name: "LOGICAL_BACKUP_S3_BUCKET_SCOPE_SUFFIX", + Value: getBucketScopeSuffix(string(c.Postgresql.GetUID())), + }, + // Postgres env vars + { + Name: "PG_VERSION", + Value: c.Spec.PgVersion, + }, + { + Name: "PGPORT", + Value: "5432", + }, + { + Name: "PGUSER", + Value: c.OpConfig.SuperUsername, + }, + { + Name: "PGDATABASE", + Value: c.OpConfig.SuperUsername, + }, + { + Name: "PGSSLMODE", + Value: "require", + }, + { + Name: "PGPASSWORD", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: c.credentialSecretName(c.OpConfig.SuperUsername), + }, + Key: "password", + }, + }, + }, + } + + c.logger.Debugf("Generated logical backup env vars %v", envVars) + + return envVars +} + +// getLogicalBackupJobName returns the name; the job itself may not exists +func (c *Cluster) getLogicalBackupJobName() (jobName string) { + return "logical-backup-" + c.clusterName().Name +} diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 6b96f8ec1..dc48c0389 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -129,3 +129,82 @@ func TestShmVolume(t *testing.T) { } } } + +func TestCloneEnv(t *testing.T) { + testName := "TestCloneEnv" + tests := []struct { + subTest string + cloneOpts *acidv1.CloneDescription + env v1.EnvVar + envPos int + }{ + { + subTest: "custom s3 path", + cloneOpts: &acidv1.CloneDescription{ + ClusterName: "test-cluster", + S3WalPath: "s3://some/path/", + EndTimestamp: "somewhen", + }, + env: v1.EnvVar{ + Name: "CLONE_WALE_S3_PREFIX", + Value: "s3://some/path/", + }, + envPos: 1, + }, + { + subTest: "generated s3 path, bucket", + cloneOpts: &acidv1.CloneDescription{ + ClusterName: "test-cluster", + EndTimestamp: "somewhen", + UID: "0000", + }, + env: v1.EnvVar{ + Name: "CLONE_WAL_S3_BUCKET", + Value: "wale-bucket", + }, + envPos: 1, + }, + { + subTest: "generated s3 path, target time", + cloneOpts: &acidv1.CloneDescription{ + ClusterName: "test-cluster", + EndTimestamp: "somewhen", + UID: "0000", + }, + env: v1.EnvVar{ + Name: "CLONE_TARGET_TIME", + Value: "somewhen", + }, + envPos: 4, + }, + } + + var cluster = New( + Config{ + OpConfig: config.Config{ + WALES3Bucket: "wale-bucket", + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + for _, tt := range tests { + envs := cluster.generateCloneEnvironment(tt.cloneOpts) + + env := envs[tt.envPos] + + if env.Name != tt.env.Name { + t.Errorf("%s %s: Expected env name %s, have %s instead", + testName, tt.subTest, tt.env.Name, env.Name) + } + + if env.Value != tt.env.Value { + t.Errorf("%s %s: Expected env value %s, have %s instead", + testName, tt.subTest, tt.env.Value, env.Value) + } + + } +} diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 18c295804..e8674283a 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -6,7 +6,8 @@ import ( "strings" "k8s.io/api/apps/v1beta1" - "k8s.io/api/core/v1" + batchv1beta1 
"k8s.io/api/batch/v1beta1" + v1 "k8s.io/api/core/v1" policybeta1 "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -609,6 +610,51 @@ func (c *Cluster) createRoles() (err error) { return c.syncRoles() } +func (c *Cluster) createLogicalBackupJob() (err error) { + + c.setProcessName("creating a k8s cron job for logical backups") + + logicalBackupJobSpec, err := c.generateLogicalBackupJob() + if err != nil { + return fmt.Errorf("could not generate k8s cron job spec: %v", err) + } + c.logger.Debugf("Generated cronJobSpec: %v", logicalBackupJobSpec) + + _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Create(logicalBackupJobSpec) + if err != nil { + return fmt.Errorf("could not create k8s cron job: %v", err) + } + + return nil +} + +func (c *Cluster) patchLogicalBackupJob(newJob *batchv1beta1.CronJob) error { + c.setProcessName("patching logical backup job") + + patchData, err := specPatch(newJob.Spec) + if err != nil { + return fmt.Errorf("could not form patch for the logical backup job: %v", err) + } + + // update the backup job spec + _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Patch( + c.getLogicalBackupJobName(), + types.MergePatchType, + patchData, "") + if err != nil { + return fmt.Errorf("could not patch logical backup job: %v", err) + } + + return nil +} + +func (c *Cluster) deleteLogicalBackupJob() error { + + c.logger.Info("removing the logical backup job") + + return c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Delete(c.getLogicalBackupJobName(), c.deleteOptions) +} + // GetServiceMaster returns cluster's kubernetes master Service func (c *Cluster) GetServiceMaster() *v1.Service { return c.Services[Master] diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index d2157c8a2..f5ae30b81 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -3,7 +3,8 @@ package cluster import ( "fmt" - "k8s.io/api/core/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" + v1 "k8s.io/api/core/v1" policybeta1 "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,7 +29,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { if err != nil { c.logger.Warningf("error while syncing cluster state: %v", err) c.setStatus(acidv1.ClusterStatusSyncFailed) - } else if c.Status != acidv1.ClusterStatusRunning { + } else if !c.Status.Running() { c.setStatus(acidv1.ClusterStatusRunning) } }() @@ -92,6 +93,16 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { return err } + // create a logical backup job unless we are running without pods or disable that feature explicitly + if c.Spec.EnableLogicalBackup && c.getNumberOfInstances(&c.Spec) > 0 { + + c.logger.Debug("syncing logical backup job") + if err = c.syncLogicalBackupJob(); err != nil { + err = fmt.Errorf("could not sync the logical backup job: %v", err) + return err + } + } + return err } @@ -519,3 +530,56 @@ func (c *Cluster) syncDatabases() error { return nil } + +func (c *Cluster) syncLogicalBackupJob() error { + var ( + job *batchv1beta1.CronJob + desiredJob *batchv1beta1.CronJob + err error + ) + c.setProcessName("syncing the logical backup job") + + // sync the job if it exists + + jobName := c.getLogicalBackupJobName() + if job, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(jobName, metav1.GetOptions{}); err == nil { + + desiredJob, err = c.generateLogicalBackupJob() + if err != nil { + return fmt.Errorf("could not generate the desired logical backup job state: %v", err) + } + if match, reason := 
k8sutil.SameLogicalBackupJob(job, desiredJob); !match { + c.logger.Infof("logical job %q is not in the desired state and needs to be updated", + c.getLogicalBackupJobName(), + ) + if reason != "" { + c.logger.Infof("reason: %s", reason) + } + if err = c.patchLogicalBackupJob(desiredJob); err != nil { + return fmt.Errorf("could not update logical backup job to match desired state: %v", err) + } + c.logger.Info("the logical backup job is synced") + } + return nil + } + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get logical backp job: %v", err) + } + + // no existing logical backup job, create new one + c.logger.Info("could not find the cluster's logical backup job") + + if err = c.createLogicalBackupJob(); err == nil { + c.logger.Infof("created missing logical backup job %q", jobName) + } else { + if !k8sutil.ResourceAlreadyExists(err) { + return fmt.Errorf("could not create missing logical backup job: %v", err) + } + c.logger.Infof("logical backup job %q already exists", jobName) + if _, err = c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get(jobName, metav1.GetOptions{}); err != nil { + return fmt.Errorf("could not fetch existing logical backup job: %v", err) + } + } + + return nil +} diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 93268cb93..5b531cc90 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -12,7 +12,7 @@ import ( "time" "k8s.io/api/apps/v1beta1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" policybeta1 "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index bb4f89918..245754e1c 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -106,5 +106,9 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit + result.LogicalBackupSchedule = fromCRD.LogicalBackup.Schedule + result.LogicalBackupDockerImage = fromCRD.LogicalBackup.DockerImage + result.LogicalBackupS3Bucket = fromCRD.LogicalBackup.S3Bucket + return result } diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 57633ae82..f9fc4468a 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -1,11 +1,13 @@ package controller import ( + "encoding/json" "fmt" "k8s.io/api/core/v1" apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" @@ -52,7 +54,15 @@ func (c *Controller) createOperatorCRD(crd *apiextv1beta1.CustomResourceDefiniti if !k8sutil.ResourceAlreadyExists(err) { return fmt.Errorf("could not create customResourceDefinition: %v", err) } - c.logger.Infof("customResourceDefinition %q is already registered", crd.Name) + c.logger.Infof("customResourceDefinition %q is already registered and will only be updated", crd.Name) + + patch, err := json.Marshal(crd) + if err != nil { + return fmt.Errorf("could not marshal new customResourceDefintion: %v", err) + } + if _, err := c.KubeClient.CustomResourceDefinitions().Patch(crd.Name, types.MergePatchType, patch); err != nil { + return fmt.Errorf("could not update customResourceDefinition: %v", err) + } } else { c.logger.Infof("customResourceDefinition %q has been registered", crd.Name) } 
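The sync path above patches an existing cron job only when `k8sutil.SameLogicalBackupJob` (added further down in `pkg/util/k8sutil/k8sutil.go`) reports a mismatch, and that comparison looks solely at the cron schedule and the backup container image. A minimal test-style sketch of that contract is shown below; the helper `makeCronJob` and the test itself are illustrative assumptions, not part of this change set.

```go
package k8sutil

import (
	"testing"

	batchv1 "k8s.io/api/batch/v1"
	batchv1beta1 "k8s.io/api/batch/v1beta1"
	v1 "k8s.io/api/core/v1"
)

// makeCronJob builds the minimal shape SameLogicalBackupJob inspects:
// the schedule plus the image of the first container of the job template.
func makeCronJob(schedule, image string) *batchv1beta1.CronJob {
	return &batchv1beta1.CronJob{
		Spec: batchv1beta1.CronJobSpec{
			Schedule: schedule,
			JobTemplate: batchv1beta1.JobTemplateSpec{
				Spec: batchv1.JobSpec{
					Template: v1.PodTemplateSpec{
						Spec: v1.PodSpec{
							Containers: []v1.Container{{Image: image}},
						},
					},
				},
			},
		},
	}
}

func TestSameLogicalBackupJobSketch(t *testing.T) {
	image := "registry.opensource.zalan.do/acid/logical-backup"
	cur := makeCronJob("30 00 * * *", image)

	// a different schedule (or image) must be reported as a mismatch, which triggers a patch during Sync
	if match, reason := SameLogicalBackupJob(cur, makeCronJob("15 01 * * *", image)); match {
		t.Errorf("expected a schedule mismatch, got none (reason: %q)", reason)
	}

	// identical schedule and image are treated as the same job; other fields are not compared
	if match, reason := SameLogicalBackupJob(cur, makeCronJob("30 00 * * *", image)); !match {
		t.Errorf("expected jobs to match, got mismatch: %s", reason)
	}
}
```

Because labels, resources, and environment variables are not part of the comparison, changing only those fields in the operator configuration does not cause the existing cron job to be patched during Sync.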
diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go index cb782904c..c9e16cbd9 100644 --- a/pkg/controller/util_test.go +++ b/pkg/controller/util_test.go @@ -6,82 +6,24 @@ import ( "testing" b64 "encoding/base64" - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util/k8sutil" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( testInfrastructureRolesSecretName = "infrastructureroles-test" ) -type mockSecret struct { - v1core.SecretInterface -} - -type mockConfigMap struct { - v1core.ConfigMapInterface -} - -func (c *mockSecret) Get(name string, options metav1.GetOptions) (*v1.Secret, error) { - if name != testInfrastructureRolesSecretName { - return nil, fmt.Errorf("NotFound") - } - secret := &v1.Secret{} - secret.Name = mockController.opConfig.ClusterNameLabel - secret.Data = map[string][]byte{ - "user1": []byte("testrole"), - "password1": []byte("testpassword"), - "inrole1": []byte("testinrole"), - "foobar": []byte(b64.StdEncoding.EncodeToString([]byte("password"))), - } - return secret, nil - -} - -func (c *mockConfigMap) Get(name string, options metav1.GetOptions) (*v1.ConfigMap, error) { - if name != testInfrastructureRolesSecretName { - return nil, fmt.Errorf("NotFound") - } - configmap := &v1.ConfigMap{} - configmap.Name = mockController.opConfig.ClusterNameLabel - configmap.Data = map[string]string{ - "foobar": "{}", - } - return configmap, nil -} - -type MockSecretGetter struct { -} - -type MockConfigMapsGetter struct { -} - -func (c *MockSecretGetter) Secrets(namespace string) v1core.SecretInterface { - return &mockSecret{} -} - -func (c *MockConfigMapsGetter) ConfigMaps(namespace string) v1core.ConfigMapInterface { - return &mockConfigMap{} -} - -func newMockKubernetesClient() k8sutil.KubernetesClient { - return k8sutil.KubernetesClient{ - SecretsGetter: &MockSecretGetter{}, - ConfigMapsGetter: &MockConfigMapsGetter{}, - } -} - func newMockController() *Controller { controller := NewController(&spec.ControllerConfig{}) controller.opConfig.ClusterNameLabel = "cluster-name" controller.opConfig.InfrastructureRolesSecretName = spec.NamespacedName{Namespace: v1.NamespaceDefault, Name: testInfrastructureRolesSecretName} controller.opConfig.Workers = 4 - controller.KubeClient = newMockKubernetesClient() + controller.KubeClient = k8sutil.NewMockKubernetesClient() return controller } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index b2374e042..0cd662a6e 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -66,12 +66,20 @@ type Scalyr struct { ScalyrMemoryLimit string `name:"scalyr_memory_limit" default:"1Gi"` } +// LogicalBackup +type LogicalBackup struct { + LogicalBackupSchedule string `name:"logical_backup_schedule" default:"30 00 * * *"` + LogicalBackupDockerImage string `name:"logical_backup_docker_image" default:"registry.opensource.zalan.do/acid/logical-backup"` + LogicalBackupS3Bucket string `name:"logical_backup_s3_bucket" default:""` +} + // Config describes operator config type Config struct { CRD Resources Auth Scalyr + LogicalBackup WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to' EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use k8s as a 
DCS diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 6f5b15085..bd10256e0 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -2,8 +2,15 @@ package k8sutil import ( "fmt" + "reflect" + + b64 "encoding/base64" + + batchv1beta1 "k8s.io/api/batch/v1beta1" + clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1" + "github.com/zalando/postgres-operator/pkg/util/constants" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" policybeta1 "k8s.io/api/policy/v1beta1" apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1" @@ -15,9 +22,9 @@ import ( rbacv1beta1 "k8s.io/client-go/kubernetes/typed/rbac/v1beta1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "reflect" acidv1client "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // KubernetesClient describes getters for Kubernetes objects @@ -36,11 +43,26 @@ type KubernetesClient struct { rbacv1beta1.RoleBindingsGetter policyv1beta1.PodDisruptionBudgetsGetter apiextbeta1.CustomResourceDefinitionsGetter + clientbatchv1beta1.CronJobsGetter RESTClient rest.Interface AcidV1ClientSet *acidv1client.Clientset } +type mockSecret struct { + v1core.SecretInterface +} + +type MockSecretGetter struct { +} + +type mockConfigMap struct { + v1core.ConfigMapInterface +} + +type MockConfigMapsGetter struct { +} + // RestConfig creates REST config func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { if outOfCluster { @@ -83,6 +105,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1() kubeClient.RESTClient = client.CoreV1().RESTClient() kubeClient.RoleBindingsGetter = client.RbacV1beta1() + kubeClient.CronJobsGetter = client.BatchV1beta1() apiextClient, err := apiextclient.NewForConfig(cfg) if err != nil { @@ -140,3 +163,71 @@ func SamePDB(cur, new *policybeta1.PodDisruptionBudget) (match bool, reason stri return } + +func getJobImage(cronJob *batchv1beta1.CronJob) string { + return cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image +} + +// SameLogicalBackupJob compares Specs of logical backup cron jobs +func SameLogicalBackupJob(cur, new *batchv1beta1.CronJob) (match bool, reason string) { + + if cur.Spec.Schedule != new.Spec.Schedule { + return false, fmt.Sprintf("new job's schedule %q doesn't match the current one %q", + new.Spec.Schedule, cur.Spec.Schedule) + } + + newImage := getJobImage(new) + curImage := getJobImage(cur) + if newImage != curImage { + return false, fmt.Sprintf("new job's image %q doesn't match the current one %q", + newImage, curImage) + } + + return true, "" +} + +func (c *mockSecret) Get(name string, options metav1.GetOptions) (*v1.Secret, error) { + if name != "infrastructureroles-test" { + return nil, fmt.Errorf("NotFound") + } + secret := &v1.Secret{} + secret.Name = "testcluster" + secret.Data = map[string][]byte{ + "user1": []byte("testrole"), + "password1": []byte("testpassword"), + "inrole1": []byte("testinrole"), + "foobar": []byte(b64.StdEncoding.EncodeToString([]byte("password"))), + } + return secret, nil + +} + +func (c *mockConfigMap) Get(name string, options metav1.GetOptions) (*v1.ConfigMap, error) { + if name != "infrastructureroles-test" { + return nil, fmt.Errorf("NotFound") + } + configmap := &v1.ConfigMap{} + configmap.Name = "testcluster" 
+ configmap.Data = map[string]string{ + "foobar": "{}", + } + return configmap, nil +} + +// Secrets to be mocked +func (c *MockSecretGetter) Secrets(namespace string) v1core.SecretInterface { + return &mockSecret{} +} + +// ConfigMaps to be mocked +func (c *MockConfigMapsGetter) ConfigMaps(namespace string) v1core.ConfigMapInterface { + return &mockConfigMap{} +} + +// NewMockKubernetesClient for other tests +func NewMockKubernetesClient() KubernetesClient { + return KubernetesClient{ + SecretsGetter: &MockSecretGetter{}, + ConfigMapsGetter: &MockConfigMapsGetter{}, + } +} diff --git a/run_operator_locally.sh b/run_operator_locally.sh index 2594097b2..ee0768354 100755 --- a/run_operator_locally.sh +++ b/run_operator_locally.sh @@ -82,15 +82,15 @@ function clean_up(){ function start_minikube(){ - echo "==== START MINIKUBE ==== " + echo "==== START MINIKUBE ====" echo "May take a few minutes ..." minikube start kubectl config set-context minikube - echo "==== MINIKUBE STATUS ==== " + echo "==== MINIKUBE STATUS ====" minikube status - + echo "" } @@ -133,7 +133,7 @@ function deploy_self_built_image() { function start_operator(){ - echo "==== START OPERATOR ==== " + echo "==== START OPERATOR ====" echo "Certain operations may be retried multiple times..." # the order of resource initialization is significant
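The shared mock client above stops at secrets and config maps, which is all the refactored controller test needs. If the new cron job code paths ever need unit tests against the same mock, the interface-embedding pattern could be extended to the freshly wired `CronJobsGetter`; the sketch below is an illustration under that assumption, and the names `mockCronJob` and `MockCronJobsGetter` are hypothetical, not part of this change set.

```go
package k8sutil

import (
	batchv1 "k8s.io/api/batch/v1"
	batchv1beta1 "k8s.io/api/batch/v1beta1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1"
)

// mockCronJob embeds the typed interface and overrides only what a test needs,
// in the same style as mockSecret and mockConfigMap.
type mockCronJob struct {
	clientbatchv1beta1.CronJobInterface
}

// Get returns a fixed cron job whose schedule and first container image are
// populated, so helpers like SameLogicalBackupJob can inspect it.
func (c *mockCronJob) Get(name string, options metav1.GetOptions) (*batchv1beta1.CronJob, error) {
	return &batchv1beta1.CronJob{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Spec: batchv1beta1.CronJobSpec{
			Schedule: "30 00 * * *",
			JobTemplate: batchv1beta1.JobTemplateSpec{
				Spec: batchv1.JobSpec{
					Template: v1.PodTemplateSpec{
						Spec: v1.PodSpec{
							Containers: []v1.Container{{Image: "registry.opensource.zalan.do/acid/logical-backup"}},
						},
					},
				},
			},
		},
	}, nil
}

// MockCronJobsGetter satisfies clientbatchv1beta1.CronJobsGetter.
type MockCronJobsGetter struct{}

// CronJobs returns the mocked interface regardless of the namespace.
func (c *MockCronJobsGetter) CronJobs(namespace string) clientbatchv1beta1.CronJobInterface {
	return &mockCronJob{}
}
```

A `CronJobsGetter: &MockCronJobsGetter{}` entry could then be added to `NewMockKubernetesClient` next to the secrets and config map getters.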