Allow cloning clusters from the operator. (#90)
Allow cloning clusters from the operator. The changes add a new JSON node `clone` with two possible fields, `cluster` and `timestamp`. `cluster` is mandatory, and setting a non-empty `timestamp` triggers WAL-E point-in-time recovery. Spilo and Patroni do all the heavy lifting; the operator just defines certain environment variables and gathers some data about how to connect to the host to clone from, or about the target S3 bucket. As a minor change, set the image pull policy to IfNotPresent instead of Always to simplify local testing. Change the default replication username to standby.
This commit is contained in:
		
							parent
							
								
									a0a9e8f849
								
							
						
					
					
						commit
						8b85935a7a
					
				|  | @ -21,7 +21,7 @@ data: | |||
|   pod_label_wait_timeout: 10m | ||||
|   ready_wait_interval: 3s | ||||
|   ready_wait_timeout: 30s | ||||
|   replication_username: replication | ||||
|   replication_username: standby | ||||
|   resource_check_interval: 3s | ||||
|   resource_check_timeout: 10m | ||||
|   resync_period: 5m | ||||
|  |  | |||
|  | @ -226,7 +226,10 @@ PATRONI_INITDB_PARAMS: | |||
| 	return string(result) | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequirements, pgParameters *spec.PostgresqlParam, patroniParameters *spec.Patroni) *v1.PodTemplateSpec { | ||||
| func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequirements, | ||||
| 	pgParameters *spec.PostgresqlParam, | ||||
| 	patroniParameters *spec.Patroni, | ||||
| 	cloneDescription *spec.CloneDescription) *v1.PodTemplateSpec { | ||||
| 	spiloConfiguration := c.generateSpiloJSONConfiguration(pgParameters, patroniParameters) | ||||
| 
 | ||||
| 	envVars := []v1.EnvVar{ | ||||
|  | @ -301,11 +304,17 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme | |||
| 	if c.OpConfig.WALES3Bucket != "" { | ||||
| 		envVars = append(envVars, v1.EnvVar{Name: "WAL_S3_BUCKET", Value: c.OpConfig.WALES3Bucket}) | ||||
| 	} | ||||
| 	if cloneDescription.ClusterName != "" { | ||||
| 		cloneVars := c.generateCloneEnvironment(cloneDescription) | ||||
| 		for _, v := range cloneVars { | ||||
| 			envVars = append(envVars, v) | ||||
| 		} | ||||
| 	} | ||||
| 	privilegedMode := bool(true) | ||||
| 	container := v1.Container{ | ||||
| 		Name:            c.containerName(), | ||||
| 		Image:           c.OpConfig.DockerImage, | ||||
| 		ImagePullPolicy: v1.PullAlways, | ||||
| 		ImagePullPolicy: v1.PullIfNotPresent, | ||||
| 		Resources:       *resourceRequirements, | ||||
| 		Ports: []v1.ContainerPort{ | ||||
| 			{ | ||||
|  | @ -357,13 +366,13 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme | |||
| func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.StatefulSet, error) { | ||||
| 	resourceRequirements, err := c.resourceRequirements(spec.Resources) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 		return nil, fmt.Errorf("could not generate resource requirements: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	podTemplate := c.generatePodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni) | ||||
| 	podTemplate := c.generatePodTemplate(resourceRequirements, &spec.PostgresqlParam, &spec.Patroni, &spec.Clone) | ||||
| 	volumeClaimTemplate, err := generatePersistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 		return nil, fmt.Errorf("could not generate volume claim template: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	statefulSet := &v1beta1.StatefulSet{ | ||||
|  | @ -523,3 +532,50 @@ func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpo | |||
| 
 | ||||
| 	return endpoints | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) generateCloneEnvironment(description *spec.CloneDescription) []v1.EnvVar { | ||||
| 	result := make([]v1.EnvVar, 0) | ||||
| 	if description.ClusterName == "" { | ||||
| 		return result | ||||
| 	} | ||||
| 	cluster := description.ClusterName | ||||
| 	result = append(result, v1.EnvVar{Name: "CLONE_SCOPE", Value: cluster}) | ||||
| 	if description.EndTimestamp == "" { | ||||
| 		// cloning with basebackup, make a connection string to the cluster to clone from
 | ||||
| 		host, port := c.getClusterServiceConnectionParameters(cluster) | ||||
| 		// TODO: make some/all of those constants
 | ||||
| 		result = append(result, v1.EnvVar{Name: "CLONE_METHOD", Value: "CLONE_WITH_BASEBACKUP"}) | ||||
| 		result = append(result, v1.EnvVar{Name: "CLONE_HOST", Value: host}) | ||||
| 		result = append(result, v1.EnvVar{Name: "CLONE_PORT", Value: port}) | ||||
| 		// TODO: assume replication user name is the same for all clusters, fetch it from secrets otherwise
 | ||||
| 		result = append(result, v1.EnvVar{Name: "CLONE_USER", Value: c.OpConfig.ReplicationUsername}) | ||||
| 		result = append(result, | ||||
| 			v1.EnvVar{Name: "CLONE_PASSWORD", | ||||
| 				ValueFrom: &v1.EnvVarSource{ | ||||
| 					SecretKeyRef: &v1.SecretKeySelector{ | ||||
| 						LocalObjectReference: v1.LocalObjectReference{ | ||||
| 							Name: c.credentialSecretNameForCluster(c.OpConfig.ReplicationUsername, | ||||
| 								description.ClusterName), | ||||
| 						}, | ||||
| 						Key: "password", | ||||
| 					}, | ||||
| 				}, | ||||
| 			}) | ||||
| 	} else { | ||||
| 		// cloning with S3, find out the bucket to clone
 | ||||
| 		clone_wal_s3_bucket := c.OpConfig.WALES3Bucket | ||||
| 		result = append(result, v1.EnvVar{Name: "CLONE_METHOD", Value: "CLONE_WITH_WALE"}) | ||||
| 		result = append(result, v1.EnvVar{Name: "CLONE_WAL_S3_BUCKET", Value: clone_wal_s3_bucket}) | ||||
| 		result = append(result, v1.EnvVar{Name: "CLONE_TARGET_TIME", Value: description.EndTimestamp}) | ||||
| 	} | ||||
| 	return result | ||||
| } | ||||
| 
 | ||||
| // getClusterServiceConnectionParameters fetches cluster host name and port
 | ||||
| // TODO: perhaps we need to query the service (i.e. if non-standard port is used?)
 | ||||
| // TODO: handle clusters in different namespaces
 | ||||
| func (c *Cluster) getClusterServiceConnectionParameters(clusterName string) (host string, port string) { | ||||
| 	host = clusterName | ||||
| 	port = "5432" | ||||
| 	return | ||||
| } | ||||
|  |  | |||
|  | @ -304,11 +304,15 @@ func (c *Cluster) replicaDNSName() string { | |||
| } | ||||
| 
 | ||||
| func (c *Cluster) credentialSecretName(username string) string { | ||||
| 	return c.credentialSecretNameForCluster(username, c.Name) | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) credentialSecretNameForCluster(username string, clusterName string) string { | ||||
| 	// secret  must consist of lower case alphanumeric characters, '-' or '.',
 | ||||
| 	// and must start and end with an alphanumeric character
 | ||||
| 	return fmt.Sprintf(constants.UserSecretTemplate, | ||||
| 		strings.Replace(username, "_", "-", -1), | ||||
| 		c.Name) | ||||
| 		clusterName) | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) podSpiloRole(pod *v1.Pod) string { | ||||
|  |  | |||
|  | @ -51,6 +51,12 @@ type Patroni struct { | |||
| 	MaximumLagOnFailover float32           `json:"maximum_lag_on_failover"` // float32 because https://github.com/kubernetes/kubernetes/issues/30213
 | ||||
| } | ||||
| 
 | ||||
| // CloneDescription describes which cluster the new one should clone and up to which point in time
 | ||||
| type CloneDescription struct { | ||||
| 	ClusterName  string `json:"cluster,omitempty"` | ||||
| 	EndTimestamp string `json:"timestamp,omitempty"` | ||||
| } | ||||
| 
 | ||||
| type userFlags []string | ||||
| 
 | ||||
| // PostgresStatus contains status of the PostgreSQL cluster (running, creation failed etc.)
 | ||||
|  | @ -86,12 +92,13 @@ type PostgresSpec struct { | |||
| 
 | ||||
| 	TeamID              string   `json:"teamId"` | ||||
| 	AllowedSourceRanges []string `json:"allowedSourceRanges"` | ||||
| 	// EnableLoadBalancer  is a pointer, since it is importat to know if that parameters is omitted from the manifest
 | ||||
| 	// UseLoadBalancer is a pointer, since it is important to know if that parameter is omitted from the manifest
 | ||||
| 	UseLoadBalancer     *bool                `json:"useLoadBalancer,omitempty"` | ||||
| 	ReplicaLoadBalancer bool                 `json:"replicaLoadBalancer,omitempty"` | ||||
| 	NumberOfInstances   int32                `json:"numberOfInstances"` | ||||
| 	Users               map[string]userFlags `json:"users"` | ||||
| 	MaintenanceWindows  []MaintenanceWindow  `json:"maintenanceWindows,omitempty"` | ||||
| 	Clone               CloneDescription     `json:"clone"` | ||||
| 	ClusterName         string               `json:"-"` | ||||
| } | ||||
| 
 | ||||
|  | @ -236,6 +243,15 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error { | |||
| 		tmp2.Error = err | ||||
| 		tmp2.Status = ClusterStatusInvalid | ||||
| 	} | ||||
| 	// The assumption below is that a cluster to clone, if any, belongs to the same team
 | ||||
| 	if tmp2.Spec.Clone.ClusterName != "" { | ||||
| 		_, err := extractClusterName(tmp2.Spec.Clone.ClusterName, tmp2.Spec.TeamID) | ||||
| 		if err != nil { | ||||
| 			tmp2.Error = fmt.Errorf("%s for the cluster to clone", err) | ||||
| 			tmp2.Spec.Clone = CloneDescription{} | ||||
| 			tmp2.Status = ClusterStatusInvalid | ||||
| 		} | ||||
| 	} | ||||
| 	*p = tmp2 | ||||
| 
 | ||||
| 	return nil | ||||
|  |  | |||
|  | @ -119,7 +119,7 @@ var unmarshalCluster = []struct { | |||
| 			Field:  "teamId", | ||||
| 		}, | ||||
| 	}, | ||||
| 	[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"","allowedSourceRanges":null,"numberOfInstances":0,"users":null},"status":"Invalid"}`), nil}, | ||||
| 	[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{}},"status":"Invalid"}`), nil}, | ||||
| 	{[]byte(`{ | ||||
|   "kind": "Postgresql", | ||||
|   "apiVersion": "acid.zalan.do/v1", | ||||
|  | @ -160,6 +160,9 @@ var unmarshalCluster = []struct { | |||
|         "memory": "3000Mi" | ||||
|       } | ||||
|     }, | ||||
|     "clone" : { | ||||
|      "cluster": "acid-batman" | ||||
|      }, | ||||
|     "patroni": { | ||||
|       "initdb": { | ||||
|         "encoding": "UTF8", | ||||
|  | @ -219,6 +222,7 @@ var unmarshalCluster = []struct { | |||
| 					ResourceRequest: ResourceDescription{CPU: "10m", Memory: "50Mi"}, | ||||
| 					ResourceLimits:  ResourceDescription{CPU: "300m", Memory: "3000Mi"}, | ||||
| 				}, | ||||
| 
 | ||||
| 				TeamID:              "ACID", | ||||
| 				AllowedSourceRanges: []string{"127.0.0.1/32"}, | ||||
| 				NumberOfInstances:   2, | ||||
|  | @ -241,11 +245,14 @@ var unmarshalCluster = []struct { | |||
| 						EndTime:   mustParseTime("05:15"), | ||||
| 					}, | ||||
| 				}, | ||||
| 				Clone: CloneDescription{ | ||||
| 					ClusterName: "acid-batman", | ||||
| 				}, | ||||
| 				ClusterName: "testcluster1", | ||||
| 			}, | ||||
| 			Error: nil, | ||||
| 		}, | ||||
| 		[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"9.6","parameters":{"log_statement":"all","max_connections":"10","shared_buffers":"32MB"}},"volume":{"size":"5Gi","storageClass":"SSD"},"patroni":{"initdb":{"data-checksums":"true","encoding":"UTF8","locale":"en_US.UTF-8"},"pg_hba":["hostssl all all 0.0.0.0/0 md5","host    all all 0.0.0.0/0 md5"],"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432},"resources":{"requests":{"cpu":"10m","memory":"50Mi"},"limits":{"cpu":"300m","memory":"3000Mi"}},"teamId":"ACID","allowedSourceRanges":["127.0.0.1/32"],"numberOfInstances":2,"users":{"zalando":["superuser","createdb"]},"maintenanceWindows":["Mon:01:00-06:00","Sat:00:00-04:00","05:00-05:15"]}}`), nil}, | ||||
| 		[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"9.6","parameters":{"log_statement":"all","max_connections":"10","shared_buffers":"32MB"}},"volume":{"size":"5Gi","storageClass":"SSD"},"patroni":{"initdb":{"data-checksums":"true","encoding":"UTF8","locale":"en_US.UTF-8"},"pg_hba":["hostssl all all 0.0.0.0/0 md5","host    all all 0.0.0.0/0 md5"],"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":33554432},"resources":{"requests":{"cpu":"10m","memory":"50Mi"},"limits":{"cpu":"300m","memory":"3000Mi"}},"teamId":"ACID","allowedSourceRanges":["127.0.0.1/32"],"numberOfInstances":2,"users":{"zalando":["superuser","createdb"]},"maintenanceWindows":["Mon:01:00-06:00","Sat:00:00-04:00","05:00-05:15"],"clone":{"cluster":"acid-batman"}}}`), nil}, | ||||
| 	{ | ||||
| 		[]byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1","metadata": {"name": "teapot-testcluster1"}, "spec": {"teamId": "acid"}}`), | ||||
| 		Postgresql{ | ||||
|  | @ -260,12 +267,31 @@ var unmarshalCluster = []struct { | |||
| 			Status: ClusterStatusInvalid, | ||||
| 			Error:  errors.New("name must match {TEAM}-{NAME} format"), | ||||
| 		}, | ||||
| 		[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"teapot-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null},"status":"Invalid"}`), nil}, | ||||
| 		[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"teapot-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{}},"status":"Invalid"}`), nil}, | ||||
| 	{ | ||||
| 		in: []byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1","metadata": {"name": "acid-testcluster1"}, "spec": {"teamId": "acid", "clone": {"cluster": "team-batman"}}}`), | ||||
| 		out: Postgresql{ | ||||
| 			TypeMeta: metav1.TypeMeta{ | ||||
| 				Kind:       "Postgresql", | ||||
| 				APIVersion: "acid.zalan.do/v1", | ||||
| 			}, | ||||
| 			ObjectMeta: metav1.ObjectMeta{ | ||||
| 				Name: "acid-testcluster1", | ||||
| 			}, | ||||
| 			Spec: PostgresSpec{ | ||||
| 				TeamID:      "acid", | ||||
| 				Clone:       CloneDescription{}, | ||||
| 				ClusterName: "testcluster1", | ||||
| 			}, | ||||
| 			Status: ClusterStatusInvalid, | ||||
| 			Error:  errors.New("name must match {TEAM}-{NAME} format for the cluster to clone"), | ||||
| 		}, | ||||
| 		marshal: []byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster1","creationTimestamp":null},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{}},"status":"Invalid"}`), err: nil}, | ||||
| 	{[]byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1"`), | ||||
| 		Postgresql{}, | ||||
| 		[]byte{}, | ||||
| 		errors.New("unexpected end of JSON input")}, | ||||
| 	{[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster","creationTimestamp":qaz},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null},"status":"Invalid"}`), | ||||
| 	{[]byte(`{"kind":"Postgresql","apiVersion":"acid.zalan.do/v1","metadata":{"name":"acid-testcluster","creationTimestamp":qaz},"spec":{"postgresql":{"version":"","parameters":null},"volume":{"size":"","storageClass":""},"patroni":{"initdb":null,"pg_hba":null,"ttl":0,"loop_wait":0,"retry_timeout":0,"maximum_lag_on_failover":0},"resources":{"requests":{"cpu":"","memory":""},"limits":{"cpu":"","memory":""}},"teamId":"acid","allowedSourceRanges":null,"numberOfInstances":0,"users":null,"clone":{}},"status":"Invalid"}`), | ||||
| 		Postgresql{}, | ||||
| 		[]byte{}, | ||||
| 		errors.New("invalid character 'q' looking for beginning of value")}} | ||||
|  |  | |||
|  | @ -38,7 +38,7 @@ type Auth struct { | |||
| 	OAuthTokenSecretName          spec.NamespacedName `name:"oauth_token_secret_name" default:"postgresql-operator"` | ||||
| 	InfrastructureRolesSecretName spec.NamespacedName `name:"infrastructure_roles_secret_name"` | ||||
| 	SuperUsername                 string              `name:"super_username" default:"postgres"` | ||||
| 	ReplicationUsername           string              `name:"replication_username" default:"replication"` | ||||
| 	ReplicationUsername           string              `name:"replication_username" default:"standby"` | ||||
| } | ||||
| 
 | ||||
| // Config describes operator config
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue