continue syncing databases and extensions on err (#2262)
This commit is contained in:
		
							parent
							
								
									6953f72bee
								
							
						
					
					
						commit
						80fee5bda4
					
				|  | @ -1051,7 +1051,7 @@ DBUSERS: | |||
| 
 | ||||
| func (c *Cluster) syncDatabases() error { | ||||
| 	c.setProcessName("syncing databases") | ||||
| 
 | ||||
| 	errors := make([]string, 0) | ||||
| 	createDatabases := make(map[string]string) | ||||
| 	alterOwnerDatabases := make(map[string]string) | ||||
| 	preparedDatabases := make([]string, 0) | ||||
|  | @ -1097,12 +1097,12 @@ func (c *Cluster) syncDatabases() error { | |||
| 
 | ||||
| 	for databaseName, owner := range createDatabases { | ||||
| 		if err = c.executeCreateDatabase(databaseName, owner); err != nil { | ||||
| 			return err | ||||
| 			errors = append(errors, err.Error()) | ||||
| 		} | ||||
| 	} | ||||
| 	for databaseName, owner := range alterOwnerDatabases { | ||||
| 		if err = c.executeAlterDatabaseOwner(databaseName, owner); err != nil { | ||||
| 			return err | ||||
| 			errors = append(errors, err.Error()) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
|  | @ -1118,24 +1118,32 @@ func (c *Cluster) syncDatabases() error { | |||
| 	// set default privileges for prepared database
 | ||||
| 	for _, preparedDatabase := range preparedDatabases { | ||||
| 		if err := c.initDbConnWithName(preparedDatabase); err != nil { | ||||
| 			return fmt.Errorf("could not init database connection to %s", preparedDatabase) | ||||
| 			errors = append(errors, fmt.Sprintf("could not init database connection to %s", preparedDatabase)) | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		for _, owner := range c.getOwnerRoles(preparedDatabase, c.Spec.PreparedDatabases[preparedDatabase].DefaultUsers) { | ||||
| 			if err = c.execAlterGlobalDefaultPrivileges(owner, preparedDatabase); err != nil { | ||||
| 				return err | ||||
| 				errors = append(errors, err.Error()) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if len(errors) > 0 { | ||||
| 		return fmt.Errorf("error(s) while syncing databases: %v", strings.Join(errors, `', '`)) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) syncPreparedDatabases() error { | ||||
| 	c.setProcessName("syncing prepared databases") | ||||
| 	errors := make([]string, 0) | ||||
| 
 | ||||
| 	for preparedDbName, preparedDB := range c.Spec.PreparedDatabases { | ||||
| 		if err := c.initDbConnWithName(preparedDbName); err != nil { | ||||
| 			return fmt.Errorf("could not init connection to database %s: %v", preparedDbName, err) | ||||
| 			errors = append(errors, fmt.Sprintf("could not init connection to database %s: %v", preparedDbName, err)) | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		c.logger.Debugf("syncing prepared database %q", preparedDbName) | ||||
|  | @ -1145,12 +1153,13 @@ func (c *Cluster) syncPreparedDatabases() error { | |||
| 			preparedSchemas = map[string]acidv1.PreparedSchema{"data": {DefaultRoles: util.True()}} | ||||
| 		} | ||||
| 		if err := c.syncPreparedSchemas(preparedDbName, preparedSchemas); err != nil { | ||||
| 			return err | ||||
| 			errors = append(errors, err.Error()) | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		// install extensions
 | ||||
| 		if err := c.syncExtensions(preparedDB.Extensions); err != nil { | ||||
| 			return err | ||||
| 			errors = append(errors, err.Error()) | ||||
| 		} | ||||
| 
 | ||||
| 		if err := c.closeDbConn(); err != nil { | ||||
|  | @ -1158,11 +1167,16 @@ func (c *Cluster) syncPreparedDatabases() error { | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if len(errors) > 0 { | ||||
| 		return fmt.Errorf("error(s) while syncing prepared databases: %v", strings.Join(errors, `', '`)) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) syncPreparedSchemas(databaseName string, preparedSchemas map[string]acidv1.PreparedSchema) error { | ||||
| 	c.setProcessName("syncing prepared schemas") | ||||
| 	errors := make([]string, 0) | ||||
| 
 | ||||
| 	currentSchemas, err := c.getSchemas() | ||||
| 	if err != nil { | ||||
|  | @ -1185,17 +1199,21 @@ func (c *Cluster) syncPreparedSchemas(databaseName string, preparedSchemas map[s | |||
| 				owner = dbOwner | ||||
| 			} | ||||
| 			if err = c.executeCreateDatabaseSchema(databaseName, schemaName, dbOwner, owner); err != nil { | ||||
| 				return err | ||||
| 				errors = append(errors, err.Error()) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if len(errors) > 0 { | ||||
| 		return fmt.Errorf("error(s) while syncing schemas of prepared databases: %v", strings.Join(errors, `', '`)) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) syncExtensions(extensions map[string]string) error { | ||||
| 	c.setProcessName("syncing database extensions") | ||||
| 
 | ||||
| 	errors := make([]string, 0) | ||||
| 	createExtensions := make(map[string]string) | ||||
| 	alterExtensions := make(map[string]string) | ||||
| 
 | ||||
|  | @ -1215,15 +1233,19 @@ func (c *Cluster) syncExtensions(extensions map[string]string) error { | |||
| 
 | ||||
| 	for extName, schema := range createExtensions { | ||||
| 		if err = c.executeCreateExtension(extName, schema); err != nil { | ||||
| 			return err | ||||
| 			errors = append(errors, err.Error()) | ||||
| 		} | ||||
| 	} | ||||
| 	for extName, schema := range alterExtensions { | ||||
| 		if err = c.executeAlterExtension(extName, schema); err != nil { | ||||
| 			return err | ||||
| 			errors = append(errors, err.Error()) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if len(errors) > 0 { | ||||
| 		return fmt.Errorf("error(s) while syncing database extensions: %v", strings.Join(errors, `', '`)) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue