Fixes for the case of re-creating the cluster after deletion.
- make sure that the secrets for the system users (superuser, replication) are not deleted when the main cluster is. Therefore, we can re-create the cluster, potentially forcing Patroni to restore it from the backup and enable Patroni to connect, since it will use the old password, not the newly generated random one. - when syncing users, always check whether they are already in the DB. Previously, we did this only for the sync cluster case, but the new cluster could be actually the one restored from the backup by Patroni, having all or some of the users already in place. - delete endponts last. Patroni uses the $clustername endpoint in order to store the leader related metadata. If we remove it before removing all pods, one of those pods running Patroni will re-create it and the next attempt to create the cluster with the same name will stuble on the existing endpoint. - Use db.Exec instead of db.Query for queries that expect no result. This also fixes the issue with the DB creation, since we didn't release an empty Row object it was not possible to create more than one database for a cluster.
This commit is contained in:
parent
1fb8cf7ea0
commit
87bc47d8d0
|
|
@ -466,7 +466,7 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
|
||||||
|
|
||||||
if !c.databaseAccessDisabled() {
|
if !c.databaseAccessDisabled() {
|
||||||
c.logger.Debugf("syncing roles")
|
c.logger.Debugf("syncing roles")
|
||||||
if err := c.syncRoles(true); err != nil {
|
if err := c.syncRoles(); err != nil {
|
||||||
c.logger.Errorf("could not sync roles: %v", err)
|
c.logger.Errorf("could not sync roles: %v", err)
|
||||||
updateFailed = true
|
updateFailed = true
|
||||||
}
|
}
|
||||||
|
|
@ -488,14 +488,14 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
|
||||||
func() {
|
func() {
|
||||||
oldSs, err := c.generateStatefulSet(&oldSpec.Spec)
|
oldSs, err := c.generateStatefulSet(&oldSpec.Spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("could not generate old statefulset spec")
|
c.logger.Errorf("could not generate old statefulset spec: %v", err)
|
||||||
updateFailed = true
|
updateFailed = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
newSs, err := c.generateStatefulSet(&newSpec.Spec)
|
newSs, err := c.generateStatefulSet(&newSpec.Spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Errorf("could not generate new statefulset spec")
|
c.logger.Errorf("could not generate new statefulset spec: %v", err)
|
||||||
updateFailed = true
|
updateFailed = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -523,10 +523,32 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
|
// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets).
|
||||||
|
// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes
|
||||||
|
// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint
|
||||||
|
// before the pods, it will be re-created by the current master pod and will remain, obstructing the
|
||||||
|
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
|
||||||
func (c *Cluster) Delete() error {
|
func (c *Cluster) Delete() error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
if err := c.deleteStatefulSet(); err != nil {
|
||||||
|
return fmt.Errorf("could not delete statefulset: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, obj := range c.Secrets {
|
||||||
|
if delete, user := c.shouldDeleteSecret(obj); !delete {
|
||||||
|
c.logger.Infof("not removing secret %q for the system user %q", obj.GetName(), user)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := c.deleteSecret(obj); err != nil {
|
||||||
|
return fmt.Errorf("could not delete secret: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.deletePodDisruptionBudget(); err != nil {
|
||||||
|
return fmt.Errorf("could not delete pod disruption budget: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
for _, role := range []PostgresRole{Master, Replica} {
|
for _, role := range []PostgresRole{Master, Replica} {
|
||||||
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
if role == Replica && !c.Spec.ReplicaLoadBalancer {
|
||||||
continue
|
continue
|
||||||
|
|
@ -541,20 +563,6 @@ func (c *Cluster) Delete() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.deleteStatefulSet(); err != nil {
|
|
||||||
return fmt.Errorf("could not delete statefulset: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, obj := range c.Secrets {
|
|
||||||
if err := c.deleteSecret(obj); err != nil {
|
|
||||||
return fmt.Errorf("could not delete secret: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.deletePodDisruptionBudget(); err != nil {
|
|
||||||
return fmt.Errorf("could not delete pod disruption budget: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -784,3 +792,8 @@ func (c *Cluster) Lock() {
|
||||||
func (c *Cluster) Unlock() {
|
func (c *Cluster) Unlock() {
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) shouldDeleteSecret(secret *v1.Secret) (delete bool, userName string) {
|
||||||
|
secretUser := string(secret.Data["username"])
|
||||||
|
return (secretUser != c.OpConfig.ReplicationUsername && secretUser != c.OpConfig.SuperUsername), secretUser
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,14 +7,20 @@ import (
|
||||||
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
|
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
|
||||||
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
|
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
|
||||||
"github.com/zalando-incubator/postgres-operator/pkg/util/teams"
|
"github.com/zalando-incubator/postgres-operator/pkg/util/teams"
|
||||||
|
"k8s.io/client-go/pkg/api/v1"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
superUserName = "postgres"
|
||||||
|
replicationUserName = "standby"
|
||||||
|
)
|
||||||
|
|
||||||
var logger = logrus.New().WithField("test", "cluster")
|
var logger = logrus.New().WithField("test", "cluster")
|
||||||
var cl = New(Config{OpConfig: config.Config{ProtectedRoles: []string{"admin"},
|
var cl = New(Config{OpConfig: config.Config{ProtectedRoles: []string{"admin"},
|
||||||
Auth: config.Auth{SuperUsername: "postgres",
|
Auth: config.Auth{SuperUsername: superUserName,
|
||||||
ReplicationUsername: "standby"}}},
|
ReplicationUsername: replicationUserName}}},
|
||||||
k8sutil.KubernetesClient{}, spec.Postgresql{}, logger)
|
k8sutil.KubernetesClient{}, spec.Postgresql{}, logger)
|
||||||
|
|
||||||
func TestInitRobotUsers(t *testing.T) {
|
func TestInitRobotUsers(t *testing.T) {
|
||||||
|
|
@ -52,7 +58,7 @@ func TestInitRobotUsers(t *testing.T) {
|
||||||
`conflicting user flags: "NOINHERIT" and "INHERIT"`),
|
`conflicting user flags: "NOINHERIT" and "INHERIT"`),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
manifestUsers: map[string]spec.UserFlags{"admin": {"superuser"}, "postgres": {"createdb"}},
|
manifestUsers: map[string]spec.UserFlags{"admin": {"superuser"}, superUserName: {"createdb"}},
|
||||||
infraRoles: map[string]spec.PgUser{},
|
infraRoles: map[string]spec.PgUser{},
|
||||||
result: map[string]spec.PgUser{},
|
result: map[string]spec.PgUser{},
|
||||||
err: nil,
|
err: nil,
|
||||||
|
|
@ -121,7 +127,7 @@ func TestInitHumanUsers(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
existingRoles: map[string]spec.PgUser{},
|
existingRoles: map[string]spec.PgUser{},
|
||||||
teamRoles: []string{"admin", "standby"},
|
teamRoles: []string{"admin", replicationUserName},
|
||||||
result: map[string]spec.PgUser{},
|
result: map[string]spec.PgUser{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
@ -138,3 +144,33 @@ func TestInitHumanUsers(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestShouldDeleteSecret(t *testing.T) {
|
||||||
|
testName := "TestShouldDeleteSecret"
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
secret *v1.Secret
|
||||||
|
outcome bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
secret: &v1.Secret{Data: map[string][]byte{"username": []byte("foobar")}},
|
||||||
|
outcome: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
secret: &v1.Secret{Data: map[string][]byte{"username": []byte(superUserName)}},
|
||||||
|
|
||||||
|
outcome: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
secret: &v1.Secret{Data: map[string][]byte{"username": []byte(replicationUserName)}},
|
||||||
|
outcome: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
if outcome, username := cl.shouldDeleteSecret(tt.secret); outcome != tt.outcome {
|
||||||
|
t.Errorf("%s expects the check for deletion of the username %q secret to return %t, got %t",
|
||||||
|
testName, username, tt.outcome, outcome)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -191,7 +191,7 @@ func (c *Cluster) executeCreateDatabase(datname, owner string) error {
|
||||||
}
|
}
|
||||||
c.logger.Infof("creating database %q with owner %q", datname, owner)
|
c.logger.Infof("creating database %q with owner %q", datname, owner)
|
||||||
|
|
||||||
if _, err := c.pgDb.Query(fmt.Sprintf(createDatabaseSQL, datname, owner)); err != nil {
|
if _, err := c.pgDb.Exec(fmt.Sprintf(createDatabaseSQL, datname, owner)); err != nil {
|
||||||
return fmt.Errorf("could not execute create database: %v", err)
|
return fmt.Errorf("could not execute create database: %v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -204,7 +204,7 @@ func (c *Cluster) executeAlterDatabaseOwner(datname string, owner string) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
c.logger.Infof("changing database %q owner to %q", datname, owner)
|
c.logger.Infof("changing database %q owner to %q", datname, owner)
|
||||||
if _, err := c.pgDb.Query(fmt.Sprintf(alterDatabaseOwnerSQL, datname, owner)); err != nil {
|
if _, err := c.pgDb.Exec(fmt.Sprintf(alterDatabaseOwnerSQL, datname, owner)); err != nil {
|
||||||
return fmt.Errorf("could not execute alter database owner: %v", err)
|
return fmt.Errorf("could not execute alter database owner: %v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -473,7 +473,7 @@ func (c *Cluster) deleteSecret(secret *v1.Secret) error {
|
||||||
|
|
||||||
func (c *Cluster) createRoles() (err error) {
|
func (c *Cluster) createRoles() (err error) {
|
||||||
// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
|
// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
|
||||||
return c.syncRoles(false)
|
return c.syncRoles()
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetServiceMaster returns cluster's kubernetes master Service
|
// GetServiceMaster returns cluster's kubernetes master Service
|
||||||
|
|
|
||||||
|
|
@ -59,7 +59,7 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
|
||||||
|
|
||||||
if !c.databaseAccessDisabled() {
|
if !c.databaseAccessDisabled() {
|
||||||
c.logger.Debugf("syncing roles")
|
c.logger.Debugf("syncing roles")
|
||||||
if err = c.syncRoles(true); err != nil {
|
if err = c.syncRoles(); err != nil {
|
||||||
err = fmt.Errorf("could not sync roles: %v", err)
|
err = fmt.Errorf("could not sync roles: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -346,7 +346,7 @@ func (c *Cluster) syncSecrets() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) syncRoles(readFromDatabase bool) error {
|
func (c *Cluster) syncRoles() error {
|
||||||
c.setProcessName("syncing roles")
|
c.setProcessName("syncing roles")
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -365,7 +365,6 @@ func (c *Cluster) syncRoles(readFromDatabase bool) error {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if readFromDatabase {
|
|
||||||
for _, u := range c.pgUsers {
|
for _, u := range c.pgUsers {
|
||||||
userNames = append(userNames, u.Name)
|
userNames = append(userNames, u.Name)
|
||||||
}
|
}
|
||||||
|
|
@ -373,7 +372,6 @@ func (c *Cluster) syncRoles(readFromDatabase bool) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error getting users from the database: %v", err)
|
return fmt.Errorf("error getting users from the database: %v", err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
pgSyncRequests := c.userSyncStrategy.ProduceSyncRequests(dbUsers, c.pgUsers)
|
pgSyncRequests := c.userSyncStrategy.ProduceSyncRequests(dbUsers, c.pgUsers)
|
||||||
if err = c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb); err != nil {
|
if err = c.userSyncStrategy.ExecuteSyncRequests(pgSyncRequests, c.pgDb); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -95,7 +95,7 @@ func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserReque
|
||||||
func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql.DB) (err error) {
|
func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql.DB) (err error) {
|
||||||
queries := produceAlterRoleSetStmts(user)
|
queries := produceAlterRoleSetStmts(user)
|
||||||
query := fmt.Sprintf(doBlockStmt, strings.Join(queries, ";"))
|
query := fmt.Sprintf(doBlockStmt, strings.Join(queries, ";"))
|
||||||
if err = runQueryDiscardResult(db, query); err != nil {
|
if _, err = db.Exec(query); err != nil {
|
||||||
err = fmt.Errorf("dB error: %v, query: %s", err, query)
|
err = fmt.Errorf("dB error: %v, query: %s", err, query)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -120,7 +120,7 @@ func (s DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.DB) (err
|
||||||
}
|
}
|
||||||
query := fmt.Sprintf(createUserSQL, user.Name, strings.Join(userFlags, " "), userPassword)
|
query := fmt.Sprintf(createUserSQL, user.Name, strings.Join(userFlags, " "), userPassword)
|
||||||
|
|
||||||
err = runQueryDiscardResult(db, query) // TODO: Try several times
|
_, err = db.Exec(query) // TODO: Try several times
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("dB error: %v, query: %s", err, query)
|
err = fmt.Errorf("dB error: %v, query: %s", err, query)
|
||||||
return
|
return
|
||||||
|
|
@ -146,7 +146,7 @@ func (s DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB) (err
|
||||||
|
|
||||||
query := fmt.Sprintf(doBlockStmt, strings.Join(resultStmt, ";"))
|
query := fmt.Sprintf(doBlockStmt, strings.Join(resultStmt, ";"))
|
||||||
|
|
||||||
err = runQueryDiscardResult(db, query) // TODO: Try several times
|
_, err = db.Exec(query) // TODO: Try several times
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("dB error: %v query %s", err, query)
|
err = fmt.Errorf("dB error: %v query %s", err, query)
|
||||||
return
|
return
|
||||||
|
|
@ -215,11 +215,3 @@ func quoteParameterValue(name, val string) string {
|
||||||
}
|
}
|
||||||
return fmt.Sprintf(`'%s'`, strings.Trim(val, " "))
|
return fmt.Sprintf(`'%s'`, strings.Trim(val, " "))
|
||||||
}
|
}
|
||||||
|
|
||||||
func runQueryDiscardResult(db *sql.DB, sql string) error {
|
|
||||||
rows, err := db.Query(sql)
|
|
||||||
if rows != nil {
|
|
||||||
rows.Close()
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue