skip db user actions when its secret failed to sync on update
This commit is contained in:
		
							parent
							
								
									1af4c50ed0
								
							
						
					
					
						commit
						12d1019dec
					
				|  | @ -1064,19 +1064,26 @@ func (c *Cluster) syncSecrets() error { | |||
| 	currentTime := time.Now() | ||||
| 
 | ||||
| 	for secretUsername, generatedSecret := range generatedSecrets { | ||||
| 		secret, err := c.KubeClient.Secrets(generatedSecret.Namespace).Create(context.TODO(), generatedSecret, metav1.CreateOptions{}) | ||||
| 		pgUserDegraded := false | ||||
| 		createdSecret, err := c.KubeClient.Secrets(generatedSecret.Namespace).Create(context.TODO(), generatedSecret, metav1.CreateOptions{}) | ||||
| 		if err == nil { | ||||
| 			c.Secrets[secret.UID] = secret | ||||
| 			c.logger.Infof("created new secret %s, namespace: %s, uid: %s", util.NameFromMeta(secret.ObjectMeta), generatedSecret.Namespace, secret.UID) | ||||
| 			c.Secrets[createdSecret.UID] = createdSecret | ||||
| 			c.logger.Infof("created new secret %s, namespace: %s, uid: %s", util.NameFromMeta(createdSecret.ObjectMeta), generatedSecret.Namespace, createdSecret.UID) | ||||
| 			continue | ||||
| 		} | ||||
| 		if k8sutil.ResourceAlreadyExists(err) { | ||||
| 			if err = c.updateSecret(secretUsername, generatedSecret, &retentionUsers, currentTime); err != nil { | ||||
| 				c.logger.Warningf("syncing secret %s failed: %v", util.NameFromMeta(secret.ObjectMeta), err) | ||||
| 			updatedSecret, err := c.updateSecret(secretUsername, generatedSecret, &retentionUsers, currentTime) | ||||
| 			if err == nil { | ||||
| 				c.Secrets[updatedSecret.UID] = updatedSecret | ||||
| 				continue | ||||
| 			} | ||||
| 			c.logger.Warningf("syncing secret %s failed: %v", util.NameFromMeta(updatedSecret.ObjectMeta), err) | ||||
| 			pgUserDegraded = true | ||||
| 		} else { | ||||
| 			return fmt.Errorf("could not create secret for user %s: in namespace %s: %v", secretUsername, generatedSecret.Namespace, err) | ||||
| 			c.logger.Warningf("could not create secret for user %s: in namespace %s: %v", secretUsername, generatedSecret.Namespace, err) | ||||
| 			pgUserDegraded = true | ||||
| 		} | ||||
| 		c.updatePgUser(secretUsername, pgUserDegraded) | ||||
| 	} | ||||
| 
 | ||||
| 	// remove rotation users that exceed the retention interval
 | ||||
|  | @ -1105,7 +1112,7 @@ func (c *Cluster) updateSecret( | |||
| 	secretUsername string, | ||||
| 	generatedSecret *v1.Secret, | ||||
| 	retentionUsers *[]string, | ||||
| 	currentTime time.Time) error { | ||||
| 	currentTime time.Time) (*v1.Secret, error) { | ||||
| 	var ( | ||||
| 		secret          *v1.Secret | ||||
| 		err             error | ||||
|  | @ -1115,7 +1122,7 @@ func (c *Cluster) updateSecret( | |||
| 
 | ||||
| 	// get the secret first
 | ||||
| 	if secret, err = c.KubeClient.Secrets(generatedSecret.Namespace).Get(context.TODO(), generatedSecret.Name, metav1.GetOptions{}); err != nil { | ||||
| 		return fmt.Errorf("could not get current secret: %v", err) | ||||
| 		return generatedSecret, fmt.Errorf("could not get current secret: %v", err) | ||||
| 	} | ||||
| 	c.Secrets[secret.UID] = secret | ||||
| 
 | ||||
|  | @ -1211,24 +1218,22 @@ func (c *Cluster) updateSecret( | |||
| 	if updateSecret { | ||||
| 		c.logger.Infof("%s", updateSecretMsg) | ||||
| 		if secret, err = c.KubeClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil { | ||||
| 			return fmt.Errorf("could not update secret %s: %v", secretName, err) | ||||
| 			return secret, fmt.Errorf("could not update secret %s: %v", secretName, err) | ||||
| 		} | ||||
| 		c.Secrets[secret.UID] = secret | ||||
| 	} | ||||
| 
 | ||||
| 	if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations, nil); changed { | ||||
| 		patchData, err := metaAnnotationsPatch(generatedSecret.Annotations) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err) | ||||
| 			return secret, fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err) | ||||
| 		} | ||||
| 		secret, err = c.KubeClient.Secrets(secret.Namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("could not patch annotations for secret %q: %v", secret.Name, err) | ||||
| 			return secret, fmt.Errorf("could not patch annotations for secret %q: %v", secret.Name, err) | ||||
| 		} | ||||
| 		c.Secrets[secret.UID] = secret | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| 	return secret, nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) rotatePasswordInSecret( | ||||
|  | @ -1334,6 +1339,23 @@ func (c *Cluster) rotatePasswordInSecret( | |||
| 	return updateSecretMsg, nil | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) updatePgUser(secretUsername string, degraded bool) { | ||||
| 	for key, pgUser := range c.pgUsers { | ||||
| 		if pgUser.Name == secretUsername { | ||||
| 			pgUser.Degraded = degraded | ||||
| 			c.pgUsers[key] = pgUser | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 	for key, pgUser := range c.systemUsers { | ||||
| 		if pgUser.Name == secretUsername { | ||||
| 			pgUser.Degraded = degraded | ||||
| 			c.systemUsers[key] = pgUser | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *Cluster) syncRoles() (err error) { | ||||
| 	c.setProcessName("syncing roles") | ||||
| 
 | ||||
|  |  | |||
|  | @ -12,9 +12,12 @@ import ( | |||
| 
 | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	k8stesting "k8s.io/client-go/testing" | ||||
| 
 | ||||
| 	"github.com/golang/mock/gomock" | ||||
| 	"github.com/pkg/errors" | ||||
| 	"github.com/sirupsen/logrus" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"github.com/zalando/postgres-operator/mocks" | ||||
|  | @ -50,6 +53,16 @@ func newFakeK8sSyncClient() (k8sutil.KubernetesClient, *fake.Clientset) { | |||
| } | ||||
| 
 | ||||
| func newFakeK8sSyncSecretsClient() (k8sutil.KubernetesClient, *fake.Clientset) { | ||||
| 	// add a reactor that checks namespace existence before creating secrets
 | ||||
| 	clientSet.PrependReactor("create", "secrets", func(action k8stesting.Action) (bool, runtime.Object, error) { | ||||
| 		createAction := action.(k8stesting.CreateAction) | ||||
| 		secret := createAction.GetObject().(*v1.Secret) | ||||
| 		if secret.Namespace != "default" { | ||||
| 			return true, nil, errors.New("namespace does not exist") | ||||
| 		} | ||||
| 		return false, nil, nil | ||||
| 	}) | ||||
| 
 | ||||
| 	return k8sutil.KubernetesClient{ | ||||
| 		SecretsGetter: clientSet.CoreV1(), | ||||
| 	}, clientSet | ||||
|  | @ -810,7 +823,7 @@ func TestUpdateSecret(t *testing.T) { | |||
| 		}, | ||||
| 		Spec: acidv1.PostgresSpec{ | ||||
| 			Databases:                      map[string]string{dbname: dbowner}, | ||||
| 			Users:                          map[string]acidv1.UserFlags{appUser: {}, "bar": {}, dbowner: {}}, | ||||
| 			Users:                          map[string]acidv1.UserFlags{appUser: {}, "bar": {}, dbowner: {}, "not-exist.test_user": {}}, | ||||
| 			UsersIgnoringSecretRotation:    []string{"bar"}, | ||||
| 			UsersWithInPlaceSecretRotation: []string{dbowner}, | ||||
| 			Streams: []acidv1.Stream{ | ||||
|  | @ -842,6 +855,7 @@ func TestUpdateSecret(t *testing.T) { | |||
| 					PasswordRotationInterval:      1, | ||||
| 					PasswordRotationUserRetention: 3, | ||||
| 				}, | ||||
| 				EnableCrossNamespaceSecret: true, | ||||
| 				Resources: config.Resources{ | ||||
| 					ClusterLabels:    map[string]string{"application": "spilo"}, | ||||
| 					ClusterNameLabel: "cluster-name", | ||||
|  | @ -864,7 +878,9 @@ func TestUpdateSecret(t *testing.T) { | |||
| 
 | ||||
| 	allUsers := make(map[string]spec.PgUser) | ||||
| 	for _, pgUser := range cluster.pgUsers { | ||||
| 		allUsers[pgUser.Name] = pgUser | ||||
| 		if !pgUser.Degraded { | ||||
| 			allUsers[pgUser.Name] = pgUser | ||||
| 		} | ||||
| 	} | ||||
| 	for _, systemUser := range cluster.systemUsers { | ||||
| 		allUsers[systemUser.Name] = systemUser | ||||
|  |  | |||
|  | @ -58,6 +58,7 @@ type PgUser struct { | |||
| 	IsDbOwner  bool              `yaml:"is_db_owner"` | ||||
| 	Deleted    bool              `yaml:"deleted"` | ||||
| 	Rotated    bool              `yaml:"rotated"` | ||||
| 	Degraded   bool              `yaml:"degraded"` | ||||
| } | ||||
| 
 | ||||
| func (user *PgUser) Valid() bool { | ||||
|  |  | |||
|  | @ -48,6 +48,10 @@ func (strategy DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserM | |||
| 		if newUser.Deleted { | ||||
| 			continue | ||||
| 		} | ||||
| 		// when the secret of the user could not be created or updated skip any database actions
 | ||||
| 		if newUser.Degraded { | ||||
| 			continue | ||||
| 		} | ||||
| 		dbUser, exists := dbUsers[name] | ||||
| 		if !exists { | ||||
| 			reqs = append(reqs, spec.PgSyncUserRequest{Kind: spec.PGSyncUserAdd, User: newUser}) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue