move cluster config struct to the spec package

Murat Kabilov 2017-07-13 16:51:41 +02:00
parent 333dfdd640
commit ea0ed11e86
8 changed files with 63 additions and 65 deletions

View File

@@ -11,12 +11,10 @@ import (
"sync"
"github.com/Sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/apis/apps/v1beta1"
"k8s.io/client-go/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
@@ -24,7 +22,6 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
"github.com/zalando-incubator/postgres-operator/pkg/util/teams"
"github.com/zalando-incubator/postgres-operator/pkg/util/users"
"github.com/zalando-incubator/postgres-operator/pkg/util/volumes"
)
@@ -34,16 +31,6 @@ var (
userRegexp = regexp.MustCompile(`^[a-z0-9]([-_a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-_a-z0-9]*[a-z0-9])?)*$`)
)
// Config contains operator-wide clients and configuration used by a cluster. TODO: remove struct duplication.
type Config struct {
KubeClient *kubernetes.Clientset //TODO: move clients to a better place?
RestClient *rest.RESTClient
RestConfig *rest.Config
TeamsAPIClient *teams.API
OpConfig config.Config
InfrastructureRoles map[string]spec.PgUser // inherited from the controller
}
type kubeResources struct {
Service map[PostgresRole]*v1.Service
Endpoint *v1.Endpoints
@@ -56,7 +43,8 @@ type kubeResources struct {
type Cluster struct {
kubeResources
spec.Postgresql
Config
spec.ClusterConfig
config.Config
logger *logrus.Entry
pgUsers map[string]spec.PgUser
systemUsers map[string]spec.PgUser
@@ -78,7 +66,7 @@ type compareStatefulsetResult struct {
}
// New creates a new cluster. This function should be called from a controller.
func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
func New(cfg spec.ClusterConfig, opCfg config.Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name)
kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)}
orphanDependents := true
@@ -93,7 +81,7 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
})
cluster := &Cluster{
Config: cfg,
ClusterConfig: cfg,
Config: opCfg,
Postgresql: pgSpec,
logger: lg,
pgUsers: make(map[string]spec.PgUser),
@@ -586,11 +575,11 @@ func (c *Cluster) initSystemUsers() {
// secrets, therefore, setting flags like SUPERUSER or REPLICATION
// is not necessary here
c.systemUsers[constants.SuperuserKeyName] = spec.PgUser{
Name: c.OpConfig.SuperUsername,
Name: c.SuperUsername,
Password: util.RandomPassword(constants.PasswordLength),
}
c.systemUsers[constants.ReplicationUserKeyName] = spec.PgUser{
Name: c.OpConfig.ReplicationUsername,
Name: c.ReplicationUsername,
Password: util.RandomPassword(constants.PasswordLength),
}
}
@@ -623,7 +612,7 @@ func (c *Cluster) initHumanUsers() error {
}
for _, username := range teamMembers {
flags := []string{constants.RoleFlagLogin, constants.RoleFlagSuperuser}
memberOf := []string{c.OpConfig.PamRoleName}
memberOf := []string{c.PamRoleName}
c.pgUsers[username] = spec.PgUser{Name: username, Flags: flags, MemberOf: memberOf}
}
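
The bulk of this file's churn is mechanical and follows from the struct change above: Cluster now embeds spec.ClusterConfig and config.Config directly, and Go promotes the fields of embedded structs, which is why c.OpConfig.SuperUsername becomes plain c.SuperUsername. A minimal, self-contained sketch of the promotion pattern (the types here are illustrative stand-ins, not the operator's real ones):

```go
package main

import "fmt"

// OperatorConfig stands in for the operator's config.Config.
type OperatorConfig struct {
	SuperUsername string
	PamRoleName   string
}

// ClusterDeps stands in for spec.ClusterConfig (clients, infrastructure roles).
type ClusterDeps struct {
	TeamsAPIURL string
}

// Cluster embeds both structs, so their fields are promoted onto Cluster.
type Cluster struct {
	ClusterDeps
	OperatorConfig
}

func main() {
	c := Cluster{
		ClusterDeps:    ClusterDeps{TeamsAPIURL: "https://teams.example.org"},
		OperatorConfig: OperatorConfig{SuperUsername: "postgres", PamRoleName: "zalandos"},
	}
	// Promoted field access: no c.OpConfig.* indirection required.
	fmt.Println(c.SuperUsername, c.PamRoleName, c.TeamsAPIURL)
}
```

Promotion only works as long as the embedded structs have no colliding field names; a collision would force callers back to the qualified form, which is worth keeping in mind given the TODO about struct duplication.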

View File

@@ -50,10 +50,8 @@ func (c *Cluster) resourceRequirements(resources spec.Resources) (*v1.ResourceRe
specRequests := resources.ResourceRequest
specLimits := resources.ResourceLimits
config := c.OpConfig
defaultRequests := spec.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest}
defaultLimits := spec.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit}
defaultRequests := spec.ResourceDescription{CPU: c.DefaultCPURequest, Memory: c.DefaultMemoryRequest}
defaultLimits := spec.ResourceDescription{CPU: c.DefaultCPULimit, Memory: c.DefaultMemoryLimit}
result := v1.ResourceRequirements{}
@@ -166,7 +164,7 @@ PATRONI_INITDB_PARAMS:
} else {
config.Bootstrap.PgHBA = []string{
"hostnossl all all all reject",
fmt.Sprintf("hostssl all +%s all pam", c.OpConfig.PamRoleName),
fmt.Sprintf("hostssl all +%s all pam", c.PamRoleName),
"hostssl all all all md5",
}
}
@@ -190,7 +188,7 @@ PATRONI_INITDB_PARAMS:
config.PgLocalConfiguration[patroniPGParametersParameterName] = pg.Parameters
}
config.Bootstrap.Users = map[string]pgUser{
c.OpConfig.PamRoleName: {
c.PamRoleName: {
Password: "",
Options: []string{constants.RoleFlagCreateDB, constants.RoleFlagNoLogin},
},
@@ -217,7 +215,7 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements,
},
{
Name: "ETCD_HOST",
Value: c.OpConfig.EtcdHost,
Value: c.EtcdHost,
},
{
Name: "POD_IP",
@@ -242,7 +240,7 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements,
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: c.credentialSecretName(c.OpConfig.SuperUsername),
Name: c.credentialSecretName(c.SuperUsername),
},
Key: "password",
},
@@ -253,7 +251,7 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements,
ValueFrom: &v1.EnvVarSource{
SecretKeyRef: &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: c.credentialSecretName(c.OpConfig.ReplicationUsername),
Name: c.credentialSecretName(c.ReplicationUsername),
},
Key: "password",
},
@@ -261,19 +259,19 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements,
},
{
Name: "PAM_OAUTH2",
Value: c.OpConfig.PamConfiguration,
Value: c.PamConfiguration,
},
}
if spiloConfiguration != "" {
envVars = append(envVars, v1.EnvVar{Name: "SPILO_CONFIGURATION", Value: spiloConfiguration})
}
if c.OpConfig.WALES3Bucket != "" {
envVars = append(envVars, v1.EnvVar{Name: "WAL_S3_BUCKET", Value: c.OpConfig.WALES3Bucket})
if c.WALES3Bucket != "" {
envVars = append(envVars, v1.EnvVar{Name: "WAL_S3_BUCKET", Value: c.WALES3Bucket})
}
privilegedMode := true
container := v1.Container{
Name: c.Metadata.Name,
Image: c.OpConfig.DockerImage,
Image: c.DockerImage,
ImagePullPolicy: v1.PullAlways,
Resources: *resourceRequirements,
Ports: []v1.ContainerPort{
@@ -304,7 +302,7 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements,
terminateGracePeriodSeconds := int64(30)
podSpec := v1.PodSpec{
ServiceAccountName: c.OpConfig.ServiceAccountName,
ServiceAccountName: c.ServiceAccountName,
TerminationGracePeriodSeconds: &terminateGracePeriodSeconds,
Containers: []v1.Container{container},
}
@@ -316,8 +314,8 @@ func (c *Cluster) genPodTemplate(resourceRequirements *v1.ResourceRequirements,
},
Spec: podSpec,
}
if c.OpConfig.KubeIAMRole != "" {
template.Annotations = map[string]string{constants.KubeIAmAnnotation: c.OpConfig.KubeIAMRole}
if c.KubeIAMRole != "" {
template.Annotations = map[string]string{constants.KubeIAmAnnotation: c.KubeIAMRole}
}
return &template
@@ -451,7 +449,7 @@ func (c *Cluster) genService(role PostgresRole, allowedSourceRanges []string) *v
},
}
if role == Replica {
service.Spec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)}
service.Spec.Selector = map[string]string{c.PodRoleLabel: string(Replica)}
}
return service
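
Useful context for the password wiring changed above: the superuser and replication credentials are injected via SecretKeySelector references rather than literal values. A self-contained sketch of that shape (the secret and variable names are illustrative, not the operator's real naming scheme; the import path matches what this repo vendored at the time):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/pkg/api/v1"
)

// secretEnvVar builds an environment variable that reads its value from a
// key of a Kubernetes Secret — the same shape genPodTemplate uses for the
// superuser and replication passwords.
func secretEnvVar(name, secretName, key string) v1.EnvVar {
	return v1.EnvVar{
		Name: name,
		ValueFrom: &v1.EnvVarSource{
			SecretKeyRef: &v1.SecretKeySelector{
				LocalObjectReference: v1.LocalObjectReference{Name: secretName},
				Key:                  key,
			},
		},
	}
}

func main() {
	ev := secretEnvVar("PGPASSWORD_SUPERUSER", "postgres.credentials", "password")
	fmt.Println(ev.Name, ev.ValueFrom.SecretKeyRef.Name)
}
```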

View File

@@ -33,11 +33,11 @@ func (c *Cluster) pgConnectionString() string {
}
func (c *Cluster) databaseAccessDisabled() bool {
if !c.OpConfig.EnableDBAccess {
if !c.EnableDBAccess {
c.logger.Debugf("Database access is disabled")
}
return !c.OpConfig.EnableDBAccess
return !c.EnableDBAccess
}
func (c *Cluster) initDbConn() (err error) {

View File

@@ -28,7 +28,7 @@ func (c *Cluster) loadResources() error {
return fmt.Errorf("too many(%d) services for a cluster", len(services.Items))
}
for i, svc := range services.Items {
switch PostgresRole(svc.Labels[c.OpConfig.PodRoleLabel]) {
switch PostgresRole(svc.Labels[c.PodRoleLabel]) {
case Replica:
c.Service[Replica] = &services.Items[i]
default:
@@ -45,7 +45,7 @@ func (c *Cluster) loadResources() error {
}
for i, ep := range endpoints.Items {
if ep.Labels[c.OpConfig.PodRoleLabel] != string(Replica) {
if ep.Labels[c.PodRoleLabel] != string(Replica) {
c.Endpoint = &endpoints.Items[i]
break
}
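
One detail worth preserving while touching this loop: it stores &services.Items[i] (and &endpoints.Items[i]) rather than the address of the range variable. Before Go 1.22 the range variable is a single reused variable, so taking its address would leave every stored pointer aliasing the last item. A minimal demonstration of the difference:

```go
package main

import "fmt"

type Service struct{ Name string }

func main() {
	items := []Service{{Name: "master-svc"}, {Name: "replica-svc"}}

	byElement := make(map[int]*Service)
	byRangeVar := make(map[int]*Service)
	for i, svc := range items {
		byElement[i] = &items[i] // what loadResources does: each entry points at its own element
		byRangeVar[i] = &svc     // pre-Go 1.22 pitfall: every entry points at the same variable
	}

	fmt.Println(byElement[0].Name, byElement[1].Name)   // master-svc replica-svc
	fmt.Println(byRangeVar[0].Name, byRangeVar[1].Name) // replica-svc replica-svc (on Go < 1.22)
}
```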

View File

@@ -111,11 +111,11 @@ func (c *Cluster) getOAuthToken() (string, error) {
//TODO: move this function to the Controller if it is needed there; for now it is used only by the Cluster
// Temporarily get the postgresql-operator secret from the NamespaceDefault
credentialsSecret, err := c.KubeClient.
Secrets(c.OpConfig.OAuthTokenSecretName.Namespace).
Get(c.OpConfig.OAuthTokenSecretName.Name)
Secrets(c.OAuthTokenSecretName.Namespace).
Get(c.OAuthTokenSecretName.Name)
if err != nil {
c.logger.Debugf("Oauth token secret name: %s", c.OpConfig.OAuthTokenSecretName)
c.logger.Debugf("Oauth token secret name: %s", c.OAuthTokenSecretName)
return "", fmt.Errorf("could not get credentials secret: %v", err)
}
data := credentialsSecret.Data
@@ -131,7 +131,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) {
if c.Spec.TeamID == "" {
return nil, fmt.Errorf("no teamId specified")
}
if !c.OpConfig.EnableTeamsAPI {
if !c.EnableTeamsAPI {
c.logger.Debug("Team API is disabled, returning empty list of members")
return []string{}, nil
}
@@ -160,7 +160,7 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent) error {
if role == constants.PodRoleMaster || role == constants.PodRoleReplica {
return nil
}
case <-time.After(c.OpConfig.PodLabelWaitTimeout):
case <-time.After(c.PodLabelWaitTimeout):
return fmt.Errorf("pod label wait timeout")
}
}
@@ -173,14 +173,14 @@ func (c *Cluster) waitForPodDeletion(podEvents chan spec.PodEvent) error {
if podEvent.EventType == spec.EventDelete {
return nil
}
case <-time.After(c.OpConfig.PodDeletionWaitTimeout):
case <-time.After(c.PodDeletionWaitTimeout):
return fmt.Errorf("pod deletion wait timeout")
}
}
}
func (c *Cluster) waitStatefulsetReady() error {
return retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
return retryutil.Retry(c.ResourceCheckInterval, c.ResourceCheckTimeout,
func() (bool, error) {
listOptions := v1.ListOptions{
LabelSelector: c.labelsSet().String(),
@@ -207,12 +207,12 @@ func (c *Cluster) waitPodLabelsReady() error {
}
masterListOption := v1.ListOptions{
LabelSelector: labels.Merge(ls, labels.Set{
c.OpConfig.PodRoleLabel: constants.PodRoleMaster,
c.PodRoleLabel: constants.PodRoleMaster,
}).String(),
}
replicaListOption := v1.ListOptions{
LabelSelector: labels.Merge(ls, labels.Set{
c.OpConfig.PodRoleLabel: constants.PodRoleReplica,
c.PodRoleLabel: constants.PodRoleReplica,
}).String(),
}
pods, err := c.KubeClient.Pods(namespace).List(listOptions)
@@ -221,7 +221,7 @@ func (c *Cluster) waitPodLabelsReady() error {
}
podsNumber := len(pods.Items)
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
err = retryutil.Retry(c.ResourceCheckInterval, c.ResourceCheckTimeout,
func() (bool, error) {
masterPods, err := c.KubeClient.Pods(namespace).List(masterListOption)
if err != nil {
@@ -263,32 +263,32 @@ func (c *Cluster) waitStatefulsetPodsReady() error {
func (c *Cluster) labelsSet() labels.Set {
lbls := make(map[string]string)
for k, v := range c.OpConfig.ClusterLabels {
for k, v := range c.ClusterLabels {
lbls[k] = v
}
lbls[c.OpConfig.ClusterNameLabel] = c.Metadata.Name
lbls[c.ClusterNameLabel] = c.Metadata.Name
return labels.Set(lbls)
}
func (c *Cluster) roleLabelsSet(role PostgresRole) labels.Set {
lbls := c.labelsSet()
lbls[c.OpConfig.PodRoleLabel] = string(role)
lbls[c.PodRoleLabel] = string(role)
return lbls
}
func (c *Cluster) masterDnsName() string {
return strings.ToLower(c.OpConfig.MasterDNSNameFormat.Format(
return strings.ToLower(c.MasterDNSNameFormat.Format(
"cluster", c.Spec.ClusterName,
"team", c.teamName(),
"hostedzone", c.OpConfig.DbHostedZone))
"hostedzone", c.DbHostedZone))
}
func (c *Cluster) replicaDnsName() string {
return strings.ToLower(c.OpConfig.ReplicaDNSNameFormat.Format(
return strings.ToLower(c.ReplicaDNSNameFormat.Format(
"cluster", c.Spec.ClusterName,
"team", c.teamName(),
"hostedzone", c.OpConfig.DbHostedZone))
"hostedzone", c.DbHostedZone))
}
func (c *Cluster) credentialSecretName(username string) string {
@@ -300,5 +300,5 @@ func (c *Cluster) credentialSecretName(username string) string {
}
func (c *Cluster) podSpiloRole(pod *v1.Pod) string {
return pod.Labels[c.OpConfig.PodRoleLabel]
return pod.Labels[c.PodRoleLabel]
}
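
labelsSet, roleLabelsSet and podSpiloRole now read ClusterLabels, ClusterNameLabel and PodRoleLabel as promoted fields. For reference, a self-contained sketch of how such label sets become list selectors; it uses k8s.io/apimachinery/pkg/labels (the package providing the Set, Merge and String seen above), and the keys and values are illustrative defaults rather than the operator's exact configuration:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Operator-wide cluster labels plus the cluster-name label, as in labelsSet().
	base := labels.Set{
		"application":  "spilo",
		"cluster-name": "acid-minimal-cluster",
	}

	// roleLabelsSet(): the same set narrowed to a single role.
	replica := labels.Merge(base, labels.Set{"spilo-role": "replica"})

	// String() renders a selector suitable for ListOptions.LabelSelector.
	fmt.Println(replica.String())
	// application=spilo,cluster-name=acid-minimal-cluster,spilo-role=replica
}
```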

View File

@@ -17,6 +17,7 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/cluster"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util"
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
)
@@ -125,7 +126,7 @@ func (c *Controller) processEvent(obj interface{}) error {
logger.Infof("Creation of the '%s' cluster started", clusterName)
stopCh := make(chan struct{})
cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger)
cl = cluster.New(c.makeClusterConfig(), config.Copy(c.opConfig), *event.NewSpec, logger)
cl.Run(stopCh)
c.clustersMu.Lock()
@@ -176,7 +177,7 @@ func (c *Controller) processEvent(obj interface{}) error {
// no race condition because a cluster is always processed by single worker
if !clusterFound {
stopCh := make(chan struct{})
cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger)
cl = cluster.New(c.makeClusterConfig(), config.Copy(c.opConfig), *event.NewSpec, logger)
cl.Run(stopCh)
c.clustersMu.Lock()
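
Both call sites now pass config.Copy(c.opConfig), so each cluster receives its own copy of the operator configuration instead of sharing the controller's. Assuming the config struct holds only value-type fields, such a Copy can be as small as a dereference — a hedged sketch, not necessarily the real implementation:

```go
package config

// Config is a stand-in here; the operator's real struct carries many more fields.
type Config struct {
	SuperUsername string
	EtcdHost      string
}

// Copy returns an independent value copy of *c. Struct assignment copies all
// value-type fields; any maps or slices inside the struct would still need an
// explicit deep copy.
func Copy(c *Config) Config {
	return *c
}
```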

View File

@@ -7,25 +7,22 @@ import (
"k8s.io/client-go/pkg/api/v1"
extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"github.com/zalando-incubator/postgres-operator/pkg/cluster"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
)
func (c *Controller) makeClusterConfig() cluster.Config {
func (c *Controller) makeClusterConfig() spec.ClusterConfig {
infrastructureRoles := make(map[string]spec.PgUser)
for k, v := range c.InfrastructureRoles {
infrastructureRoles[k] = v
}
return cluster.Config{
return spec.ClusterConfig{
KubeClient: c.KubeClient,
RestClient: c.RestClient,
RestConfig: c.RestConfig,
TeamsAPIClient: c.TeamsAPIClient,
OpConfig: config.Copy(c.opConfig),
InfrastructureRoles: infrastructureRoles,
}
}
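
The entry-by-entry copy of InfrastructureRoles is deliberate: Go maps are reference types, so assigning c.InfrastructureRoles directly would hand every cluster the controller's underlying map, and any mutation (or concurrent access) would be shared. A quick demonstration of why the loop is needed:

```go
package main

import "fmt"

func main() {
	controllerRoles := map[string]string{"robot_zmon": "secret"}

	shared := controllerRoles // same underlying map, only the header is copied
	copied := make(map[string]string, len(controllerRoles))
	for k, v := range controllerRoles {
		copied[k] = v // independent copy; PgUser values would be copied as structs
	}

	shared["robot_zmon"] = "changed"
	fmt.Println(controllerRoles["robot_zmon"]) // changed — mutation is visible through the alias
	fmt.Println(copied["robot_zmon"])          // secret — the copy is unaffected
}
```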

View File

@@ -5,8 +5,12 @@ import (
"fmt"
"strings"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/types"
"k8s.io/client-go/rest"
"github.com/zalando-incubator/postgres-operator/pkg/util/teams"
)
// EventType describes the type of the events for the TPRs and Pods received from Kubernetes
@@ -67,6 +71,15 @@ type PgSyncUserRequest struct {
User PgUser
}
// ClusterConfig contains operator-wide clients and configuration used by a cluster. TODO: remove struct duplication.
type ClusterConfig struct {
KubeClient *kubernetes.Clientset //TODO: move clients to a better place?
RestClient *rest.RESTClient
TeamsAPIClient *teams.API
RestConfig *rest.Config
InfrastructureRoles map[string]PgUser // inherited from the controller
}
// UserSyncer defines an interface for implementations that sync users from the manifest to the DB.
type UserSyncer interface {
ProduceSyncRequests(dbUsers PgUserMap, newUsers PgUserMap) (req []PgSyncUserRequest)