Add action-based version of secret sync
This commit is contained in:
parent
a4224f6063
commit
5563078a12
|
|
@ -0,0 +1,121 @@
|
|||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/spec"
|
||||
"github.com/zalando-incubator/postgres-operator/pkg/util"
|
||||
"k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
var NoActions []Action = []Action{}
|
||||
|
||||
type MetaData struct {
|
||||
cluster *Cluster
|
||||
namespace string
|
||||
}
|
||||
|
||||
type CreateSecret struct {
|
||||
ActionSecret
|
||||
}
|
||||
|
||||
func NewCreateSecret(username string, secret *v1.Secret, cluster *Cluster) CreateSecret {
|
||||
return CreateSecret{ActionSecret{
|
||||
meta: MetaData{
|
||||
cluster: cluster,
|
||||
},
|
||||
secretUsername: username,
|
||||
secret: secret,
|
||||
}}
|
||||
}
|
||||
|
||||
func NewUpdateSecret(username string, secret *v1.Secret, cluster *Cluster) UpdateSecret {
|
||||
return UpdateSecret{ActionSecret{
|
||||
meta: MetaData{
|
||||
cluster: cluster,
|
||||
},
|
||||
secretUsername: username,
|
||||
secret: secret,
|
||||
}}
|
||||
}
|
||||
|
||||
type UpdateSecret struct {
|
||||
ActionSecret
|
||||
}
|
||||
|
||||
type ActionSecret struct {
|
||||
meta MetaData
|
||||
secretUsername string
|
||||
secret *v1.Secret
|
||||
}
|
||||
|
||||
type Action interface {
|
||||
Name() string
|
||||
Validate() error
|
||||
Apply() error
|
||||
}
|
||||
|
||||
func (action CreateSecret) Apply() error {
|
||||
cluster := action.meta.cluster
|
||||
secret, err := cluster.KubeClient.
|
||||
Secrets(action.secret.Namespace).
|
||||
Create(action.secret)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("Cannot apply action %s: %v", action.Name(), err)
|
||||
}
|
||||
|
||||
cluster.Secrets[secret.UID] = secret
|
||||
cluster.logger.Debugf(
|
||||
"created new secret %q, uid: %q",
|
||||
util.NameFromMeta(secret.ObjectMeta),
|
||||
secret.UID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (action ActionSecret) Validate() error {
|
||||
if action.secret.Data["username"] == nil {
|
||||
return fmt.Errorf("Field 'username' is empty for %v", action.secret)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (action CreateSecret) Name() string {
|
||||
return fmt.Sprintf("Create secret %v", action.secret)
|
||||
}
|
||||
|
||||
func (action UpdateSecret) Apply() error {
|
||||
cluster := action.meta.cluster
|
||||
user := cluster.getSecretUser(action.secretUsername)
|
||||
|
||||
// if this secret belongs to the infrastructure role and the password has
|
||||
// changed - replace it in the secret
|
||||
updateSecret := (user.Password != string(action.secret.Data["password"]) &&
|
||||
user.Origin == spec.RoleOriginInfrastructure)
|
||||
|
||||
if updateSecret {
|
||||
msg := "Updating the secret %q from the infrastructure roles"
|
||||
cluster.logger.Debugf(msg, action.secret.Name)
|
||||
|
||||
_, err := cluster.KubeClient.
|
||||
Secrets(action.secret.Namespace).
|
||||
Update(action.secret)
|
||||
|
||||
if err != nil {
|
||||
msg = "Could not update infrastructure role secret for role %q: %v"
|
||||
return fmt.Errorf(msg, action.secretUsername, err)
|
||||
}
|
||||
} else {
|
||||
// for non-infrastructure role - update the role with the password from
|
||||
// the secret
|
||||
user.Password = string(action.secret.Data["password"])
|
||||
cluster.setSecretUser(action.secretUsername, user)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (action UpdateSecret) Name() string {
|
||||
return fmt.Sprintf("Update secret %v", action.secret)
|
||||
}
|
||||
|
|
@ -200,6 +200,44 @@ func (c *Cluster) initUsers() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func CreateSecrets() []Action {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cluster) PlanForCreate() (plan []Action) {
|
||||
plan = append(plan, c.PlanForSecrets()...)
|
||||
return plan
|
||||
}
|
||||
|
||||
func (c *Cluster) PlanForSecrets() (plan []Action) {
|
||||
var msg string
|
||||
secrets := c.generateUserSecrets()
|
||||
|
||||
for secretUsername, secretSpec := range secrets {
|
||||
secret, err := c.KubeClient.
|
||||
Secrets(secretSpec.Namespace).
|
||||
Get(secretSpec.Name, metav1.GetOptions{})
|
||||
|
||||
if k8sutil.ResourceNotFound(err) {
|
||||
msg = "Generate plan to create new secret %q"
|
||||
c.logger.Debugf(msg, util.NameFromMeta(secret.ObjectMeta))
|
||||
plan = append(plan, NewCreateSecret(secretUsername, secret, c))
|
||||
}
|
||||
|
||||
if secretUsername != string(secret.Data["username"]) {
|
||||
msg = "Secret %q does not contain the role %q, skip it"
|
||||
c.logger.Warningf(msg, secretSpec.Name, secretUsername)
|
||||
continue
|
||||
}
|
||||
|
||||
msg = "Secret %q already exists, generate update plan"
|
||||
c.logger.Debugf(msg, util.NameFromMeta(secret.ObjectMeta))
|
||||
plan = append(plan, NewUpdateSecret(secretUsername, secret, c))
|
||||
}
|
||||
|
||||
return plan
|
||||
}
|
||||
|
||||
// Create creates the new kubernetes objects associated with the cluster.
|
||||
func (c *Cluster) Create() error {
|
||||
c.mu.Lock()
|
||||
|
|
@ -1001,3 +1039,41 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error {
|
|||
|
||||
return c.deleteClusterObject(get, deleteConfigMapFn, "configmap")
|
||||
}
|
||||
|
||||
func (c *Cluster) getSecretUser(username string) spec.PgUser {
|
||||
var usersMap map[string]spec.PgUser
|
||||
|
||||
superUser := c.systemUsers[constants.SuperuserKeyName]
|
||||
replicationUser := c.systemUsers[constants.ReplicationUserKeyName]
|
||||
|
||||
if username == superUser.Name {
|
||||
username = constants.SuperuserKeyName
|
||||
usersMap = c.systemUsers
|
||||
} else if username == replicationUser.Name {
|
||||
username = constants.ReplicationUserKeyName
|
||||
usersMap = c.systemUsers
|
||||
} else {
|
||||
usersMap = c.pgUsers
|
||||
}
|
||||
|
||||
return usersMap[username]
|
||||
}
|
||||
|
||||
func (c *Cluster) setSecretUser(username string, user spec.PgUser) {
|
||||
var usersMap map[string]spec.PgUser
|
||||
|
||||
superUser := c.systemUsers[constants.SuperuserKeyName]
|
||||
replicationUser := c.systemUsers[constants.ReplicationUserKeyName]
|
||||
|
||||
if username == superUser.Name {
|
||||
username = constants.SuperuserKeyName
|
||||
usersMap = c.systemUsers
|
||||
} else if username == replicationUser.Name {
|
||||
username = constants.ReplicationUserKeyName
|
||||
usersMap = c.systemUsers
|
||||
} else {
|
||||
usersMap = c.pgUsers
|
||||
}
|
||||
|
||||
usersMap[username] = user
|
||||
}
|
||||
|
|
|
|||
|
|
@ -160,6 +160,61 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam
|
|||
return cl
|
||||
}
|
||||
|
||||
func getClusterName(event ClusterEvent) spec.NamespacedName {
|
||||
hasNewName := eventInSlice(event.EventType, []EventType{
|
||||
EventAdd, EventSync, EventRepair,
|
||||
})
|
||||
|
||||
if hasNewName {
|
||||
return util.NameFromMeta(event.NewSpec.ObjectMeta)
|
||||
} else {
|
||||
return util.NameFromMeta(event.OldSpec.ObjectMeta)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) generatePlan(event ClusterEvent) []cluster.Action {
|
||||
var clusterName spec.NamespacedName
|
||||
|
||||
log := c.logger.WithField("worker", event.WorkerID)
|
||||
log = log.WithField("cluster-name", getClusterName(event))
|
||||
|
||||
switch event.EventType {
|
||||
case EventAdd:
|
||||
log.Infof("Creation of the cluster started")
|
||||
|
||||
newCluster := c.addCluster(log, clusterName, event.NewSpec)
|
||||
c.curWorkerCluster.Store(event.WorkerID, newCluster)
|
||||
return newCluster.PlanForCreate()
|
||||
|
||||
default:
|
||||
return cluster.NoActions
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) validatePlan(plan []cluster.Action) (err error) {
|
||||
for _, action := range plan {
|
||||
err = action.Validate()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) applyPlan(plan []cluster.Action) (err error) {
|
||||
for _, action := range plan {
|
||||
err = action.Apply()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) processEvent(event ClusterEvent) {
|
||||
var clusterName spec.NamespacedName
|
||||
var clHistory ringlog.RingLogger
|
||||
|
|
@ -323,6 +378,16 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
|
|||
c.logger.Errorf("could not cast to ClusterEvent")
|
||||
}
|
||||
|
||||
// build plan
|
||||
actions := c.generatePlan(event)
|
||||
if err := c.validatePlan(actions); err != nil {
|
||||
c.logger.Errorf("Invalid plan: %v", err)
|
||||
}
|
||||
if err := c.applyPlan(actions); err != nil {
|
||||
c.logger.Errorf("Could not apply the plan: %v", err)
|
||||
}
|
||||
|
||||
// apply legacy actions, that are not in the plan
|
||||
c.processEvent(event)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -194,3 +194,12 @@ func (c *Controller) podClusterName(pod *v1.Pod) spec.NamespacedName {
|
|||
|
||||
return spec.NamespacedName{}
|
||||
}
|
||||
|
||||
func eventInSlice(a EventType, list []EventType) bool {
|
||||
for _, b := range list {
|
||||
if b == a {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue