Merge 90314537cd into 1af4c50ed0
				
					
				
			This commit is contained in:
		
						commit
						5dcd0283bd
					
				|  | @ -1003,7 +1003,8 @@ class EndToEndTestCase(unittest.TestCase): | ||||||
|                             "Origin": 2, |                             "Origin": 2, | ||||||
|                             "IsDbOwner": False, |                             "IsDbOwner": False, | ||||||
|                             "Deleted": False, |                             "Deleted": False, | ||||||
|                             "Rotated": False |                             "Rotated": False, | ||||||
|  |                             "Degraded": False, | ||||||
|                         }) |                         }) | ||||||
|                         return True |                         return True | ||||||
|                 except: |                 except: | ||||||
|  |  | ||||||
|  | @ -1059,40 +1059,52 @@ func (c *Cluster) syncStandbyClusterConfiguration() error { | ||||||
| func (c *Cluster) syncSecrets() error { | func (c *Cluster) syncSecrets() error { | ||||||
| 	c.logger.Debug("syncing secrets") | 	c.logger.Debug("syncing secrets") | ||||||
| 	c.setProcessName("syncing secrets") | 	c.setProcessName("syncing secrets") | ||||||
|  | 	errors := make([]string, 0) | ||||||
| 	generatedSecrets := c.generateUserSecrets() | 	generatedSecrets := c.generateUserSecrets() | ||||||
| 	retentionUsers := make([]string, 0) | 	retentionUsers := make([]string, 0) | ||||||
| 	currentTime := time.Now() | 	currentTime := time.Now() | ||||||
| 
 | 
 | ||||||
| 	for secretUsername, generatedSecret := range generatedSecrets { | 	for secretUsername, generatedSecret := range generatedSecrets { | ||||||
| 		secret, err := c.KubeClient.Secrets(generatedSecret.Namespace).Create(context.TODO(), generatedSecret, metav1.CreateOptions{}) | 		pgUserDegraded := false | ||||||
|  | 		createdSecret, err := c.KubeClient.Secrets(generatedSecret.Namespace).Create(context.TODO(), generatedSecret, metav1.CreateOptions{}) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			c.Secrets[secret.UID] = secret | 			c.Secrets[createdSecret.UID] = createdSecret | ||||||
| 			c.logger.Infof("created new secret %s, namespace: %s, uid: %s", util.NameFromMeta(secret.ObjectMeta), generatedSecret.Namespace, secret.UID) | 			c.logger.Infof("created new secret %s, namespace: %s, uid: %s", util.NameFromMeta(createdSecret.ObjectMeta), generatedSecret.Namespace, createdSecret.UID) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		if k8sutil.ResourceAlreadyExists(err) { | 		if k8sutil.ResourceAlreadyExists(err) { | ||||||
| 			if err = c.updateSecret(secretUsername, generatedSecret, &retentionUsers, currentTime); err != nil { | 			updatedSecret, err := c.updateSecret(secretUsername, generatedSecret, &retentionUsers, currentTime) | ||||||
| 				c.logger.Warningf("syncing secret %s failed: %v", util.NameFromMeta(secret.ObjectMeta), err) | 			if err == nil { | ||||||
|  | 				c.Secrets[updatedSecret.UID] = updatedSecret | ||||||
|  | 				continue | ||||||
| 			} | 			} | ||||||
|  | 			errors = append(errors, fmt.Sprintf("syncing secret %s failed: %v", util.NameFromMeta(updatedSecret.ObjectMeta), err)) | ||||||
|  | 			pgUserDegraded = true | ||||||
| 		} else { | 		} else { | ||||||
| 			return fmt.Errorf("could not create secret for user %s: in namespace %s: %v", secretUsername, generatedSecret.Namespace, err) | 			errors = append(errors, fmt.Sprintf("could not create secret for user %s: in namespace %s: %v", secretUsername, generatedSecret.Namespace, err)) | ||||||
|  | 			pgUserDegraded = true | ||||||
| 		} | 		} | ||||||
|  | 		c.updatePgUser(secretUsername, pgUserDegraded) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// remove rotation users that exceed the retention interval
 | 	// remove rotation users that exceed the retention interval
 | ||||||
| 	if len(retentionUsers) > 0 { | 	if len(retentionUsers) > 0 { | ||||||
| 		err := c.initDbConn() | 		err := c.initDbConn() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not init db connection: %v", err) | 			errors = append(errors, fmt.Sprintf("could not init db connection: %v", err)) | ||||||
| 		} | 		} | ||||||
| 		if err = c.cleanupRotatedUsers(retentionUsers, c.pgDb); err != nil { | 		if err = c.cleanupRotatedUsers(retentionUsers, c.pgDb); err != nil { | ||||||
| 			return fmt.Errorf("error removing users exceeding configured retention interval: %v", err) | 			errors = append(errors, fmt.Sprintf("error removing users exceeding configured retention interval: %v", err)) | ||||||
| 		} | 		} | ||||||
| 		if err := c.closeDbConn(); err != nil { | 		if err := c.closeDbConn(); err != nil { | ||||||
| 			c.logger.Errorf("could not close database connection after removing users exceeding configured retention interval: %v", err) | 			errors = append(errors, fmt.Sprintf("could not close database connection after removing users exceeding configured retention interval: %v", err)) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	if len(errors) > 0 { | ||||||
|  | 		return fmt.Errorf("%v", strings.Join(errors, `', '`)) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -1105,7 +1117,7 @@ func (c *Cluster) updateSecret( | ||||||
| 	secretUsername string, | 	secretUsername string, | ||||||
| 	generatedSecret *v1.Secret, | 	generatedSecret *v1.Secret, | ||||||
| 	retentionUsers *[]string, | 	retentionUsers *[]string, | ||||||
| 	currentTime time.Time) error { | 	currentTime time.Time) (*v1.Secret, error) { | ||||||
| 	var ( | 	var ( | ||||||
| 		secret          *v1.Secret | 		secret          *v1.Secret | ||||||
| 		err             error | 		err             error | ||||||
|  | @ -1115,7 +1127,7 @@ func (c *Cluster) updateSecret( | ||||||
| 
 | 
 | ||||||
| 	// get the secret first
 | 	// get the secret first
 | ||||||
| 	if secret, err = c.KubeClient.Secrets(generatedSecret.Namespace).Get(context.TODO(), generatedSecret.Name, metav1.GetOptions{}); err != nil { | 	if secret, err = c.KubeClient.Secrets(generatedSecret.Namespace).Get(context.TODO(), generatedSecret.Name, metav1.GetOptions{}); err != nil { | ||||||
| 		return fmt.Errorf("could not get current secret: %v", err) | 		return generatedSecret, fmt.Errorf("could not get current secret: %v", err) | ||||||
| 	} | 	} | ||||||
| 	c.Secrets[secret.UID] = secret | 	c.Secrets[secret.UID] = secret | ||||||
| 
 | 
 | ||||||
|  | @ -1211,24 +1223,22 @@ func (c *Cluster) updateSecret( | ||||||
| 	if updateSecret { | 	if updateSecret { | ||||||
| 		c.logger.Infof("%s", updateSecretMsg) | 		c.logger.Infof("%s", updateSecretMsg) | ||||||
| 		if secret, err = c.KubeClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil { | 		if secret, err = c.KubeClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil { | ||||||
| 			return fmt.Errorf("could not update secret %s: %v", secretName, err) | 			return secret, fmt.Errorf("could not update secret %s: %v", secretName, err) | ||||||
| 		} | 		} | ||||||
| 		c.Secrets[secret.UID] = secret |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations, nil); changed { | 	if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations, nil); changed { | ||||||
| 		patchData, err := metaAnnotationsPatch(generatedSecret.Annotations) | 		patchData, err := metaAnnotationsPatch(generatedSecret.Annotations) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err) | 			return secret, fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err) | ||||||
| 		} | 		} | ||||||
| 		secret, err = c.KubeClient.Secrets(secret.Namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) | 		secret, err = c.KubeClient.Secrets(secret.Namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return fmt.Errorf("could not patch annotations for secret %q: %v", secret.Name, err) | 			return secret, fmt.Errorf("could not patch annotations for secret %q: %v", secret.Name, err) | ||||||
| 		} | 		} | ||||||
| 		c.Secrets[secret.UID] = secret |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return secret, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Cluster) rotatePasswordInSecret( | func (c *Cluster) rotatePasswordInSecret( | ||||||
|  | @ -1334,6 +1344,23 @@ func (c *Cluster) rotatePasswordInSecret( | ||||||
| 	return updateSecretMsg, nil | 	return updateSecretMsg, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *Cluster) updatePgUser(secretUsername string, degraded bool) { | ||||||
|  | 	for key, pgUser := range c.pgUsers { | ||||||
|  | 		if pgUser.Name == secretUsername { | ||||||
|  | 			pgUser.Degraded = degraded | ||||||
|  | 			c.pgUsers[key] = pgUser | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	for key, pgUser := range c.systemUsers { | ||||||
|  | 		if pgUser.Name == secretUsername { | ||||||
|  | 			pgUser.Degraded = degraded | ||||||
|  | 			c.systemUsers[key] = pgUser | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (c *Cluster) syncRoles() (err error) { | func (c *Cluster) syncRoles() (err error) { | ||||||
| 	c.setProcessName("syncing roles") | 	c.setProcessName("syncing roles") | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -12,9 +12,12 @@ import ( | ||||||
| 
 | 
 | ||||||
| 	v1 "k8s.io/api/core/v1" | 	v1 "k8s.io/api/core/v1" | ||||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
|  | 	"k8s.io/apimachinery/pkg/runtime" | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
|  | 	k8stesting "k8s.io/client-go/testing" | ||||||
| 
 | 
 | ||||||
| 	"github.com/golang/mock/gomock" | 	"github.com/golang/mock/gomock" | ||||||
|  | 	"github.com/pkg/errors" | ||||||
| 	"github.com/sirupsen/logrus" | 	"github.com/sirupsen/logrus" | ||||||
| 	"github.com/stretchr/testify/assert" | 	"github.com/stretchr/testify/assert" | ||||||
| 	"github.com/zalando/postgres-operator/mocks" | 	"github.com/zalando/postgres-operator/mocks" | ||||||
|  | @ -50,6 +53,16 @@ func newFakeK8sSyncClient() (k8sutil.KubernetesClient, *fake.Clientset) { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func newFakeK8sSyncSecretsClient() (k8sutil.KubernetesClient, *fake.Clientset) { | func newFakeK8sSyncSecretsClient() (k8sutil.KubernetesClient, *fake.Clientset) { | ||||||
|  | 	// add a reactor that checks namespace existence before creating secrets
 | ||||||
|  | 	clientSet.PrependReactor("create", "secrets", func(action k8stesting.Action) (bool, runtime.Object, error) { | ||||||
|  | 		createAction := action.(k8stesting.CreateAction) | ||||||
|  | 		secret := createAction.GetObject().(*v1.Secret) | ||||||
|  | 		if secret.Namespace != "default" { | ||||||
|  | 			return true, nil, errors.New("namespace does not exist") | ||||||
|  | 		} | ||||||
|  | 		return false, nil, nil | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
| 	return k8sutil.KubernetesClient{ | 	return k8sutil.KubernetesClient{ | ||||||
| 		SecretsGetter: clientSet.CoreV1(), | 		SecretsGetter: clientSet.CoreV1(), | ||||||
| 	}, clientSet | 	}, clientSet | ||||||
|  | @ -810,7 +823,7 @@ func TestUpdateSecret(t *testing.T) { | ||||||
| 		}, | 		}, | ||||||
| 		Spec: acidv1.PostgresSpec{ | 		Spec: acidv1.PostgresSpec{ | ||||||
| 			Databases:                      map[string]string{dbname: dbowner}, | 			Databases:                      map[string]string{dbname: dbowner}, | ||||||
| 			Users:                          map[string]acidv1.UserFlags{appUser: {}, "bar": {}, dbowner: {}}, | 			Users:                          map[string]acidv1.UserFlags{appUser: {}, "bar": {}, dbowner: {}, "not-exist.test_user": {}}, | ||||||
| 			UsersIgnoringSecretRotation:    []string{"bar"}, | 			UsersIgnoringSecretRotation:    []string{"bar"}, | ||||||
| 			UsersWithInPlaceSecretRotation: []string{dbowner}, | 			UsersWithInPlaceSecretRotation: []string{dbowner}, | ||||||
| 			Streams: []acidv1.Stream{ | 			Streams: []acidv1.Stream{ | ||||||
|  | @ -842,6 +855,7 @@ func TestUpdateSecret(t *testing.T) { | ||||||
| 					PasswordRotationInterval:      1, | 					PasswordRotationInterval:      1, | ||||||
| 					PasswordRotationUserRetention: 3, | 					PasswordRotationUserRetention: 3, | ||||||
| 				}, | 				}, | ||||||
|  | 				EnableCrossNamespaceSecret: true, | ||||||
| 				Resources: config.Resources{ | 				Resources: config.Resources{ | ||||||
| 					ClusterLabels:    map[string]string{"application": "spilo"}, | 					ClusterLabels:    map[string]string{"application": "spilo"}, | ||||||
| 					ClusterNameLabel: "cluster-name", | 					ClusterNameLabel: "cluster-name", | ||||||
|  | @ -864,7 +878,9 @@ func TestUpdateSecret(t *testing.T) { | ||||||
| 
 | 
 | ||||||
| 	allUsers := make(map[string]spec.PgUser) | 	allUsers := make(map[string]spec.PgUser) | ||||||
| 	for _, pgUser := range cluster.pgUsers { | 	for _, pgUser := range cluster.pgUsers { | ||||||
| 		allUsers[pgUser.Name] = pgUser | 		if !pgUser.Degraded { | ||||||
|  | 			allUsers[pgUser.Name] = pgUser | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 	for _, systemUser := range cluster.systemUsers { | 	for _, systemUser := range cluster.systemUsers { | ||||||
| 		allUsers[systemUser.Name] = systemUser | 		allUsers[systemUser.Name] = systemUser | ||||||
|  |  | ||||||
|  | @ -58,6 +58,7 @@ type PgUser struct { | ||||||
| 	IsDbOwner  bool              `yaml:"is_db_owner"` | 	IsDbOwner  bool              `yaml:"is_db_owner"` | ||||||
| 	Deleted    bool              `yaml:"deleted"` | 	Deleted    bool              `yaml:"deleted"` | ||||||
| 	Rotated    bool              `yaml:"rotated"` | 	Rotated    bool              `yaml:"rotated"` | ||||||
|  | 	Degraded   bool              `yaml:"degraded"` | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (user *PgUser) Valid() bool { | func (user *PgUser) Valid() bool { | ||||||
|  |  | ||||||
|  | @ -48,6 +48,10 @@ func (strategy DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserM | ||||||
| 		if newUser.Deleted { | 		if newUser.Deleted { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  | 		// when the secret of the user could not be created or updated skip any database actions
 | ||||||
|  | 		if newUser.Degraded { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
| 		dbUser, exists := dbUsers[name] | 		dbUser, exists := dbUsers[name] | ||||||
| 		if !exists { | 		if !exists { | ||||||
| 			reqs = append(reqs, spec.PgSyncUserRequest{Kind: spec.PGSyncUserAdd, User: newUser}) | 			reqs = append(reqs, spec.PgSyncUserRequest{Kind: spec.PGSyncUserAdd, User: newUser}) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue