merge with master again
commit 7523f10d9f
				|  | @ -273,6 +273,26 @@ spec: | |||
|                   type: object | ||||
|                   additionalProperties: | ||||
|                     type: string | ||||
|             preparedDatabases: | ||||
|               type: object | ||||
|               additionalProperties: | ||||
|                 type: object | ||||
|                 properties: | ||||
|                   defaultUsers: | ||||
|                     type: boolean | ||||
|                   extensions: | ||||
|                     type: object | ||||
|                     additionalProperties: | ||||
|                       type: string | ||||
|                   schemas: | ||||
|                     type: object | ||||
|                     additionalProperties: | ||||
|                       type: object | ||||
|                       properties: | ||||
|                         defaultUsers: | ||||
|                           type: boolean | ||||
|                         defaultRoles: | ||||
|                           type: boolean | ||||
|             replicaLoadBalancer:  # deprecated | ||||
|               type: boolean | ||||
|             resources: | ||||
|  |  | |||
							
								
								
									
docs/user.md (169 changed lines)
							|  | @ -94,7 +94,10 @@ created on every cluster managed by the operator. | |||
| * `teams API roles`: automatically create users for every member of the team | ||||
| owning the database cluster. | ||||
| 
 | ||||
| In the next sections, we will cover those use cases in more details. | ||||
| In the next sections, we will cover those use cases in more detail. Note that | ||||
| the Postgres Operator can also create databases with pre-defined owner, reader | ||||
| and writer roles, which saves you the manual setup. Read more in the next | ||||
| chapter. | ||||
| 
 | ||||
| ### Manifest roles | ||||
| 
 | ||||
|  | @ -216,6 +219,166 @@ to choose superusers, group roles, [PAM configuration](https://github.com/CyberD | |||
| etc. An OAuth2 token can be passed to the Teams API via a secret. The name for | ||||
| this secret is configurable with the `oauth_token_secret_name` parameter. | ||||
| 
 | ||||
| ## Prepared databases with roles and default privileges | ||||
| 
 | ||||
| The `users` section in the manifests only allows for creating database roles | ||||
| with global privileges. Fine-grained data access control or role membership | ||||
| cannot be defined there and must be set up by the user in the database itself. | ||||
| However, the Postgres Operator offers a separate `preparedDatabases` section to | ||||
| specify databases that will be created with pre-defined owner, reader and | ||||
| writer roles for each individual database and, optionally, for each database | ||||
| schema, too. `preparedDatabases` also lets you specify PostgreSQL extensions | ||||
| that shall be created in a given database schema. | ||||
| 
 | ||||
| ### Default database and schema | ||||
| 
 | ||||
| A prepared database is already created by adding an empty `preparedDatabases` | ||||
| section to the manifest. The database will then be named after the Postgres | ||||
| cluster manifest (with `-` replaced by `_`) and will also contain a schema | ||||
| called `data`. | ||||
| 
 | ||||
| ```yaml | ||||
| spec: | ||||
|   preparedDatabases: {} | ||||
| ``` | ||||
| 
 | ||||
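For instance, for a cluster manifest named `acid-test` (the name also used in the unit test further down in this diff), this yields a database `acid_test` containing a `data` schema. A generic way to verify is plain catalog queries, which are not operator functionality but just an example check:

```sql
-- pg_database can be queried from any database in the cluster
SELECT datname FROM pg_database WHERE datname = 'acid_test';

-- the schema check must run while connected to the acid_test database
SELECT nspname FROM pg_namespace WHERE nspname = 'data';
```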
| ### Default NOLOGIN roles | ||||
| 
 | ||||
| Given an example with a specified database and schema: | ||||
| 
 | ||||
| ```yaml | ||||
| spec: | ||||
|   preparedDatabases: | ||||
|     foo: | ||||
|       schemas: | ||||
|         bar: {} | ||||
| ``` | ||||
| 
 | ||||
| Postgres Operator will create the following NOLOGIN roles: | ||||
| 
 | ||||
| | Role name      | Member of      | Admin         | | ||||
| | -------------- | -------------- | ------------- | | ||||
| | foo_owner      |                | admin         | | ||||
| | foo_reader     |                | foo_owner     | | ||||
| | foo_writer     | foo_reader     | foo_owner     | | ||||
| | foo_bar_owner  |                | foo_owner     | | ||||
| | foo_bar_reader |                | foo_bar_owner | | ||||
| | foo_bar_writer | foo_bar_reader | foo_bar_owner | | ||||
| 
 | ||||
| The `<dbname>_owner` role is the database owner and should be used when creating | ||||
| new database objects. All members of the `admin` role, e.g. teams API roles, can | ||||
| become the owner with the `SET ROLE` command. [Default privileges](https://www.postgresql.org/docs/12/sql-alterdefaultprivileges.html) | ||||
| are configured for the owner role so that the `<dbname>_reader` role | ||||
| automatically gets read-access (SELECT) to new tables and sequences and the | ||||
| `<dbname>_writer` receives write-access (INSERT, UPDATE, DELETE on tables, | ||||
| USAGE and UPDATE on sequences). Both get USAGE on types and EXECUTE on | ||||
| functions. | ||||
| 
 | ||||
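To make this concrete, here is a rough SQL sketch for the example database `foo`. The commented `ALTER DEFAULT PRIVILEGES` statements mirror the `globalDefaultPrivilegesSQL` constant added further down in this commit and are executed by the operator as the owner role; the `orders` table is purely hypothetical.

```sql
-- any member of "admin", e.g. a teams API role, can become the database owner
SET ROLE foo_owner;

-- the operator has already configured default privileges roughly like this
-- (run as foo_owner, so they apply to objects the owner creates):
--   ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "foo_writer","foo_reader";
--   ALTER DEFAULT PRIVILEGES GRANT SELECT ON TABLES TO "foo_reader";
--   ALTER DEFAULT PRIVILEGES GRANT SELECT ON SEQUENCES TO "foo_reader";
--   ALTER DEFAULT PRIVILEGES GRANT INSERT, UPDATE, DELETE ON TABLES TO "foo_writer";
--   ALTER DEFAULT PRIVILEGES GRANT USAGE, UPDATE ON SEQUENCES TO "foo_writer";
--   ALTER DEFAULT PRIVILEGES GRANT EXECUTE ON FUNCTIONS TO "foo_reader","foo_writer";
--   ALTER DEFAULT PRIVILEGES GRANT USAGE ON TYPES TO "foo_reader","foo_writer";

-- therefore a table created by the owner ...
CREATE TABLE orders (id bigserial PRIMARY KEY, item text);
-- ... is readable by foo_reader and writable by foo_writer without extra GRANTs
```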
| The same principle applies to database schemas, which are owned by the | ||||
| `<dbname>_<schema>_owner` role. `<dbname>_<schema>_reader` is read-only, and | ||||
| `<dbname>_<schema>_writer` has write access and inherits reading from the | ||||
| reader role (a schema-level SQL sketch is shown below). Note that the | ||||
| `<dbname>_*` roles have access, including default privileges, on all schemas, | ||||
| too. If you don't need the dedicated schema roles - i.e. you only use one | ||||
| schema - you can disable their creation like this: | ||||
| 
 | ||||
| ```yaml | ||||
| spec: | ||||
|   preparedDatabases: | ||||
|     foo: | ||||
|       schemas: | ||||
|         bar: | ||||
|           defaultRoles: false | ||||
| ``` | ||||
| 
 | ||||
| Then, the schemas are owned by the database owner, too. | ||||
| 
 | ||||
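For the dedicated schema roles described above, the `schemaDefaultPrivilegesSQL` statements added in this commit amount to roughly the following for the `foo`/`bar` example. This is a sketch executed as the schema owner, not the literal template:

```sql
GRANT USAGE ON SCHEMA "bar" TO "foo_bar_reader","foo_bar_writer";
ALTER DEFAULT PRIVILEGES IN SCHEMA "bar" GRANT SELECT ON TABLES TO "foo_bar_reader";
ALTER DEFAULT PRIVILEGES IN SCHEMA "bar" GRANT SELECT ON SEQUENCES TO "foo_bar_reader";
ALTER DEFAULT PRIVILEGES IN SCHEMA "bar" GRANT INSERT, UPDATE, DELETE ON TABLES TO "foo_bar_writer";
ALTER DEFAULT PRIVILEGES IN SCHEMA "bar" GRANT USAGE, UPDATE ON SEQUENCES TO "foo_bar_writer";
ALTER DEFAULT PRIVILEGES IN SCHEMA "bar" GRANT EXECUTE ON FUNCTIONS TO "foo_bar_reader","foo_bar_writer";
ALTER DEFAULT PRIVILEGES IN SCHEMA "bar" GRANT USAGE ON TYPES TO "foo_bar_reader","foo_bar_writer";
```

Per `execCreateDatabaseSchema` in this diff, the database-wide `foo_reader`/`foo_writer` roles receive the same schema privileges as well; with `defaultRoles: false` they are the only grantees.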
| ### Default LOGIN roles | ||||
| 
 | ||||
| The roles described in the previous paragraph can be granted to LOGIN roles from | ||||
| the `users` section in the manifest. Optionally, the Postgres Operator can also | ||||
| create default LOGIN roles for the database and each schema individually. These | ||||
| roles get the `_user` suffix and inherit all rights from their NOLOGIN | ||||
| counterparts. | ||||
| 
 | ||||
| | Role name           | Member of      | Admin         | | ||||
| | ------------------- | -------------- | ------------- | | ||||
| | foo_owner_user      | foo_owner      | admin         | | ||||
| | foo_reader_user     | foo_reader     | foo_owner     | | ||||
| | foo_writer_user     | foo_writer     | foo_owner     | | ||||
| | foo_bar_owner_user  | foo_bar_owner  | foo_owner     | | ||||
| | foo_bar_reader_user | foo_bar_reader | foo_bar_owner | | ||||
| | foo_bar_writer_user | foo_bar_writer | foo_bar_owner | | ||||
| 
 | ||||
| These default users are enabled in the manifest with the `defaultUsers` flag: | ||||
| 
 | ||||
| ```yaml | ||||
| spec: | ||||
|   preparedDatabases: | ||||
|     foo: | ||||
|       defaultUsers: true | ||||
|       schemas: | ||||
|         bar: | ||||
|           defaultUsers: true | ||||
| ``` | ||||
| 
 | ||||
| ### Database extensions | ||||
| 
 | ||||
| Prepared databases also allow for creating Postgres extensions. They will be | ||||
| created by the database owner in the specified schema. | ||||
| 
 | ||||
| ```yaml | ||||
| spec: | ||||
|   preparedDatabases: | ||||
|     foo: | ||||
|       extensions: | ||||
|         pg_partman: public | ||||
|         postgis: data | ||||
| ``` | ||||
| 
 | ||||
| Some extensions require SUPERUSER rights on creation unless they are | ||||
| whitelisted by the [pgextwlist](https://github.com/dimitri/pgextwlist) | ||||
| extension, which is shipped with the Spilo image. To see which extensions are | ||||
| on the list, check the `extwlist.extensions` parameter in the postgresql.conf | ||||
| file. | ||||
| 
 | ||||
| ```sql | ||||
| SHOW extwlist.extensions; | ||||
| ``` | ||||
| 
 | ||||
| Make sure that `pgextwlist` is also listed under `shared_preload_libraries` in | ||||
| the PostgreSQL configuration. Then the database owner should be able to create | ||||
| the extension specified in the manifest. | ||||
| 
 | ||||
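As a quick sanity check, and to see what the operator effectively runs for the manifest example above, something along these lines can be executed in the database. This is a sketch: `shared_preload_libraries` is a standard Postgres setting, and the `CREATE EXTENSION`/`ALTER EXTENSION` statements correspond to the `createExtensionSQL`/`alterExtensionSQL` constants in this commit.

```sql
SHOW shared_preload_libraries;   -- should include pgextwlist
SHOW extwlist.extensions;        -- extensions creatable without SUPERUSER

-- roughly what the operator executes for the manifest example above
CREATE EXTENSION IF NOT EXISTS "pg_partman" SCHEMA "public";
-- and, if an already installed extension is moved to another schema:
ALTER EXTENSION "postgis" SET SCHEMA "data";
```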
| ### From `databases` to `preparedDatabases` | ||||
| 
 | ||||
| If you wish to create the role setup described above for databases listed under | ||||
| the `databases` key, you have to make sure that the owner role follows the | ||||
| `<dbname>_owner` naming convention of `preparedDatabases`. As roles are synced | ||||
| first, this can be done with one edit: | ||||
| 
 | ||||
| ```yaml | ||||
| # before | ||||
| spec: | ||||
|   databases: | ||||
|     foo: db_owner | ||||
| 
 | ||||
| # after | ||||
| spec: | ||||
|   databases: | ||||
|     foo: foo_owner | ||||
|   preparedDatabases: | ||||
|     foo: | ||||
|       schemas: | ||||
|         my_existing_schema: {} | ||||
| ``` | ||||
| 
 | ||||
| Adding existing database schemas to the manifest to create roles for them as | ||||
| well is up to the user and not done by the operator. Remember that if you don't | ||||
| specify any schema, a new database schema called `data` will be created. Once | ||||
| everything is synced (roles, schemas, extensions), you are free to remove the | ||||
| database from the `databases` section. Note that the operator does not delete | ||||
| database objects or revoke privileges when they are removed from the manifest. | ||||
| 
 | ||||
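Once the sync has finished, a generic catalog query (an example check, not something the operator provides) can confirm that the bootstrapped roles exist:

```sql
-- list all roles created for the prepared database "foo"
SELECT rolname, rolcanlogin
  FROM pg_roles
 WHERE rolname LIKE 'foo\_%'
 ORDER BY rolname;
```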
| ## Resource definition | ||||
| 
 | ||||
| The compute resources to be used for the Postgres containers in the pods can be | ||||
|  | @ -586,8 +749,8 @@ don't know the value, use `103` which is the GID from the default spilo image | |||
| OpenShift allocates the users and groups dynamically (based on scc), and their | ||||
| range is different in every namespace. Due to this dynamic behaviour, it's not | ||||
| trivial to know at deploy time the uid/gid of the user in the cluster. | ||||
| Therefore, instead of using a global `spilo_fsgroup` setting, use the `spiloFSGroup` field | ||||
| per Postgres cluster. | ||||
| Therefore, instead of using a global `spilo_fsgroup` setting, use the | ||||
| `spiloFSGroup` field per Postgres cluster. | ||||
| 
 | ||||
| Upload the cert as a kubernetes secret: | ||||
| ```sh | ||||
|  |  | |||
|  | @ -21,6 +21,17 @@ spec: | |||
|   - 127.0.0.1/32 | ||||
|   databases: | ||||
|     foo: zalando | ||||
|   preparedDatabases: | ||||
|     bar: | ||||
|       defaultUsers: true | ||||
|       extensions: | ||||
|         pg_partman: public | ||||
|         pgcrypto: public | ||||
|       schemas: | ||||
|         data: {} | ||||
|         history: | ||||
|           defaultRoles: true | ||||
|           defaultUsers: false | ||||
|   postgresql: | ||||
|     version: "12" | ||||
|     parameters: # Expert section | ||||
|  |  | |||
|  | @ -15,5 +15,7 @@ spec: | |||
|     foo_user: []  # role for application foo | ||||
|   databases: | ||||
|     foo: zalando  # dbname: owner | ||||
|   preparedDatabases: | ||||
|     bar: {} | ||||
|   postgresql: | ||||
|     version: "12" | ||||
|  |  | |||
|  | @ -237,6 +237,26 @@ spec: | |||
|                   type: object | ||||
|                   additionalProperties: | ||||
|                     type: string | ||||
|             preparedDatabases: | ||||
|               type: object | ||||
|               additionalProperties: | ||||
|                 type: object | ||||
|                 properties: | ||||
|                   defaultUsers: | ||||
|                     type: boolean | ||||
|                   extensions: | ||||
|                     type: object | ||||
|                     additionalProperties: | ||||
|                       type: string | ||||
|                   schemas: | ||||
|                     type: object | ||||
|                     additionalProperties: | ||||
|                       type: object | ||||
|                       properties: | ||||
|                         defaultUsers: | ||||
|                           type: boolean | ||||
|                         defaultRoles: | ||||
|                           type: boolean | ||||
|             replicaLoadBalancer:  # deprecated | ||||
|               type: boolean | ||||
|             resources: | ||||
|  |  | |||
|  | @ -421,6 +421,43 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ | |||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 					"preparedDatabases": { | ||||
| 						Type: "object", | ||||
| 						AdditionalProperties: &apiextv1beta1.JSONSchemaPropsOrBool{ | ||||
| 							Schema: &apiextv1beta1.JSONSchemaProps{ | ||||
| 								Type: "object", | ||||
| 								Properties: map[string]apiextv1beta1.JSONSchemaProps{ | ||||
| 									"defaultUsers": { | ||||
| 										Type: "boolean", | ||||
| 									}, | ||||
| 									"extensions": { | ||||
| 										Type: "object", | ||||
| 										AdditionalProperties: &apiextv1beta1.JSONSchemaPropsOrBool{ | ||||
| 											Schema: &apiextv1beta1.JSONSchemaProps{ | ||||
| 												Type: "string", | ||||
| 											}, | ||||
| 										}, | ||||
| 									}, | ||||
| 									"schemas": { | ||||
| 										Type: "object", | ||||
| 										AdditionalProperties: &apiextv1beta1.JSONSchemaPropsOrBool{ | ||||
| 											Schema: &apiextv1beta1.JSONSchemaProps{ | ||||
| 												Type: "object", | ||||
| 												Properties: map[string]apiextv1beta1.JSONSchemaProps{ | ||||
| 													"defaultUsers": { | ||||
| 														Type: "boolean", | ||||
| 													}, | ||||
| 													"defaultRoles": { | ||||
| 														Type: "boolean", | ||||
| 													}, | ||||
| 												}, | ||||
| 											}, | ||||
| 										}, | ||||
| 									}, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 					"replicaLoadBalancer": { | ||||
| 						Type:        "boolean", | ||||
| 						Description: "Deprecated", | ||||
|  |  | |||
|  | @ -56,6 +56,7 @@ type PostgresSpec struct { | |||
| 	Clone                 CloneDescription            `json:"clone"` | ||||
| 	ClusterName           string                      `json:"-"` | ||||
| 	Databases             map[string]string           `json:"databases,omitempty"` | ||||
| 	PreparedDatabases     map[string]PreparedDatabase `json:"preparedDatabases,omitempty"` | ||||
| 	Tolerations           []v1.Toleration             `json:"tolerations,omitempty"` | ||||
| 	Sidecars              []Sidecar                   `json:"sidecars,omitempty"` | ||||
| 	InitContainers        []v1.Container              `json:"initContainers,omitempty"` | ||||
|  | @ -84,6 +85,19 @@ type PostgresqlList struct { | |||
| 	Items []Postgresql `json:"items"` | ||||
| } | ||||
| 
 | ||||
| // PreparedDatabase describes elements to be bootstrapped
 | ||||
| type PreparedDatabase struct { | ||||
| 	PreparedSchemas map[string]PreparedSchema `json:"schemas,omitempty"` | ||||
| 	DefaultUsers    bool                      `json:"defaultUsers,omitempty" defaults:"false"` | ||||
| 	Extensions      map[string]string         `json:"extensions,omitempty"` | ||||
| } | ||||
| 
 | ||||
| // PreparedSchema describes elements to be bootstrapped per schema
 | ||||
| type PreparedSchema struct { | ||||
| 	DefaultRoles *bool `json:"defaultRoles,omitempty" defaults:"true"` | ||||
| 	DefaultUsers bool  `json:"defaultUsers,omitempty" defaults:"false"` | ||||
| } | ||||
| 
 | ||||
| // MaintenanceWindow describes the time window when the operator is allowed to do maintenance on a cluster.
 | ||||
| type MaintenanceWindow struct { | ||||
| 	Everyday  bool | ||||
|  |  | |||
|  | @ -575,6 +575,13 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { | |||
| 			(*out)[key] = val | ||||
| 		} | ||||
| 	} | ||||
| 	if in.PreparedDatabases != nil { | ||||
| 		in, out := &in.PreparedDatabases, &out.PreparedDatabases | ||||
| 		*out = make(map[string]PreparedDatabase, len(*in)) | ||||
| 		for key, val := range *in { | ||||
| 			(*out)[key] = *val.DeepCopy() | ||||
| 		} | ||||
| 	} | ||||
| 	if in.Tolerations != nil { | ||||
| 		in, out := &in.Tolerations, &out.Tolerations | ||||
| 		*out = make([]corev1.Toleration, len(*in)) | ||||
|  | @ -768,6 +775,57 @@ func (in *PostgresqlParam) DeepCopy() *PostgresqlParam { | |||
| 	return out | ||||
| } | ||||
| 
 | ||||
| // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 | ||||
| func (in *PreparedDatabase) DeepCopyInto(out *PreparedDatabase) { | ||||
| 	*out = *in | ||||
| 	if in.PreparedSchemas != nil { | ||||
| 		in, out := &in.PreparedSchemas, &out.PreparedSchemas | ||||
| 		*out = make(map[string]PreparedSchema, len(*in)) | ||||
| 		for key, val := range *in { | ||||
| 			(*out)[key] = *val.DeepCopy() | ||||
| 		} | ||||
| 	} | ||||
| 	if in.Extensions != nil { | ||||
| 		in, out := &in.Extensions, &out.Extensions | ||||
| 		*out = make(map[string]string, len(*in)) | ||||
| 		for key, val := range *in { | ||||
| 			(*out)[key] = val | ||||
| 		} | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
| 
 | ||||
| // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PreparedDatabase.
 | ||||
| func (in *PreparedDatabase) DeepCopy() *PreparedDatabase { | ||||
| 	if in == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| 	out := new(PreparedDatabase) | ||||
| 	in.DeepCopyInto(out) | ||||
| 	return out | ||||
| } | ||||
| 
 | ||||
| // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 | ||||
| func (in *PreparedSchema) DeepCopyInto(out *PreparedSchema) { | ||||
| 	*out = *in | ||||
| 	if in.DefaultRoles != nil { | ||||
| 		in, out := &in.DefaultRoles, &out.DefaultRoles | ||||
| 		*out = new(bool) | ||||
| 		**out = **in | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
| 
 | ||||
| // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PreparedSchema.
 | ||||
| func (in *PreparedSchema) DeepCopy() *PreparedSchema { | ||||
| 	if in == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| 	out := new(PreparedSchema) | ||||
| 	in.DeepCopyInto(out) | ||||
| 	return out | ||||
| } | ||||
| 
 | ||||
| // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 | ||||
| func (in *ResourceDescription) DeepCopyInto(out *ResourceDescription) { | ||||
| 	*out = *in | ||||
|  |  | |||
|  | @ -9,6 +9,7 @@ import ( | |||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"regexp" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
|  | @ -227,6 +228,10 @@ func (c *Cluster) initUsers() error { | |||
| 		return fmt.Errorf("could not init infrastructure roles: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if err := c.initPreparedDatabaseRoles(); err != nil { | ||||
| 		return fmt.Errorf("could not init default users: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if err := c.initRobotUsers(); err != nil { | ||||
| 		return fmt.Errorf("could not init robot users: %v", err) | ||||
| 	} | ||||
|  | @ -343,6 +348,9 @@ func (c *Cluster) Create() error { | |||
| 		if err = c.syncDatabases(); err != nil { | ||||
| 			return fmt.Errorf("could not sync databases: %v", err) | ||||
| 		} | ||||
| 		if err = c.syncPreparedDatabases(); err != nil { | ||||
| 			return fmt.Errorf("could not sync prepared databases: %v", err) | ||||
| 		} | ||||
| 		c.logger.Infof("databases have been successfully created") | ||||
| 	} | ||||
| 
 | ||||
|  | @ -649,7 +657,8 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { | |||
| 
 | ||||
| 	// connection pooler needs one system user created, which is done in
 | ||||
| 	// initUsers. Check if it needs to be called.
 | ||||
| 	sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) | ||||
| 	sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) && | ||||
| 		reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases) | ||||
| 	needConnectionPooler := c.needConnectionPoolerWorker(&newSpec.Spec) | ||||
| 	if !sameUsers || needConnectionPooler { | ||||
| 		c.logger.Debugf("syncing secrets") | ||||
|  | @ -766,19 +775,28 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { | |||
| 			c.logger.Errorf("could not sync roles: %v", err) | ||||
| 			updateFailed = true | ||||
| 		} | ||||
| 		if !reflect.DeepEqual(oldSpec.Spec.Databases, newSpec.Spec.Databases) { | ||||
| 		if !reflect.DeepEqual(oldSpec.Spec.Databases, newSpec.Spec.Databases) || | ||||
| 			!reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases) { | ||||
| 			c.logger.Infof("syncing databases") | ||||
| 			if err := c.syncDatabases(); err != nil { | ||||
| 				c.logger.Errorf("could not sync databases: %v", err) | ||||
| 				updateFailed = true | ||||
| 			} | ||||
| 		} | ||||
| 		if !reflect.DeepEqual(oldSpec.Spec.PreparedDatabases, newSpec.Spec.PreparedDatabases) { | ||||
| 			c.logger.Infof("syncing prepared databases") | ||||
| 			if err := c.syncPreparedDatabases(); err != nil { | ||||
| 				c.logger.Errorf("could not sync prepared databases: %v", err) | ||||
| 				updateFailed = true | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// sync connection pooler
 | ||||
| 	if _, err := c.syncConnectionPooler(oldSpec, newSpec, | ||||
| 		c.installLookupFunction); err != nil { | ||||
| 		return fmt.Errorf("could not sync connection pooler: %v", err) | ||||
| 		c.logger.Errorf("could not sync connection pooler: %v", err) | ||||
| 		updateFailed = true | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
|  | @ -949,6 +967,100 @@ func (c *Cluster) initSystemUsers() { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) initPreparedDatabaseRoles() error { | ||||
| 
 | ||||
| 	if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB
 | ||||
| 		c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}} | ||||
| 	} | ||||
| 
 | ||||
| 	// create maps with default roles/users as keys and their membership as values
 | ||||
| 	defaultRoles := map[string]string{ | ||||
| 		constants.OwnerRoleNameSuffix:  "", | ||||
| 		constants.ReaderRoleNameSuffix: "", | ||||
| 		constants.WriterRoleNameSuffix: constants.ReaderRoleNameSuffix, | ||||
| 	} | ||||
| 	defaultUsers := map[string]string{ | ||||
| 		constants.OwnerRoleNameSuffix + constants.UserRoleNameSuffix:  constants.OwnerRoleNameSuffix, | ||||
| 		constants.ReaderRoleNameSuffix + constants.UserRoleNameSuffix: constants.ReaderRoleNameSuffix, | ||||
| 		constants.WriterRoleNameSuffix + constants.UserRoleNameSuffix: constants.WriterRoleNameSuffix, | ||||
| 	} | ||||
| 
 | ||||
| 	for preparedDbName, preparedDB := range c.Spec.PreparedDatabases { | ||||
| 		// default roles per database
 | ||||
| 		if err := c.initDefaultRoles(defaultRoles, "admin", preparedDbName); err != nil { | ||||
| 			return fmt.Errorf("could not initialize default roles for database %s: %v", preparedDbName, err) | ||||
| 		} | ||||
| 		if preparedDB.DefaultUsers { | ||||
| 			if err := c.initDefaultRoles(defaultUsers, "admin", preparedDbName); err != nil { | ||||
| 				return fmt.Errorf("could not initialize default roles for database %s: %v", preparedDbName, err) | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		// default roles per database schema
 | ||||
| 		preparedSchemas := preparedDB.PreparedSchemas | ||||
| 		if len(preparedDB.PreparedSchemas) == 0 { | ||||
| 			preparedSchemas = map[string]acidv1.PreparedSchema{"data": {DefaultRoles: util.True()}} | ||||
| 		} | ||||
| 		for preparedSchemaName, preparedSchema := range preparedSchemas { | ||||
| 			if preparedSchema.DefaultRoles == nil || *preparedSchema.DefaultRoles { | ||||
| 				if err := c.initDefaultRoles(defaultRoles, | ||||
| 					preparedDbName+constants.OwnerRoleNameSuffix, | ||||
| 					preparedDbName+"_"+preparedSchemaName); err != nil { | ||||
| 					return fmt.Errorf("could not initialize default roles for database schema %s: %v", preparedSchemaName, err) | ||||
| 				} | ||||
| 				if preparedSchema.DefaultUsers { | ||||
| 					if err := c.initDefaultRoles(defaultUsers, | ||||
| 						preparedDbName+constants.OwnerRoleNameSuffix, | ||||
| 						preparedDbName+"_"+preparedSchemaName); err != nil { | ||||
| 						return fmt.Errorf("could not initialize default users for database schema %s: %v", preparedSchemaName, err) | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix string) error { | ||||
| 
 | ||||
| 	for defaultRole, inherits := range defaultRoles { | ||||
| 
 | ||||
| 		roleName := prefix + defaultRole | ||||
| 
 | ||||
| 		flags := []string{constants.RoleFlagNoLogin} | ||||
| 		if defaultRole[len(defaultRole)-5:] == constants.UserRoleNameSuffix { | ||||
| 			flags = []string{constants.RoleFlagLogin} | ||||
| 		} | ||||
| 
 | ||||
| 		memberOf := make([]string, 0) | ||||
| 		if inherits != "" { | ||||
| 			memberOf = append(memberOf, prefix+inherits) | ||||
| 		} | ||||
| 
 | ||||
| 		adminRole := "" | ||||
| 		if strings.Contains(defaultRole, constants.OwnerRoleNameSuffix) { | ||||
| 			adminRole = admin | ||||
| 		} else { | ||||
| 			adminRole = prefix + constants.OwnerRoleNameSuffix | ||||
| 		} | ||||
| 
 | ||||
| 		newRole := spec.PgUser{ | ||||
| 			Origin:    spec.RoleOriginBootstrap, | ||||
| 			Name:      roleName, | ||||
| 			Password:  util.RandomPassword(constants.PasswordLength), | ||||
| 			Flags:     flags, | ||||
| 			MemberOf:  memberOf, | ||||
| 			AdminRole: adminRole, | ||||
| 		} | ||||
| 		if currentRole, present := c.pgUsers[roleName]; present { | ||||
| 			c.pgUsers[roleName] = c.resolveNameConflict(&currentRole, &newRole) | ||||
| 		} else { | ||||
| 			c.pgUsers[roleName] = newRole | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) initRobotUsers() error { | ||||
| 	for username, userFlags := range c.Spec.Users { | ||||
| 		if !isValidUsername(username) { | ||||
|  |  | |||
|  | @ -13,6 +13,7 @@ import ( | |||
| 	"github.com/zalando/postgres-operator/pkg/util/k8sutil" | ||||
| 	"github.com/zalando/postgres-operator/pkg/util/teams" | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/client-go/tools/record" | ||||
| ) | ||||
| 
 | ||||
|  | @ -35,7 +36,7 @@ var cl = New( | |||
| 		}, | ||||
| 	}, | ||||
| 	k8sutil.NewMockKubernetesClient(), | ||||
| 	acidv1.Postgresql{}, | ||||
| 	acidv1.Postgresql{ObjectMeta: metav1.ObjectMeta{Name: "acid-test", Namespace: "test"}}, | ||||
| 	logger, | ||||
| 	eventRecorder, | ||||
| ) | ||||
|  | @ -760,3 +761,89 @@ func TestInitSystemUsers(t *testing.T) { | |||
| 		t.Errorf("%s, System users are not allowed to be a connection pool user", testName) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestPreparedDatabases(t *testing.T) { | ||||
| 	testName := "TestDefaultPreparedDatabase" | ||||
| 
 | ||||
| 	cl.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{} | ||||
| 	cl.initPreparedDatabaseRoles() | ||||
| 
 | ||||
| 	for _, role := range []string{"acid_test_owner", "acid_test_reader", "acid_test_writer", | ||||
| 		"acid_test_data_owner", "acid_test_data_reader", "acid_test_data_writer"} { | ||||
| 		if _, exist := cl.pgUsers[role]; !exist { | ||||
| 			t.Errorf("%s, default role %q for prepared database not present", testName, role) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	testName = "TestPreparedDatabaseWithSchema" | ||||
| 
 | ||||
| 	cl.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{ | ||||
| 		"foo": { | ||||
| 			DefaultUsers: true, | ||||
| 			PreparedSchemas: map[string]acidv1.PreparedSchema{ | ||||
| 				"bar": { | ||||
| 					DefaultUsers: true, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	cl.initPreparedDatabaseRoles() | ||||
| 
 | ||||
| 	for _, role := range []string{ | ||||
| 		"foo_owner", "foo_reader", "foo_writer", | ||||
| 		"foo_owner_user", "foo_reader_user", "foo_writer_user", | ||||
| 		"foo_bar_owner", "foo_bar_reader", "foo_bar_writer", | ||||
| 		"foo_bar_owner_user", "foo_bar_reader_user", "foo_bar_writer_user"} { | ||||
| 		if _, exist := cl.pgUsers[role]; !exist { | ||||
| 			t.Errorf("%s, default role %q for prepared database not present", testName, role) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	roleTests := []struct { | ||||
| 		subTest  string | ||||
| 		role     string | ||||
| 		memberOf string | ||||
| 		admin    string | ||||
| 	}{ | ||||
| 		{ | ||||
| 			subTest:  "Test admin role of owner", | ||||
| 			role:     "foo_owner", | ||||
| 			memberOf: "", | ||||
| 			admin:    "admin", | ||||
| 		}, | ||||
| 		{ | ||||
| 			subTest:  "Test writer is a member of reader", | ||||
| 			role:     "foo_writer", | ||||
| 			memberOf: "foo_reader", | ||||
| 			admin:    "foo_owner", | ||||
| 		}, | ||||
| 		{ | ||||
| 			subTest:  "Test reader LOGIN role", | ||||
| 			role:     "foo_reader_user", | ||||
| 			memberOf: "foo_reader", | ||||
| 			admin:    "foo_owner", | ||||
| 		}, | ||||
| 		{ | ||||
| 			subTest:  "Test schema owner", | ||||
| 			role:     "foo_bar_owner", | ||||
| 			memberOf: "", | ||||
| 			admin:    "foo_owner", | ||||
| 		}, | ||||
| 		{ | ||||
| 			subTest:  "Test schema writer LOGIN role", | ||||
| 			role:     "foo_bar_writer_user", | ||||
| 			memberOf: "foo_bar_writer", | ||||
| 			admin:    "foo_bar_owner", | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	for _, tt := range roleTests { | ||||
| 		user := cl.pgUsers[tt.role] | ||||
| 		if (tt.memberOf == "" && len(user.MemberOf) > 0) || (tt.memberOf != "" && user.MemberOf[0] != tt.memberOf) { | ||||
| 			t.Errorf("%s, incorrect membership for default role %q. Expected %q, got %q", tt.subTest, tt.role, tt.memberOf, user.MemberOf[0]) | ||||
| 		} | ||||
| 		if user.AdminRole != tt.admin { | ||||
| 			t.Errorf("%s, incorrect admin role for default role %q. Expected %q, got %q", tt.subTest, tt.role, tt.admin, user.AdminRole) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -28,8 +28,34 @@ const ( | |||
| 	 ORDER BY 1;` | ||||
| 
 | ||||
| 	getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;` | ||||
| 	getSchemasSQL   = `SELECT n.nspname AS dbschema FROM pg_catalog.pg_namespace n | ||||
| 			WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' ORDER BY 1` | ||||
| 	getExtensionsSQL = `SELECT e.extname, n.nspname FROM pg_catalog.pg_extension e | ||||
| 	        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = e.extnamespace ORDER BY 1;` | ||||
| 
 | ||||
| 	createDatabaseSQL       = `CREATE DATABASE "%s" OWNER "%s";` | ||||
| 	createDatabaseSchemaSQL = `SET ROLE TO "%s"; CREATE SCHEMA IF NOT EXISTS "%s" AUTHORIZATION "%s"` | ||||
| 	alterDatabaseOwnerSQL   = `ALTER DATABASE "%s" OWNER TO "%s";` | ||||
| 	createExtensionSQL      = `CREATE EXTENSION IF NOT EXISTS "%s" SCHEMA "%s"` | ||||
| 	alterExtensionSQL       = `ALTER EXTENSION "%s" SET SCHEMA "%s"` | ||||
| 
 | ||||
| 	globalDefaultPrivilegesSQL = `SET ROLE TO "%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES GRANT SELECT ON TABLES TO "%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES GRANT SELECT ON SEQUENCES TO "%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES GRANT INSERT, UPDATE, DELETE ON TABLES TO "%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES GRANT USAGE, UPDATE ON SEQUENCES TO "%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES GRANT EXECUTE ON FUNCTIONS TO "%s","%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES GRANT USAGE ON TYPES TO "%s","%s";` | ||||
| 	schemaDefaultPrivilegesSQL = `SET ROLE TO "%s"; | ||||
| 			GRANT USAGE ON SCHEMA "%s" TO "%s","%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT SELECT ON TABLES TO "%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT SELECT ON SEQUENCES TO "%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT INSERT, UPDATE, DELETE ON TABLES TO "%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT USAGE, UPDATE ON SEQUENCES TO "%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT EXECUTE ON FUNCTIONS TO "%s","%s"; | ||||
| 			ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT USAGE ON TYPES TO "%s","%s";` | ||||
| 
 | ||||
| 	connectionPoolerLookup = ` | ||||
| 		CREATE SCHEMA IF NOT EXISTS {{.pooler_schema}}; | ||||
| 
 | ||||
|  | @ -221,43 +247,141 @@ func (c *Cluster) getDatabases() (dbs map[string]string, err error) { | |||
| } | ||||
| 
 | ||||
| // executeCreateDatabase creates new database with the given owner.
 | ||||
| // The caller is responsible for openinging and closing the database connection.
 | ||||
| func (c *Cluster) executeCreateDatabase(datname, owner string) error { | ||||
| 	return c.execCreateOrAlterDatabase(datname, owner, createDatabaseSQL, | ||||
| // The caller is responsible for opening and closing the database connection.
 | ||||
| func (c *Cluster) executeCreateDatabase(databaseName, owner string) error { | ||||
| 	return c.execCreateOrAlterDatabase(databaseName, owner, createDatabaseSQL, | ||||
| 		"creating database", "create database") | ||||
| } | ||||
| 
 | ||||
| // executeCreateDatabase changes the owner of the given database.
 | ||||
| // The caller is responsible for openinging and closing the database connection.
 | ||||
| func (c *Cluster) executeAlterDatabaseOwner(datname string, owner string) error { | ||||
| 	return c.execCreateOrAlterDatabase(datname, owner, alterDatabaseOwnerSQL, | ||||
| // executeAlterDatabaseOwner changes the owner of the given database.
 | ||||
| // The caller is responsible for opening and closing the database connection.
 | ||||
| func (c *Cluster) executeAlterDatabaseOwner(databaseName string, owner string) error { | ||||
| 	return c.execCreateOrAlterDatabase(databaseName, owner, alterDatabaseOwnerSQL, | ||||
| 		"changing owner for database", "alter database owner") | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) execCreateOrAlterDatabase(datname, owner, statement, doing, operation string) error { | ||||
| 	if !c.databaseNameOwnerValid(datname, owner) { | ||||
| func (c *Cluster) execCreateOrAlterDatabase(databaseName, owner, statement, doing, operation string) error { | ||||
| 	if !c.databaseNameOwnerValid(databaseName, owner) { | ||||
| 		return nil | ||||
| 	} | ||||
| 	c.logger.Infof("%s %q owner %q", doing, datname, owner) | ||||
| 	if _, err := c.pgDb.Exec(fmt.Sprintf(statement, datname, owner)); err != nil { | ||||
| 	c.logger.Infof("%s %q owner %q", doing, databaseName, owner) | ||||
| 	if _, err := c.pgDb.Exec(fmt.Sprintf(statement, databaseName, owner)); err != nil { | ||||
| 		return fmt.Errorf("could not execute %s: %v", operation, err) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) databaseNameOwnerValid(datname, owner string) bool { | ||||
| func (c *Cluster) databaseNameOwnerValid(databaseName, owner string) bool { | ||||
| 	if _, ok := c.pgUsers[owner]; !ok { | ||||
| 		c.logger.Infof("skipping creation of the %q database, user %q does not exist", datname, owner) | ||||
| 		c.logger.Infof("skipping creation of the %q database, user %q does not exist", databaseName, owner) | ||||
| 		return false | ||||
| 	} | ||||
| 
 | ||||
| 	if !databaseNameRegexp.MatchString(datname) { | ||||
| 		c.logger.Infof("database %q has invalid name", datname) | ||||
| 	if !databaseNameRegexp.MatchString(databaseName) { | ||||
| 		c.logger.Infof("database %q has invalid name", databaseName) | ||||
| 		return false | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| // getSchemas returns the list of current database schemas
 | ||||
| // The caller is responsible for opening and closing the database connection
 | ||||
| func (c *Cluster) getSchemas() (schemas []string, err error) { | ||||
| 	var ( | ||||
| 		rows      *sql.Rows | ||||
| 		dbschemas []string | ||||
| 	) | ||||
| 
 | ||||
| 	if rows, err = c.pgDb.Query(getSchemasSQL); err != nil { | ||||
| 		return nil, fmt.Errorf("could not query database schemas: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	defer func() { | ||||
| 		if err2 := rows.Close(); err2 != nil { | ||||
| 			if err != nil { | ||||
| 				err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err) | ||||
| 			} else { | ||||
| 				err = fmt.Errorf("error when closing query cursor: %v", err2) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	for rows.Next() { | ||||
| 		var dbschema string | ||||
| 
 | ||||
| 		if err = rows.Scan(&dbschema); err != nil { | ||||
| 			return nil, fmt.Errorf("error when processing row: %v", err) | ||||
| 		} | ||||
| 		dbschemas = append(dbschemas, dbschema) | ||||
| 	} | ||||
| 
 | ||||
| 	return dbschemas, err | ||||
| } | ||||
| 
 | ||||
| // executeCreateDatabaseSchema creates new database schema with the given owner.
 | ||||
| // The caller is responsible for opening and closing the database connection.
 | ||||
| func (c *Cluster) executeCreateDatabaseSchema(databaseName, schemaName, dbOwner string, schemaOwner string) error { | ||||
| 	return c.execCreateDatabaseSchema(databaseName, schemaName, dbOwner, schemaOwner, createDatabaseSchemaSQL, | ||||
| 		"creating database schema", "create database schema") | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) execCreateDatabaseSchema(databaseName, schemaName, dbOwner, schemaOwner, statement, doing, operation string) error { | ||||
| 	if !c.databaseSchemaNameValid(schemaName) { | ||||
| 		return nil | ||||
| 	} | ||||
| 	c.logger.Infof("%s %q owner %q", doing, schemaName, schemaOwner) | ||||
| 	if _, err := c.pgDb.Exec(fmt.Sprintf(statement, dbOwner, schemaName, schemaOwner)); err != nil { | ||||
| 		return fmt.Errorf("could not execute %s: %v", operation, err) | ||||
| 	} | ||||
| 
 | ||||
| 	// set default privileges for schema
 | ||||
| 	c.execAlterSchemaDefaultPrivileges(schemaName, schemaOwner, databaseName) | ||||
| 	if schemaOwner != dbOwner { | ||||
| 		c.execAlterSchemaDefaultPrivileges(schemaName, dbOwner, databaseName+"_"+schemaName) | ||||
| 		c.execAlterSchemaDefaultPrivileges(schemaName, schemaOwner, databaseName+"_"+schemaName) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) databaseSchemaNameValid(schemaName string) bool { | ||||
| 	if !databaseNameRegexp.MatchString(schemaName) { | ||||
| 		c.logger.Infof("database schema %q has invalid name", schemaName) | ||||
| 		return false | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) execAlterSchemaDefaultPrivileges(schemaName, owner, rolePrefix string) error { | ||||
| 	if _, err := c.pgDb.Exec(fmt.Sprintf(schemaDefaultPrivilegesSQL, owner, | ||||
| 		schemaName, rolePrefix+constants.ReaderRoleNameSuffix, rolePrefix+constants.WriterRoleNameSuffix, // schema
 | ||||
| 		schemaName, rolePrefix+constants.ReaderRoleNameSuffix, // tables
 | ||||
| 		schemaName, rolePrefix+constants.ReaderRoleNameSuffix, // sequences
 | ||||
| 		schemaName, rolePrefix+constants.WriterRoleNameSuffix, // tables
 | ||||
| 		schemaName, rolePrefix+constants.WriterRoleNameSuffix, // sequences
 | ||||
| 		schemaName, rolePrefix+constants.ReaderRoleNameSuffix, rolePrefix+constants.WriterRoleNameSuffix, // functions
 | ||||
| 		schemaName, rolePrefix+constants.ReaderRoleNameSuffix, rolePrefix+constants.WriterRoleNameSuffix)); err != nil { // types
 | ||||
| 		return fmt.Errorf("could not alter default privileges for database schema %s: %v", schemaName, err) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) execAlterGlobalDefaultPrivileges(owner, rolePrefix string) error { | ||||
| 	if _, err := c.pgDb.Exec(fmt.Sprintf(globalDefaultPrivilegesSQL, owner, | ||||
| 		rolePrefix+constants.WriterRoleNameSuffix, rolePrefix+constants.ReaderRoleNameSuffix, // schemas
 | ||||
| 		rolePrefix+constants.ReaderRoleNameSuffix,                                            // tables
 | ||||
| 		rolePrefix+constants.ReaderRoleNameSuffix,                                            // sequences
 | ||||
| 		rolePrefix+constants.WriterRoleNameSuffix,                                            // tables
 | ||||
| 		rolePrefix+constants.WriterRoleNameSuffix,                                            // sequences
 | ||||
| 		rolePrefix+constants.ReaderRoleNameSuffix, rolePrefix+constants.WriterRoleNameSuffix, // functions
 | ||||
| 		rolePrefix+constants.ReaderRoleNameSuffix, rolePrefix+constants.WriterRoleNameSuffix)); err != nil { // types
 | ||||
| 		return fmt.Errorf("could not alter default privileges for database %s: %v", rolePrefix, err) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin bool) (result []string) { | ||||
| 	if rolsuper { | ||||
| 		result = append(result, constants.RoleFlagSuperuser) | ||||
|  | @ -278,8 +402,67 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin | |||
| 	return result | ||||
| } | ||||
| 
 | ||||
| // Creates a connection pooler credentials lookup function in every database to
 | ||||
| // perform remote authentication.
 | ||||
| // getExtensions returns the list of current database extensions
 | ||||
| // The caller is responsible for opening and closing the database connection
 | ||||
| func (c *Cluster) getExtensions() (dbExtensions map[string]string, err error) { | ||||
| 	var ( | ||||
| 		rows *sql.Rows | ||||
| 	) | ||||
| 
 | ||||
| 	if rows, err = c.pgDb.Query(getExtensionsSQL); err != nil { | ||||
| 		return nil, fmt.Errorf("could not query database extensions: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	defer func() { | ||||
| 		if err2 := rows.Close(); err2 != nil { | ||||
| 			if err != nil { | ||||
| 				err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err) | ||||
| 			} else { | ||||
| 				err = fmt.Errorf("error when closing query cursor: %v", err2) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	dbExtensions = make(map[string]string) | ||||
| 
 | ||||
| 	for rows.Next() { | ||||
| 		var extension, schema string | ||||
| 
 | ||||
| 		if err = rows.Scan(&extension, &schema); err != nil { | ||||
| 			return nil, fmt.Errorf("error when processing row: %v", err) | ||||
| 		} | ||||
| 		dbExtensions[extension] = schema | ||||
| 	} | ||||
| 
 | ||||
| 	return dbExtensions, err | ||||
| } | ||||
| 
 | ||||
| // executeCreateExtension creates new extension in the given schema.
 | ||||
| // The caller is responsible for opening and closing the database connection.
 | ||||
| func (c *Cluster) executeCreateExtension(extName, schemaName string) error { | ||||
| 	return c.execCreateOrAlterExtension(extName, schemaName, createExtensionSQL, | ||||
| 		"creating extension", "create extension") | ||||
| } | ||||
| 
 | ||||
| // executeAlterExtension changes the schema of the given extension.
 | ||||
| // The caller is responsible for opening and closing the database connection.
 | ||||
| func (c *Cluster) executeAlterExtension(extName, schemaName string) error { | ||||
| 	return c.execCreateOrAlterExtension(extName, schemaName, alterExtensionSQL, | ||||
| 		"changing schema for extension", "alter extension schema") | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doing, operation string) error { | ||||
| 
 | ||||
| 	c.logger.Infof("%s %q schema %q", doing, extName, schemaName) | ||||
| 	if _, err := c.pgDb.Exec(fmt.Sprintf(statement, extName, schemaName)); err != nil { | ||||
| 		return fmt.Errorf("could not execute %s: %v", operation, err) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Creates a connection pool credentials lookup function in every database to
 | ||||
| // perform remote authentication.
 | ||||
| func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { | ||||
| 	var stmtBytes bytes.Buffer | ||||
| 	c.logger.Info("Installing lookup function") | ||||
|  | @ -305,7 +488,7 @@ func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { | |||
| 
 | ||||
| 	templater := template.Must(template.New("sql").Parse(connectionPoolerLookup)) | ||||
| 
 | ||||
| 	for dbname, _ := range currentDatabases { | ||||
| 	for dbname := range currentDatabases { | ||||
| 		if dbname == "template0" || dbname == "template1" { | ||||
| 			continue | ||||
| 		} | ||||
|  |  | |||
|  | @ -1474,6 +1474,13 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) | |||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	// skip NOLOGIN users
 | ||||
| 	for _, flag := range pgUser.Flags { | ||||
| 		if flag == constants.RoleFlagNoLogin { | ||||
| 			return nil | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	username := pgUser.Name | ||||
| 	secret := v1.Secret{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
|  |  | |||
|  | @ -4,6 +4,7 @@ import ( | |||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"regexp" | ||||
| 	"strings" | ||||
| 
 | ||||
| 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" | ||||
| 	"github.com/zalando/postgres-operator/pkg/spec" | ||||
|  | @ -109,6 +110,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { | |||
| 			err = fmt.Errorf("could not sync databases: %v", err) | ||||
| 			return err | ||||
| 		} | ||||
| 		c.logger.Debugf("syncing prepared databases with schemas") | ||||
| 		if err = c.syncPreparedDatabases(); err != nil { | ||||
| 			err = fmt.Errorf("could not sync prepared database: %v", err) | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// sync connection pooler
 | ||||
|  | @ -590,6 +596,7 @@ func (c *Cluster) syncDatabases() error { | |||
| 
 | ||||
| 	createDatabases := make(map[string]string) | ||||
| 	alterOwnerDatabases := make(map[string]string) | ||||
| 	preparedDatabases := make([]string, 0) | ||||
| 
 | ||||
| 	if err := c.initDbConn(); err != nil { | ||||
| 		return fmt.Errorf("could not init database connection") | ||||
|  | @ -605,12 +612,24 @@ func (c *Cluster) syncDatabases() error { | |||
| 		return fmt.Errorf("could not get current databases: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	for datname, newOwner := range c.Spec.Databases { | ||||
| 		currentOwner, exists := currentDatabases[datname] | ||||
| 	// if the preparedDatabases section is empty, create a database named after the cluster
 | ||||
| 	if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB
 | ||||
| 		c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}} | ||||
| 	} | ||||
| 	for preparedDatabaseName := range c.Spec.PreparedDatabases { | ||||
| 		_, exists := currentDatabases[preparedDatabaseName] | ||||
| 		if !exists { | ||||
| 			createDatabases[datname] = newOwner | ||||
| 			createDatabases[preparedDatabaseName] = preparedDatabaseName + constants.OwnerRoleNameSuffix | ||||
| 			preparedDatabases = append(preparedDatabases, preparedDatabaseName) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	for databaseName, newOwner := range c.Spec.Databases { | ||||
| 		currentOwner, exists := currentDatabases[databaseName] | ||||
| 		if !exists { | ||||
| 			createDatabases[databaseName] = newOwner | ||||
| 		} else if currentOwner != newOwner { | ||||
| 			alterOwnerDatabases[datname] = newOwner | ||||
| 			alterOwnerDatabases[databaseName] = newOwner | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
|  | @ -618,13 +637,116 @@ func (c *Cluster) syncDatabases() error { | |||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	for datname, owner := range createDatabases { | ||||
| 		if err = c.executeCreateDatabase(datname, owner); err != nil { | ||||
| 	for databaseName, owner := range createDatabases { | ||||
| 		if err = c.executeCreateDatabase(databaseName, owner); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	for datname, owner := range alterOwnerDatabases { | ||||
| 		if err = c.executeAlterDatabaseOwner(datname, owner); err != nil { | ||||
| 	for databaseName, owner := range alterOwnerDatabases { | ||||
| 		if err = c.executeAlterDatabaseOwner(databaseName, owner); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// set default privileges for prepared database
 | ||||
| 	for _, preparedDatabase := range preparedDatabases { | ||||
| 		if err = c.execAlterGlobalDefaultPrivileges(preparedDatabase+constants.OwnerRoleNameSuffix, preparedDatabase); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) syncPreparedDatabases() error { | ||||
| 	c.setProcessName("syncing prepared databases") | ||||
| 	for preparedDbName, preparedDB := range c.Spec.PreparedDatabases { | ||||
| 		if err := c.initDbConnWithName(preparedDbName); err != nil { | ||||
| 			return fmt.Errorf("could not init connection to database %s: %v", preparedDbName, err) | ||||
| 		} | ||||
| 		defer func() { | ||||
| 			if err := c.closeDbConn(); err != nil { | ||||
| 				c.logger.Errorf("could not close database connection: %v", err) | ||||
| 			} | ||||
| 		}() | ||||
| 
 | ||||
| 		// now, prepare defined schemas
 | ||||
| 		preparedSchemas := preparedDB.PreparedSchemas | ||||
| 		if len(preparedDB.PreparedSchemas) == 0 { | ||||
| 			preparedSchemas = map[string]acidv1.PreparedSchema{"data": {DefaultRoles: util.True()}} | ||||
| 		} | ||||
| 		if err := c.syncPreparedSchemas(preparedDbName, preparedSchemas); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		// install extensions
 | ||||
| 		if err := c.syncExtensions(preparedDB.Extensions); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) syncPreparedSchemas(databaseName string, preparedSchemas map[string]acidv1.PreparedSchema) error { | ||||
| 	c.setProcessName("syncing prepared schemas") | ||||
| 
 | ||||
| 	currentSchemas, err := c.getSchemas() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not get current schemas: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	var schemas []string | ||||
| 
 | ||||
| 	for schema := range preparedSchemas { | ||||
| 		schemas = append(schemas, schema) | ||||
| 	} | ||||
| 
 | ||||
| 	if createPreparedSchemas, equal := util.SubstractStringSlices(schemas, currentSchemas); !equal { | ||||
| 		for _, schemaName := range createPreparedSchemas { | ||||
| 			owner := constants.OwnerRoleNameSuffix | ||||
| 			dbOwner := databaseName + owner | ||||
| 			if preparedSchemas[schemaName].DefaultRoles == nil || *preparedSchemas[schemaName].DefaultRoles { | ||||
| 				owner = databaseName + "_" + schemaName + owner | ||||
| 			} else { | ||||
| 				owner = dbOwner | ||||
| 			} | ||||
| 			if err = c.executeCreateDatabaseSchema(databaseName, schemaName, dbOwner, owner); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) syncExtensions(extensions map[string]string) error { | ||||
| 	c.setProcessName("syncing database extensions") | ||||
| 
 | ||||
| 	createExtensions := make(map[string]string) | ||||
| 	alterExtensions := make(map[string]string) | ||||
| 
 | ||||
| 	currentExtensions, err := c.getExtensions() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("could not get current database extensions: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	for extName, newSchema := range extensions { | ||||
| 		currentSchema, exists := currentExtensions[extName] | ||||
| 		if !exists { | ||||
| 			createExtensions[extName] = newSchema | ||||
| 		} else if currentSchema != newSchema { | ||||
| 			alterExtensions[extName] = newSchema | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	for extName, schema := range createExtensions { | ||||
| 		if err = c.executeCreateExtension(extName, schema); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	for extName, schema := range alterExtensions { | ||||
| 		if err = c.executeAlterExtension(extName, schema); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  |  | |||
|  | @ -1,9 +1,10 @@ | |||
| package controller | ||||
| 
 | ||||
| import ( | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 
 | ||||
| 	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" | ||||
| ) | ||||
| 
 | ||||
|  |  | |||
|  | @ -31,6 +31,7 @@ const ( | |||
| 	RoleOriginInfrastructure | ||||
| 	RoleOriginTeamsAPI | ||||
| 	RoleOriginSystem | ||||
| 	RoleOriginBootstrap | ||||
| 	RoleConnectionPooler | ||||
| ) | ||||
| 
 | ||||
|  | @ -180,6 +181,8 @@ func (r RoleOrigin) String() string { | |||
| 		return "teams API role" | ||||
| 	case RoleOriginSystem: | ||||
| 		return "system role" | ||||
| 	case RoleOriginBootstrap: | ||||
| 		return "bootstrapped role" | ||||
| 	case RoleConnectionPooler: | ||||
| 		return "connection pooler role" | ||||
| 	default: | ||||
|  |  | |||
|  | @ -14,4 +14,8 @@ const ( | |||
| 	RoleFlagCreateDB          = "CREATEDB" | ||||
| 	RoleFlagReplication       = "REPLICATION" | ||||
| 	RoleFlagByPassRLS         = "BYPASSRLS" | ||||
| 	OwnerRoleNameSuffix       = "_owner" | ||||
| 	ReaderRoleNameSuffix      = "_reader" | ||||
| 	WriterRoleNameSuffix      = "_writer" | ||||
| 	UserRoleNameSuffix        = "_user" | ||||
| ) | ||||
|  |  | |||
|  | @ -73,26 +73,44 @@ func (strategy DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserM | |||
| } | ||||
| 
 | ||||
| // ExecuteSyncRequests makes actual database changes from the requests passed in its arguments.
 | ||||
| func (strategy DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserRequest, db *sql.DB) error { | ||||
| 	for _, r := range reqs { | ||||
| 		switch r.Kind { | ||||
| func (strategy DefaultUserSyncStrategy) ExecuteSyncRequests(requests []spec.PgSyncUserRequest, db *sql.DB) error { | ||||
| 	var reqretries []spec.PgSyncUserRequest | ||||
| 	var errors []string | ||||
| 	for _, request := range requests { | ||||
| 		switch request.Kind { | ||||
| 		case spec.PGSyncUserAdd: | ||||
| 			if err := strategy.createPgUser(r.User, db); err != nil { | ||||
| 				return fmt.Errorf("could not create user %q: %v", r.User.Name, err) | ||||
| 			if err := strategy.createPgUser(request.User, db); err != nil { | ||||
| 				reqretries = append(reqretries, request) | ||||
| 				errors = append(errors, fmt.Sprintf("could not create user %q: %v", request.User.Name, err)) | ||||
| 			} | ||||
| 		case spec.PGsyncUserAlter: | ||||
| 			if err := strategy.alterPgUser(r.User, db); err != nil { | ||||
| 				return fmt.Errorf("could not alter user %q: %v", r.User.Name, err) | ||||
| 			if err := strategy.alterPgUser(request.User, db); err != nil { | ||||
| 				reqretries = append(reqretries, request) | ||||
| 				errors = append(errors, fmt.Sprintf("could not alter user %q: %v", request.User.Name, err)) | ||||
| 			} | ||||
| 		case spec.PGSyncAlterSet: | ||||
| 			if err := strategy.alterPgUserSet(r.User, db); err != nil { | ||||
| 				return fmt.Errorf("could not set custom user %q parameters: %v", r.User.Name, err) | ||||
| 			if err := strategy.alterPgUserSet(request.User, db); err != nil { | ||||
| 				reqretries = append(reqretries, request) | ||||
| 				errors = append(errors, fmt.Sprintf("could not set custom user %q parameters: %v", request.User.Name, err)) | ||||
| 			} | ||||
| 		default: | ||||
| 			return fmt.Errorf("unrecognized operation: %v", r.Kind) | ||||
| 			return fmt.Errorf("unrecognized operation: %v", request.Kind) | ||||
| 		} | ||||
| 
 | ||||
| 	} | ||||
| 
 | ||||
| 	// creating roles might fail if group role members are created before the parent role
 | ||||
| 	// retry adding roles as long as the number of failed attempts is shrinking
 | ||||
| 	if len(reqretries) > 0 { | ||||
| 		if len(reqretries) < len(requests) { | ||||
| 			if err := strategy.ExecuteSyncRequests(reqretries, db); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 		} else { | ||||
| 			return fmt.Errorf("could not execute sync requests for users: %v", errors) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql.DB) (err error) { | ||||
|  |  | |||