Get config from environment variables; ignore pg major version change; get rid of resources package

commit c2d2a67ad5
parent 28e3b7b5d9

cmd/main.go | 13
cmd/main.go

@@ -9,7 +9,7 @@ import (
 	"syscall"
 
 	"github.bus.zalan.do/acid/postgres-operator/pkg/controller"
-	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util/config"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/teams"
 )
@@ -19,6 +19,7 @@ var (
 	podNamespace string
 	OutOfCluster bool
 	version      string
+	cfg          *config.Config
 )
 
 func init() {
@@ -30,6 +31,8 @@ func init() {
 	if len(podNamespace) == 0 {
 		podNamespace = "default"
 	}
+
+	cfg = config.LoadFromEnv()
 }
 
 func ControllerConfig() *controller.Config {
@@ -45,7 +48,7 @@ func ControllerConfig() *controller.Config {
 
 	restClient, err := k8sutil.KubernetesRestClient(restConfig)
 
-	teamsApi := teams.NewTeamsAPI(constants.TeamsAPIUrl)
+	teamsApi := teams.NewTeamsAPI(cfg.TeamsAPIUrl)
 	return &controller.Config{
 		PodNamespace: podNamespace,
 		KubeClient:   client,
@@ -57,7 +60,7 @@ func ControllerConfig() *controller.Config {
 func main() {
 	log.SetOutput(os.Stdout)
 	log.Printf("Spilo operator %s\n", version)
-	log.Printf("MY_POD_NAMESPACE=%s\n", podNamespace)
+	log.Printf("ServiceAccountName: %s\n", cfg.ServiceAccountName)
 
 	sigs := make(chan os.Signal, 1)
 	stop := make(chan struct{})
@@ -65,9 +68,9 @@ func main() {
 
 	wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on
 
-	cfg := ControllerConfig()
+	controllerConfig := ControllerConfig()
 
-	c := controller.New(cfg)
+	c := controller.New(controllerConfig, cfg)
 	c.Run(stop, wg)
 
 	sig := <-sigs
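Aside: the main.go changes above reduce to one pattern — load the operator configuration from the environment exactly once in init(), then hand it to the controller next to the Kubernetes wiring. A minimal, self-contained sketch of that pattern (Config and loadFromEnv here are illustrative stand-ins for pkg/util/config, which uses envconfig as shown further down):

package main

import (
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
)

// Config stands in for pkg/util/config.Config: every knob comes from a
// PGOP_-prefixed environment variable instead of a compile-time constant.
type Config struct {
	TeamsAPIUrl        string
	ServiceAccountName string
}

func loadFromEnv() *Config {
	// The real code calls envconfig.Process("PGOP", &cfg); simplified here.
	return &Config{
		TeamsAPIUrl:        os.Getenv("PGOP_TEAMS_API_URL"),
		ServiceAccountName: os.Getenv("PGOP_SERVICE_ACCOUNT_NAME"),
	}
}

func main() {
	cfg := loadFromEnv()
	log.Printf("ServiceAccountName: %s", cfg.ServiceAccountName)

	stop := make(chan struct{})
	wg := &sync.WaitGroup{}
	// controller.New(controllerConfig, cfg) and c.Run(stop, wg) go here.

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs
	close(stop)
	wg.Wait()
}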
@@ -27,3 +27,4 @@ import:
    - rest
    - tools/cache
    - tools/clientcmd
+- package: github.com/kelseyhightower/envconfig
@@ -14,11 +14,45 @@ spec:
       - name: postgres-operator
         image: pierone.example.com/acid/postgres-operator:0.1
         env:
-        - name: MY_POD_NAMESPACE
+        - name: MY_POD_NAMESPACE  #TODO: use PGOP_ prefix
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
-        - name: MY_POD_NAME
+        - name: PGOP_SERVICE_ACCOUNT_NAME
          valueFrom:
            fieldRef:
-             fieldPath: metadata.name
+             fieldPath: spec.serviceAccountName
+        - name: PGOP_READY_WAIT_INTERVAL
+          value: "3s"
+        - name: PGOP_READY_WAIT_TIMEOUT
+          value: "30s"
+        - name: PGOP_RESYNC_PERIOD
+          value: "5m"
+        - name: PGOP_RESYNC_PERIOD_POD
+          value: "5m"
+        - name: PGOP_RESOURCE_CHECK_INTERVAL
+          value: "3s"
+        - name: PGOP_RESOURCE_CHECK_TIMEOUT
+          value: "10m"
+        - name: PGOP_POD_LABEL_WAIT_TIMEOUT
+          value: "10m"
+        - name: PGOP_POD_DELETION_WAIT_TIMEOUT
+          value: "10m"
+        - name: PGOP_PAM_ROLE_NAME
+          value: "zalandos"
+        - name: PGOP_PAM_CONFIGURATION
+          value: "https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"
+        - name: PGOP_TEAMS_API_URL
+          value: "https://teams.example.com/api/"
+        - name: PGOP_OAUTH_TOKEN_SECRET_NAME
+          value: "postgresql-operator"
+        - name: PGOP_SUPER_USERNAME
+          value: "postgres"
+        - name: PGOP_REPLICATION_USERNAME
+          value: "replication"
+        - name: PGOP_ETCD_HOST
+          value: "etcd-client.default.svc.cluster.local:2379"
+        - name: PGOP_DOCKER_IMAGE
+          value: "registry.opensource.zalan.do/acid/spilo-9.6:1.2-p12"
+        - name: PGOP_DB_HOSTED_ZONE
+          value: "db.example.com"
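The PGOP_* variables added to the manifest line up with the field names in the new config package: envconfig strips the PGOP prefix and, with split_words set, matches UPPER_SNAKE names to CamelCase fields, so PGOP_READY_WAIT_TIMEOUT lands in ReadyWaitTimeout. A small runnable sketch of that mapping (field set trimmed; the values are the manifest's own):

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/kelseyhightower/envconfig"
)

type Config struct {
	// split_words lets PGOP_READY_WAIT_TIMEOUT populate ReadyWaitTimeout.
	ReadyWaitTimeout time.Duration `split_words:"true"`
	EtcdHost         string        `split_words:"true"`
}

func main() {
	os.Setenv("PGOP_READY_WAIT_TIMEOUT", "30s")
	os.Setenv("PGOP_ETCD_HOST", "etcd-client.default.svc.cluster.local:2379")

	var cfg Config
	if err := envconfig.Process("PGOP", &cfg); err != nil {
		panic(err)
	}
	// Prints: 30s etcd-client.default.svc.cluster.local:2379
	fmt.Println(cfg.ReadyWaitTimeout, cfg.EtcdHost)
}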
@@ -21,9 +21,9 @@ import (
 
 	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util/config"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
-	"github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/teams"
 )
 
@@ -37,6 +37,7 @@ type Config struct {
 	RestClient     *rest.RESTClient
 	EtcdClient     etcdclient.KeysAPI
 	TeamsAPIClient *teams.TeamsAPI
+	OpConfig       *config.Config
 }
 
 type KubeResources struct {
@@ -51,10 +52,8 @@ type KubeResources struct {
 type Cluster struct {
 	KubeResources
 	spec.Postgresql
-	config         Config
+	Config
 	logger         *logrus.Entry
-	etcdHost       string
-	dockerImage    string
 	pgUsers        map[string]spec.PgUser
 	podEvents      chan spec.PodEvent
 	podSubscribers map[spec.PodName]chan spec.PodEvent
@@ -67,11 +66,9 @@ func New(cfg Config, pgSpec spec.Postgresql) *Cluster {
 	kubeResources := KubeResources{Secrets: make(map[types.UID]*v1.Secret)}
 
 	cluster := &Cluster{
-		config:         cfg,
+		Config:         cfg,
 		Postgresql:     pgSpec,
 		logger:         lg,
-		etcdHost:       constants.EtcdHost,
-		dockerImage:    constants.SpiloImage,
 		pgUsers:        make(map[string]spec.PgUser),
 		podEvents:      make(chan spec.PodEvent),
 		podSubscribers: make(map[spec.PodName]chan spec.PodEvent),
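The struct change above is the mechanical heart of this commit: replacing the named field `config Config` with an embedded `Config` promotes the inner fields, which is why every call site below shrinks from c.config.KubeClient to c.KubeClient. A tiny stand-alone illustration (the types are stand-ins, not the operator's own):

package main

import "fmt"

type Config struct {
	KubeClient string // stand-in for the real client handle
}

type Cluster struct {
	Config // embedded: Config's fields are promoted onto Cluster
}

func main() {
	c := Cluster{Config: Config{KubeClient: "client"}}
	fmt.Println(c.KubeClient)        // promoted access, as in the new code
	fmt.Println(c.Config.KubeClient) // the explicit form is still available
}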
@@ -106,7 +103,7 @@ func (c *Cluster) SetStatus(status spec.PostgresStatus) {
 	}
 	request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods
 
-	_, err = c.config.RestClient.Patch(api.MergePatchType).
+	_, err = c.RestClient.Patch(api.MergePatchType).
 		RequestURI(c.Metadata.GetSelfLink()).
 		Body(request).
 		DoRaw()
@@ -237,7 +234,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 	c.logger.Infof("Cluster update from version %s to %s",
 		c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion)
 
-	newService := resources.Service(c.ClusterName(), c.TeamName(), newSpec.Spec.AllowedSourceRanges)
+	newService := c.genService(newSpec.Spec.AllowedSourceRanges)
 	if !c.sameServiceWith(newService) {
 		c.logger.Infof("LoadBalancer configuration has changed for Service '%s': %+v -> %+v",
 			util.NameFromMeta(c.Service.ObjectMeta),
@@ -255,7 +252,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 		//TODO: update PVC
 	}
 
-	newStatefulSet := genStatefulSet(c.ClusterName(), newSpec.Spec, c.etcdHost, c.dockerImage)
+	newStatefulSet := c.genStatefulSet(newSpec.Spec)
 	sameSS, rollingUpdate := c.compareStatefulSetWith(newStatefulSet)
 
 	if !sameSS {
@@ -340,13 +337,13 @@ func (c *Cluster) ReceivePodEvent(event spec.PodEvent) {
 }
 
 func (c *Cluster) initSystemUsers() {
-	c.pgUsers[constants.SuperuserName] = spec.PgUser{
-		Name:     constants.SuperuserName,
+	c.pgUsers[c.OpConfig.SuperUsername] = spec.PgUser{
+		Name:     c.OpConfig.SuperUsername,
 		Password: util.RandomPassword(constants.PasswordLength),
 	}
 
-	c.pgUsers[constants.ReplicationUsername] = spec.PgUser{
-		Name:     constants.ReplicationUsername,
+	c.pgUsers[c.OpConfig.ReplicationUsername] = spec.PgUser{
+		Name:     c.OpConfig.ReplicationUsername,
 		Password: util.RandomPassword(constants.PasswordLength),
 	}
 }
@@ -1,4 +1,4 @@
-package resources
+package cluster
 
 import (
 	"fmt"
@@ -6,8 +6,6 @@ import (
 	"k8s.io/client-go/pkg/api/resource"
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
-	extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1"
-	"k8s.io/client-go/pkg/labels"
 	"k8s.io/client-go/pkg/util/intstr"
 
 	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
@@ -15,23 +13,7 @@ import (
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
 )
 
-func credentialSecretName(clusterName, username string) string {
-	return fmt.Sprintf(
-		constants.UserSecretTemplate,
-		username,
-		clusterName,
-		constants.TPRName,
-		constants.TPRVendor)
-}
-
-func labelsSet(clusterName string) labels.Set {
-	return labels.Set{
-		"application":   "spilo",
-		"spilo-cluster": clusterName,
-	}
-}
-
-func ResourceList(resources spec.Resources) *v1.ResourceList {
+func resourceList(resources spec.Resources) *v1.ResourceList {
 	resourceList := v1.ResourceList{}
 	if resources.Cpu != "" {
 		resourceList[v1.ResourceCPU] = resource.MustParse(resources.Cpu)
@@ -44,11 +26,11 @@ func ResourceList(resources spec.Resources) *v1.ResourceList {
 	return &resourceList
 }
 
-func PodTemplate(cluster spec.ClusterName, resourceList *v1.ResourceList, pgVersion string, dockerImage, etcdHost string) *v1.PodTemplateSpec {
+func (c *Cluster) genPodTemplate(resourceList *v1.ResourceList, pgVersion string) *v1.PodTemplateSpec {
 	envVars := []v1.EnvVar{
 		{
 			Name:  "SCOPE",
-			Value: cluster.Name,
+			Value: c.Metadata.Name,
 		},
 		{
 			Name: "PGROOT",
@@ -56,7 +38,7 @@ func PodTemplate(cluster spec.ClusterName, resourceList *v1.ResourceList, pgVersion string, dockerImage, etcdHost string) *v1.PodTemplateSpec {
 		},
 		{
 			Name:  "ETCD_HOST",
-			Value: etcdHost,
+			Value: c.OpConfig.EtcdHost,
 		},
 		{
 			Name: "POD_IP",
@@ -81,7 +63,7 @@ func PodTemplate(cluster spec.ClusterName, resourceList *v1.ResourceList, pgVersion string, dockerImage, etcdHost string) *v1.PodTemplateSpec {
 			ValueFrom: &v1.EnvVarSource{
 				SecretKeyRef: &v1.SecretKeySelector{
 					LocalObjectReference: v1.LocalObjectReference{
-						Name: credentialSecretName(cluster.Name, constants.SuperuserName),
+						Name: c.credentialSecretName(c.OpConfig.SuperUsername),
 					},
 					Key: "password",
 				},
@@ -92,18 +74,18 @@ func PodTemplate(cluster spec.ClusterName, resourceList *v1.ResourceList, pgVersion string, dockerImage, etcdHost string) *v1.PodTemplateSpec {
 			ValueFrom: &v1.EnvVarSource{
 				SecretKeyRef: &v1.SecretKeySelector{
 					LocalObjectReference: v1.LocalObjectReference{
-						Name: credentialSecretName(cluster.Name, constants.ReplicationUsername),
+						Name: c.credentialSecretName(c.OpConfig.ReplicationUsername),
 					},
 					Key: "password",
 				},
 			},
 		},
 		{
-			Name:  "PAM_OAUTH2",               //TODO: get from the operator tpr spec
-			Value: constants.PamConfiguration, //space before uid is obligatory
+			Name:  "PAM_OAUTH2",
+			Value: c.OpConfig.PamConfiguration,
 		},
 		{
-			Name: "SPILO_CONFIGURATION", //TODO: get from the operator tpr spec
+			Name: "SPILO_CONFIGURATION",
 			Value: fmt.Sprintf(`
 postgresql:
   bin_dir: /usr/lib/postgresql/%s/bin
@@ -120,13 +102,13 @@ bootstrap:
     pg_hba:
       - hostnossl all all all reject
       - hostssl   all +%s all pam
-      - hostssl   all all all md5`, pgVersion, constants.PamRoleName, constants.PamRoleName),
+      - hostssl   all all all md5`, pgVersion, c.OpConfig.PamRoleName, c.OpConfig.PamRoleName),
 		},
 	}
 
 	container := v1.Container{
-		Name:            cluster.Name,
-		Image:           dockerImage,
+		Name:            c.Metadata.Name,
+		Image:           c.OpConfig.DockerImage,
 		ImagePullPolicy: v1.PullAlways,
 		Resources: v1.ResourceRequirements{
 			Requests: *resourceList,
@@ -156,16 +138,15 @@ bootstrap:
 	terminateGracePeriodSeconds := int64(30)
 
 	podSpec := v1.PodSpec{
-		ServiceAccountName:            constants.ServiceAccountName,
+		ServiceAccountName:            c.OpConfig.ServiceAccountName,
 		TerminationGracePeriodSeconds: &terminateGracePeriodSeconds,
 		Containers:                    []v1.Container{container},
 	}
 
 	template := v1.PodTemplateSpec{
 		ObjectMeta: v1.ObjectMeta{
-			Labels:      labelsSet(cluster.Name),
-			Namespace:   cluster.Namespace,
-			Annotations: map[string]string{"pod.alpha.kubernetes.io/initialized": "true"},
+			Labels:    c.labelsSet(),
+			Namespace: c.Metadata.Name,
 		},
 		Spec: podSpec,
 	}
@@ -173,7 +154,29 @@ bootstrap:
 	return &template
 }
 
-func VolumeClaimTemplate(volumeSize, volumeStorageClass string) *v1.PersistentVolumeClaim {
+func (c *Cluster) genStatefulSet(spec spec.PostgresSpec) *v1beta1.StatefulSet {
+	resourceList := resourceList(spec.Resources)
+	podTemplate := c.genPodTemplate(resourceList, spec.PgVersion)
+	volumeClaimTemplate := persistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass)
+
+	statefulSet := &v1beta1.StatefulSet{
+		ObjectMeta: v1.ObjectMeta{
+			Name:      c.Metadata.Name,
+			Namespace: c.Metadata.Namespace,
+			Labels:    c.labelsSet(),
+		},
+		Spec: v1beta1.StatefulSetSpec{
+			Replicas:             &spec.NumberOfInstances,
+			ServiceName:          c.Metadata.Name,
+			Template:             *podTemplate,
+			VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate},
+		},
+	}
+
+	return statefulSet
+}
+
+func persistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) *v1.PersistentVolumeClaim {
 	metadata := v1.ObjectMeta{
 		Name: constants.DataVolumeName,
 	}
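One detail worth noting in genStatefulSet above: StatefulSetSpec.Replicas is a *int32 (nil meaning "unset"), so the code takes &spec.NumberOfInstances rather than the value. An illustrative stand-alone sketch of that pattern:

package main

import "fmt"

// Replicas mirrors v1beta1.StatefulSetSpec.Replicas: a *int32 where nil
// means "let the API server pick the default".
type StatefulSetSpec struct {
	Replicas *int32
}

func main() {
	numberOfInstances := int32(3) // a literal has no address; bind it first
	spec := StatefulSetSpec{Replicas: &numberOfInstances}
	fmt.Println(*spec.Replicas) // 3
}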
@@ -198,38 +201,19 @@ func VolumeClaimTemplate(volumeSize, volumeStorageClass string) *v1.PersistentVolumeClaim {
 	return volumeClaim
 }
 
-func StatefulSet(cluster spec.ClusterName, podTemplate *v1.PodTemplateSpec,
-	persistenVolumeClaim *v1.PersistentVolumeClaim, numberOfInstances int32) *v1beta1.StatefulSet {
-	statefulSet := &v1beta1.StatefulSet{
-		ObjectMeta: v1.ObjectMeta{
-			Name:      cluster.Name,
-			Namespace: cluster.Namespace,
-			Labels:    labelsSet(cluster.Name),
-		},
-		Spec: v1beta1.StatefulSetSpec{
-			Replicas:             &numberOfInstances,
-			ServiceName:          cluster.Name,
-			Template:             *podTemplate,
-			VolumeClaimTemplates: []v1.PersistentVolumeClaim{*persistenVolumeClaim},
-		},
-	}
-
-	return statefulSet
-}
-
-func UserSecrets(cluster spec.ClusterName, pgUsers map[string]spec.PgUser) (secrets map[string]*v1.Secret, err error) {
-	secrets = make(map[string]*v1.Secret, len(pgUsers))
-	namespace := cluster.Namespace
-	for username, pgUser := range pgUsers {
+func (c *Cluster) genUserSecrets() (secrets map[string]*v1.Secret, err error) {
+	secrets = make(map[string]*v1.Secret, len(c.pgUsers))
+	namespace := c.Metadata.Namespace
+	for username, pgUser := range c.pgUsers {
 		//Skip users with no password i.e. human users (they'll be authenticated using pam)
 		if pgUser.Password == "" {
 			continue
 		}
 		secret := v1.Secret{
 			ObjectMeta: v1.ObjectMeta{
-				Name:      credentialSecretName(cluster.Name, username),
+				Name:      c.credentialSecretName(username),
 				Namespace: namespace,
-				Labels:    labelsSet(cluster.Name),
+				Labels:    c.labelsSet(),
 			},
 			Type: v1.SecretTypeOpaque,
 			Data: map[string][]byte{
@@ -243,14 +227,14 @@ func UserSecrets(cluster spec.ClusterName, pgUsers map[string]spec.PgUser) (secrets map[string]*v1.Secret, err error) {
 	return
 }
 
-func Service(cluster spec.ClusterName, teamName string, allowedSourceRanges []string) *v1.Service {
+func (c *Cluster) genService(allowedSourceRanges []string) *v1.Service {
 	service := &v1.Service{
 		ObjectMeta: v1.ObjectMeta{
-			Name:      cluster.Name,
-			Namespace: cluster.Namespace,
-			Labels:    labelsSet(cluster.Name),
+			Name:      c.Metadata.Name,
+			Namespace: c.Metadata.Namespace,
+			Labels:    c.labelsSet(),
 			Annotations: map[string]string{
-				constants.ZalandoDnsNameAnnotation: util.ClusterDNSName(cluster.Name, teamName, constants.DbHostedZone),
+				constants.ZalandoDnsNameAnnotation: util.ClusterDNSName(c.Metadata.Name, c.TeamName(), c.OpConfig.DbHostedZone),
 			},
 		},
 		Spec: v1.ServiceSpec{
@@ -263,27 +247,14 @@ func Service(cluster spec.ClusterName, teamName string, allowedSourceRanges []string) *v1.Service {
 	return service
 }
 
-func Endpoint(cluster spec.ClusterName) *v1.Endpoints {
+func (c *Cluster) genEndpoints() *v1.Endpoints {
 	endpoints := &v1.Endpoints{
 		ObjectMeta: v1.ObjectMeta{
-			Name:      cluster.Name,
-			Namespace: cluster.Namespace,
-			Labels:    labelsSet(cluster.Name),
+			Name:      c.Metadata.Name,
+			Namespace: c.Metadata.Namespace,
+			Labels:    c.labelsSet(),
 		},
 	}
 
 	return endpoints
 }
-
-func ThirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource {
-	return &extv1beta.ThirdPartyResource{
-		ObjectMeta: v1.ObjectMeta{
-			//ThirdPartyResources are cluster-wide
-			Name: TPRName,
-		},
-		Versions: []extv1beta.APIVersion{
-			{Name: constants.TPRApiVersion},
-		},
-		Description: constants.TPRDescription,
-	}
-}
@@ -9,18 +9,17 @@ import (
 
 	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util"
-	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
 )
 
 var createUserSQL = `SET LOCAL synchronous_commit = 'local'; CREATE ROLE "%s" %s %s;`
 
 func (c *Cluster) pgConnectionString() string {
 	hostname := fmt.Sprintf("%s.%s.svc.cluster.local", c.Metadata.Name, c.Metadata.Namespace)
-	password := c.pgUsers[constants.SuperuserName].Password
+	password := c.pgUsers[c.OpConfig.SuperUsername].Password
 
 	return fmt.Sprintf("host='%s' dbname=postgres sslmode=require user='%s' password='%s'",
 		hostname,
-		constants.SuperuserName,
+		c.OpConfig.SuperUsername,
 		strings.Replace(password, "$", "\\$", -1))
 }
 
@@ -52,7 +51,7 @@ func (c *Cluster) createPgUser(user spec.PgUser) (isHuman bool, err error) {
 	if user.Password == "" {
 		isHuman = true
 		flags = append(flags, "SUPERUSER")
-		flags = append(flags, fmt.Sprintf("IN ROLE \"%s\"", constants.PamRoleName))
+		flags = append(flags, fmt.Sprintf("IN ROLE \"%s\"", c.OpConfig.PamRoleName))
 	} else {
 		isHuman = false
 	}
@@ -15,7 +15,7 @@ func (c *Cluster) listPods() ([]v1.Pod, error) {
 		LabelSelector: c.labelsSet().String(),
 	}
 
-	pods, err := c.config.KubeClient.Pods(ns).List(listOptions)
+	pods, err := c.KubeClient.Pods(ns).List(listOptions)
 	if err != nil {
 		return nil, fmt.Errorf("Can't get list of Pods: %s", err)
 	}
@@ -29,7 +29,7 @@ func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) {
 		LabelSelector: c.labelsSet().String(),
 	}
 
-	pvcs, err := c.config.KubeClient.PersistentVolumeClaims(ns).List(listOptions)
+	pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(listOptions)
 	if err != nil {
 		return nil, fmt.Errorf("Can't get list of PersistentVolumeClaims: %s", err)
 	}
@@ -60,7 +60,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error {
 		return err
 	}
 	for _, pvc := range pvcs {
-		if err := c.config.KubeClient.PersistentVolumeClaims(ns).Delete(pvc.Name, deleteOptions); err != nil {
+		if err := c.KubeClient.PersistentVolumeClaims(ns).Delete(pvc.Name, deleteOptions); err != nil {
 			c.logger.Warningf("Can't delete PersistentVolumeClaim: %s", err)
 		}
 	}
@@ -83,7 +83,7 @@ func (c *Cluster) deletePod(pod *v1.Pod) error {
 		delete(c.podSubscribers, podName)
 	}()
 
-	if err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil {
+	if err := c.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil {
 		return err
 	}
 
@@ -126,7 +126,7 @@ func (c *Cluster) recreatePod(pod v1.Pod, spiloRole string) error {
 	ch := c.registerPodSubscriber(podName)
 	defer c.unregisterPodSubscriber(podName)
 
-	if err := c.config.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil {
+	if err := c.KubeClient.Pods(pod.Namespace).Delete(pod.Name, deleteOptions); err != nil {
 		return fmt.Errorf("Can't delete Pod: %s", err)
 	}
 
@@ -165,7 +165,7 @@ func (c *Cluster) recreatePods() error {
 	listOptions := v1.ListOptions{
 		LabelSelector: ls.String(),
 	}
-	pods, err := c.config.KubeClient.Pods(namespace).List(listOptions)
+	pods, err := c.KubeClient.Pods(namespace).List(listOptions)
 	if err != nil {
 		return fmt.Errorf("Can't get the list of Pods: %s", err)
 	} else {
@@ -6,11 +6,8 @@ import (
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
 
-	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util"
-	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
-	"github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
 )
 
 var (
@@ -18,15 +15,6 @@ var (
 	orphanDependents = false
 )
 
-func genStatefulSet(clusterName spec.ClusterName, cSpec spec.PostgresSpec, etcdHost, dockerImage string) *v1beta1.StatefulSet {
-	volumeSize := cSpec.Volume.Size
-	volumeStorageClass := cSpec.Volume.StorageClass
-	resourceList := resources.ResourceList(cSpec.Resources)
-	template := resources.PodTemplate(clusterName, resourceList, cSpec.PgVersion, dockerImage, etcdHost)
-	volumeClaimTemplate := resources.VolumeClaimTemplate(volumeSize, volumeStorageClass)
-
-	return resources.StatefulSet(clusterName, template, volumeClaimTemplate, cSpec.NumberOfInstances)
-}
-
 func (c *Cluster) LoadResources() error {
 	ns := c.Metadata.Namespace
 
@@ -34,7 +22,7 @@ func (c *Cluster) LoadResources() error {
 		LabelSelector: c.labelsSet().String(),
 	}
 
-	services, err := c.config.KubeClient.Services(ns).List(listOptions)
+	services, err := c.KubeClient.Services(ns).List(listOptions)
 	if err != nil {
 		return fmt.Errorf("Can't get list of Services: %s", err)
 	}
@@ -44,7 +32,7 @@ func (c *Cluster) LoadResources() error {
 		c.Service = &services.Items[0]
 	}
 
-	endpoints, err := c.config.KubeClient.Endpoints(ns).List(listOptions)
+	endpoints, err := c.KubeClient.Endpoints(ns).List(listOptions)
 	if err != nil {
 		return fmt.Errorf("Can't get list of Endpoints: %s", err)
 	}
@@ -54,7 +42,7 @@ func (c *Cluster) LoadResources() error {
 		c.Endpoint = &endpoints.Items[0]
 	}
 
-	secrets, err := c.config.KubeClient.Secrets(ns).List(listOptions)
+	secrets, err := c.KubeClient.Secrets(ns).List(listOptions)
 	if err != nil {
 		return fmt.Errorf("Can't get list of Secrets: %s", err)
 	}
@@ -66,7 +54,7 @@ func (c *Cluster) LoadResources() error {
 		c.logger.Debugf("Secret loaded, uid: %s", secret.UID)
 	}
 
-	statefulSets, err := c.config.KubeClient.StatefulSets(ns).List(listOptions)
+	statefulSets, err := c.KubeClient.StatefulSets(ns).List(listOptions)
 	if err != nil {
 		return fmt.Errorf("Can't get list of StatefulSets: %s", err)
 	}
@@ -121,8 +109,8 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
 	if c.Statefulset != nil {
 		return nil, fmt.Errorf("StatefulSet already exists in the cluster")
 	}
-	statefulSetSpec := genStatefulSet(c.ClusterName(), c.Spec, c.etcdHost, c.dockerImage)
-	statefulSet, err := c.config.KubeClient.StatefulSets(statefulSetSpec.Namespace).Create(statefulSetSpec)
+	statefulSetSpec := c.genStatefulSet(c.Spec)
+	statefulSet, err := c.KubeClient.StatefulSets(statefulSetSpec.Namespace).Create(statefulSetSpec)
 	if k8sutil.ResourceAlreadyExists(err) {
 		return nil, fmt.Errorf("StatefulSet '%s' already exists", util.NameFromMeta(statefulSetSpec.ObjectMeta))
 	}
@@ -139,7 +127,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
 	if c.Statefulset == nil {
 		return fmt.Errorf("There is no StatefulSet in the cluster")
 	}
-	statefulSet, err := c.config.KubeClient.StatefulSets(newStatefulSet.Namespace).Update(newStatefulSet)
+	statefulSet, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Update(newStatefulSet)
 	if err != nil {
 		return err
 	}
@@ -154,7 +142,7 @@ func (c *Cluster) deleteStatefulSet() error {
 		return fmt.Errorf("There is no StatefulSet in the cluster")
 	}
 
-	err := c.config.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(c.Statefulset.Name, deleteOptions)
+	err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Delete(c.Statefulset.Name, deleteOptions)
 	if err != nil {
 		return err
 	}
@@ -167,9 +155,9 @@ func (c *Cluster) createService() (*v1.Service, error) {
 	if c.Service != nil {
 		return nil, fmt.Errorf("Service already exists in the cluster")
 	}
-	serviceSpec := resources.Service(c.ClusterName(), c.TeamName(), c.Spec.AllowedSourceRanges)
+	serviceSpec := c.genService(c.Spec.AllowedSourceRanges)
 
-	service, err := c.config.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec)
+	service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec)
 	if k8sutil.ResourceAlreadyExists(err) {
 		return nil, fmt.Errorf("Service '%s' already exists", util.NameFromMeta(serviceSpec.ObjectMeta))
 	}
@@ -188,7 +176,7 @@ func (c *Cluster) updateService(newService *v1.Service) error {
 	newService.ObjectMeta = c.Service.ObjectMeta
 	newService.Spec.ClusterIP = c.Service.Spec.ClusterIP
 
-	svc, err := c.config.KubeClient.Services(newService.Namespace).Update(newService)
+	svc, err := c.KubeClient.Services(newService.Namespace).Update(newService)
 	if err != nil {
 		return err
 	}
@@ -201,7 +189,7 @@ func (c *Cluster) deleteService() error {
 	if c.Service == nil {
 		return fmt.Errorf("There is no Service in the cluster")
 	}
-	err := c.config.KubeClient.Services(c.Service.Namespace).Delete(c.Service.Name, deleteOptions)
+	err := c.KubeClient.Services(c.Service.Namespace).Delete(c.Service.Name, deleteOptions)
 	if err != nil {
 		return err
 	}
@@ -214,18 +202,18 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
 	if c.Endpoint != nil {
 		return nil, fmt.Errorf("Endpoint already exists in the cluster")
 	}
-	endpointSpec := resources.Endpoint(c.ClusterName())
+	endpointsSpec := c.genEndpoints()
 
-	endpoint, err := c.config.KubeClient.Endpoints(endpointSpec.Namespace).Create(endpointSpec)
+	endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec)
 	if k8sutil.ResourceAlreadyExists(err) {
-		return nil, fmt.Errorf("Endpoint '%s' already exists", util.NameFromMeta(endpointSpec.ObjectMeta))
+		return nil, fmt.Errorf("Endpoint '%s' already exists", util.NameFromMeta(endpointsSpec.ObjectMeta))
 	}
 	if err != nil {
 		return nil, err
 	}
-	c.Endpoint = endpoint
+	c.Endpoint = endpoints
 
-	return endpoint, nil
+	return endpoints, nil
 }
 
 func (c *Cluster) updateEndpoint(newEndpoint *v1.Endpoints) error {
@@ -238,7 +226,7 @@ func (c *Cluster) deleteEndpoint() error {
 	if c.Endpoint == nil {
 		return fmt.Errorf("There is no Endpoint in the cluster")
 	}
-	err := c.config.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, deleteOptions)
+	err := c.KubeClient.Endpoints(c.Endpoint.Namespace).Delete(c.Endpoint.Name, deleteOptions)
 	if err != nil {
 		return err
 	}
@@ -248,16 +236,16 @@ func (c *Cluster) deleteEndpoint() error {
 }
 
 func (c *Cluster) applySecrets() error {
-	secrets, err := resources.UserSecrets(c.ClusterName(), c.pgUsers)
+	secrets, err := c.genUserSecrets()
 
 	if err != nil {
 		return fmt.Errorf("Can't get user Secrets")
 	}
 
 	for secretUsername, secretSpec := range secrets {
-		secret, err := c.config.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec)
+		secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec)
 		if k8sutil.ResourceAlreadyExists(err) {
-			curSecrets, err := c.config.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name)
+			curSecrets, err := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name)
 			if err != nil {
 				return fmt.Errorf("Can't get current Secret: %s", err)
 			}
@@ -279,7 +267,7 @@ func (c *Cluster) applySecrets() error {
 }
 
 func (c *Cluster) deleteSecret(secret *v1.Secret) error {
-	err := c.config.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, deleteOptions)
+	err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, deleteOptions)
 	if err != nil {
 		return err
 	}
@@ -291,7 +279,7 @@ func (c *Cluster) deleteSecret(secret *v1.Secret) error {
 func (c *Cluster) createUsers() error {
 	// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
 	for username, user := range c.pgUsers {
-		if username == constants.SuperuserName || username == constants.ReplicationUsername {
+		if username == c.OpConfig.SuperUsername || username == c.OpConfig.ReplicationUsername {
 			continue
 		}
 
@@ -7,7 +7,6 @@ import (
 
 	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util"
-	"github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
 )
 
 func (c *Cluster) SyncCluster() {
@@ -55,7 +54,7 @@ func (c *Cluster) syncService() error {
 		return nil
 	}
 
-	desiredSvc := resources.Service(c.ClusterName(), c.Spec.TeamId, cSpec.AllowedSourceRanges)
+	desiredSvc := c.genService(cSpec.AllowedSourceRanges)
 	if c.sameServiceWith(desiredSvc) {
 		return nil
 	}
@@ -99,7 +98,7 @@ func (c *Cluster) syncStatefulSet() error {
 		return nil
 	}
 
-	desiredSS := genStatefulSet(c.ClusterName(), cSpec, c.etcdHost, c.dockerImage)
+	desiredSS := c.genStatefulSet(cSpec)
 	equalSS, rollUpdate := c.compareStatefulSetWith(desiredSS)
 	if equalSS {
 		return nil
@@ -132,7 +131,7 @@ func (c *Cluster) syncPods() error {
 	listOptions := v1.ListOptions{
 		LabelSelector: ls.String(),
 	}
-	pods, err := c.config.KubeClient.Pods(namespace).List(listOptions)
+	pods, err := c.KubeClient.Pods(namespace).List(listOptions)
 	if err != nil {
 		return fmt.Errorf("Can't get list of Pods: %s", err)
 	}
@@ -70,7 +70,7 @@ func podMatchesTemplate(pod *v1.Pod, ss *v1beta1.StatefulSet) bool {
 }
 
 func (c *Cluster) getTeamMembers() ([]string, error) {
-	teamInfo, err := c.config.TeamsAPIClient.TeamInfo(c.Spec.TeamId)
+	teamInfo, err := c.TeamsAPIClient.TeamInfo(c.Spec.TeamId)
 	if err != nil {
 		return nil, fmt.Errorf("Can't get team info: %s", err)
 	}
@@ -87,7 +87,7 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, spiloRole string) error {
 			if role == spiloRole { // TODO: newly-created Pods are always replicas => check against empty string only
 				return nil
 			}
-		case <-time.After(constants.PodLabelWaitTimeout):
+		case <-time.After(c.OpConfig.PodLabelWaitTimeout):
 			return fmt.Errorf("Pod label wait timeout")
 		}
 	}
@@ -100,19 +100,19 @@ func (c *Cluster) waitForPodDeletion(podEvents chan spec.PodEvent) error {
 			if podEvent.EventType == spec.PodEventDelete {
 				return nil
 			}
-		case <-time.After(constants.PodDeletionWaitTimeout):
+		case <-time.After(c.OpConfig.PodDeletionWaitTimeout):
 			return fmt.Errorf("Pod deletion wait timeout")
 		}
 	}
 }
 
 func (c *Cluster) waitStatefulsetReady() error {
-	return retryutil.Retry(constants.ResourceCheckInterval, constants.ResourceCheckTimeout,
+	return retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
 		func() (bool, error) {
 			listOptions := v1.ListOptions{
 				LabelSelector: c.labelsSet().String(),
 			}
-			ss, err := c.config.KubeClient.StatefulSets(c.Metadata.Namespace).List(listOptions)
+			ss, err := c.KubeClient.StatefulSets(c.Metadata.Namespace).List(listOptions)
 			if err != nil {
 				return false, err
 			}
@@ -138,19 +138,19 @@ func (c *Cluster) waitPodLabelsReady() error {
 	replicaListOption := v1.ListOptions{
 		LabelSelector: labels.Merge(ls, labels.Set{"spilo-role": "replica"}).String(),
 	}
-	pods, err := c.config.KubeClient.Pods(namespace).List(listOptions)
+	pods, err := c.KubeClient.Pods(namespace).List(listOptions)
 	if err != nil {
 		return err
 	}
 	podsNumber := len(pods.Items)
 
-	return retryutil.Retry(constants.ResourceCheckInterval, constants.ResourceCheckTimeout,
+	return retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
 		func() (bool, error) {
-			masterPods, err := c.config.KubeClient.Pods(namespace).List(masterListOption)
+			masterPods, err := c.KubeClient.Pods(namespace).List(masterListOption)
 			if err != nil {
 				return false, err
 			}
-			replicaPods, err := c.config.KubeClient.Pods(namespace).List(replicaListOption)
+			replicaPods, err := c.KubeClient.Pods(namespace).List(replicaListOption)
 			if err != nil {
 				return false, err
 			}
@@ -198,7 +198,7 @@ func (c *Cluster) deleteEtcdKey() error {
 	etcdKey := fmt.Sprintf("/service/%s", c.Metadata.Name)
 
 	//TODO: retry multiple times
-	resp, err := c.config.EtcdClient.Delete(context.Background(),
+	resp, err := c.EtcdClient.Delete(context.Background(),
 		etcdKey,
 		&etcdclient.DeleteOptions{Recursive: true})
 
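All of the waits above now draw their intervals and timeouts from OpConfig instead of package constants. For reference, a generic polling helper in the spirit of retryutil.Retry, with the interval and timeout passed in as values (illustrative only, not the operator's actual implementation):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retry polls condition every interval until it returns true, returns an
// error, or the timeout elapses.
func retry(interval, timeout time.Duration, condition func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := condition()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("timeout while waiting for condition")
		}
		time.Sleep(interval)
	}
}

func main() {
	start := time.Now()
	err := retry(10*time.Millisecond, 50*time.Millisecond, func() (bool, error) {
		return time.Since(start) > 30*time.Millisecond, nil
	})
	fmt.Println(err) // <nil>
}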
@@ -12,7 +12,7 @@ import (
 
 	"github.bus.zalan.do/acid/postgres-operator/pkg/cluster"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/spec"
-	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
+	"github.bus.zalan.do/acid/postgres-operator/pkg/util/config"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/teams"
 )
 
@@ -25,7 +25,8 @@ type Config struct {
 }
 
 type Controller struct {
-	config Config
+	Config
+	opConfig  *config.Config
 	logger    *logrus.Entry
 	clusters  map[spec.ClusterName]*cluster.Cluster
 	stopChMap map[spec.ClusterName]chan struct{}
@@ -37,9 +38,10 @@ type Controller struct {
 	podCh chan spec.PodEvent
 }
 
-func New(cfg *Config) *Controller {
+func New(controllerConfig *Config, operatorConfig *config.Config) *Controller {
 	return &Controller{
-		config:    *cfg,
+		Config:    *controllerConfig,
+		opConfig:  operatorConfig,
 		logger:    logrus.WithField("pkg", "controller"),
 		clusters:  make(map[spec.ClusterName]*cluster.Cluster),
 		stopChMap: make(map[spec.ClusterName]chan struct{}),
@@ -52,12 +54,12 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
 	wg.Add(1)
 
 	c.initController()
-	if err := c.initEtcdClient(); err != nil {
+	if err := c.initEtcdClient(c.opConfig.EtcdHost); err != nil {
 		c.logger.Errorf("Can't get etcd client: %s", err)
 		return
 	}
 
-	c.logger.Infof("'%s' namespace will be watched", c.config.PodNamespace)
+	c.logger.Infof("'%s' namespace will be watched", c.PodNamespace)
 	go c.runInformers(stopCh)
 
 	c.logger.Info("Started working in background")
@@ -68,7 +70,7 @@ func (c *Controller) initController() {
 		c.logger.Fatalf("Can't register ThirdPartyResource: %s", err)
 	}
 
-	c.config.TeamsAPIClient.RefreshTokenAction = c.getOAuthToken
+	c.TeamsAPIClient.RefreshTokenAction = c.getOAuthToken
 
 	// Postgresqls
 	clusterLw := &cache.ListWatch{
@@ -78,7 +80,7 @@ func (c *Controller) initController() {
 	c.postgresqlInformer = cache.NewSharedIndexInformer(
 		clusterLw,
 		&spec.Postgresql{},
-		constants.ResyncPeriodTPR,
+		c.opConfig.ResyncPeriod,
 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
 
 	c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -96,7 +98,7 @@ func (c *Controller) initController() {
 	c.podInformer = cache.NewSharedIndexInformer(
 		podLw,
 		&v1.Pod{},
-		constants.ResyncPeriodPod,
+		c.opConfig.ResyncPeriodPod,
 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
 
 	c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -4,12 +4,11 @@ import (
 	"fmt"
 	"time"
 
-	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
 	etcdclient "github.com/coreos/etcd/client"
 )
 
-func (c *Controller) initEtcdClient() error {
-	etcdUrl := fmt.Sprintf("http://%s", constants.EtcdHost)
+func (c *Controller) initEtcdClient(etcdHost string) error {
+	etcdUrl := fmt.Sprintf("http://%s", etcdHost)
 
 	cfg, err := etcdclient.New(etcdclient.Config{
 		Endpoints: []string{etcdUrl},
@@ -20,7 +19,7 @@ func (c *Controller) initEtcdClient() error {
 		return err
 	}
 
-	c.config.EtcdClient = etcdclient.NewKeysAPI(cfg)
+	c.EtcdClient = etcdclient.NewKeysAPI(cfg)
 
 	return nil
 }
@@ -29,7 +29,7 @@ func (c *Controller) podListFunc(options api.ListOptions) (runtime.Object, error) {
 		TimeoutSeconds: options.TimeoutSeconds,
 	}
 
-	return c.config.KubeClient.CoreV1().Pods(c.config.PodNamespace).List(opts)
+	return c.KubeClient.CoreV1().Pods(c.PodNamespace).List(opts)
 }
 
 func (c *Controller) podWatchFunc(options api.ListOptions) (watch.Interface, error) {
@@ -52,7 +52,7 @@ func (c *Controller) podWatchFunc(options api.ListOptions) (watch.Interface, error) {
 		TimeoutSeconds: options.TimeoutSeconds,
 	}
 
-	return c.config.KubeClient.CoreV1Client.Pods(c.config.PodNamespace).Watch(opts)
+	return c.KubeClient.CoreV1Client.Pods(c.PodNamespace).Watch(opts)
 }
 
 func PodNameFromMeta(meta v1.ObjectMeta) spec.PodName {
@@ -18,8 +18,8 @@ import (
 
 func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) {
 	c.logger.Info("Getting list of currently running clusters")
-	object, err := c.config.RestClient.Get().
-		Namespace(c.config.PodNamespace).
+	object, err := c.RestClient.Get().
+		Namespace(c.PodNamespace).
 		Resource(constants.ResourceName).
 		VersionedParams(&options, api.ParameterCodec).
 		FieldsSelectorParam(fields.Everything()).
@@ -67,9 +67,9 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) {
 }
 
 func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, error) {
-	return c.config.RestClient.Get().
+	return c.RestClient.Get().
 		Prefix("watch").
-		Namespace(c.config.PodNamespace).
+		Namespace(c.PodNamespace).
 		Resource(constants.ResourceName).
 		VersionedParams(&options, api.ParameterCodec).
 		FieldsSelectorParam(fields.Everything()).
@@ -128,7 +128,6 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
 	}
 
 	//TODO: Do not update cluster which is currently creating
-
 	if pgPrev.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion {
 		c.logger.Infof("Skipping update with no resource version change")
 		return
@@ -3,25 +3,28 @@ package controller
 import (
 	"fmt"
 
+	"k8s.io/client-go/pkg/api"
+	"k8s.io/client-go/pkg/api/v1"
+	extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1"
+
 	"github.bus.zalan.do/acid/postgres-operator/pkg/cluster"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
 	"github.bus.zalan.do/acid/postgres-operator/pkg/util/k8sutil"
-	"github.bus.zalan.do/acid/postgres-operator/pkg/util/resources"
-	"k8s.io/client-go/pkg/api"
 )
 
 func (c *Controller) makeClusterConfig() cluster.Config {
 	return cluster.Config{
-		KubeClient:     c.config.KubeClient,
-		RestClient:     c.config.RestClient,
-		EtcdClient:     c.config.EtcdClient,
-		TeamsAPIClient: c.config.TeamsAPIClient,
+		KubeClient:     c.KubeClient,
+		RestClient:     c.RestClient,
+		EtcdClient:     c.EtcdClient,
+		TeamsAPIClient: c.TeamsAPIClient,
+		OpConfig:       c.opConfig,
 	}
 }
 
 func (c *Controller) getOAuthToken() (string, error) {
 	// Temporary getting postgresql-operator secret from the NamespaceDefault
-	credentialsSecret, err := c.config.KubeClient.Secrets(api.NamespaceDefault).Get(constants.OAuthTokenSecretName)
+	credentialsSecret, err := c.KubeClient.Secrets(api.NamespaceDefault).Get(c.opConfig.OAuthTokenSecretName)
 
 	if err != nil {
 		return "", fmt.Errorf("Can't get credentials Secret: %s", err)
@@ -35,11 +38,24 @@ func (c *Controller) getOAuthToken() (string, error) {
 	return string(data["read-only-token-secret"]), nil
 }
 
+func thirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource {
+	return &extv1beta.ThirdPartyResource{
+		ObjectMeta: v1.ObjectMeta{
+			//ThirdPartyResources are cluster-wide
+			Name: TPRName,
+		},
+		Versions: []extv1beta.APIVersion{
+			{Name: constants.TPRApiVersion},
+		},
+		Description: constants.TPRDescription,
+	}
+}
+
 func (c *Controller) createTPR() error {
 	TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor)
-	tpr := resources.ThirdPartyResource(TPRName)
+	tpr := thirdPartyResource(TPRName)
 
-	_, err := c.config.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr)
+	_, err := c.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr)
 	if err != nil {
 		if !k8sutil.ResourceAlreadyExists(err) {
 			return err
@@ -50,7 +66,7 @@ func (c *Controller) createTPR() error {
 		c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName)
 	}
 
-	restClient := c.config.RestClient
+	restClient := c.RestClient
 
-	return k8sutil.WaitTPRReady(restClient, constants.TPRReadyWaitInterval, constants.TPRReadyWaitTimeout, c.config.PodNamespace)
+	return k8sutil.WaitTPRReady(restClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.PodNamespace)
 }
@@ -28,7 +28,7 @@ type Volume struct {
 }
 
 type PostgresqlParam struct {
-	PgVersion string `json:"version"`
+	PgVersion  string            `json:"version"`
 	Parameters map[string]string `json:"parameters"`
 }
 
@@ -0,0 +1,51 @@
+package config
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/kelseyhightower/envconfig"
+)
+
+type TPR struct {
+	ReadyWaitInterval time.Duration `split_words:"true",default:"3s"`
+	ReadyWaitTimeout  time.Duration `split_words:"true",default:"30s"`
+	ResyncPeriod      time.Duration `split_words:"true",default:"5m"`
+}
+
+type Resources struct {
+	ResyncPeriodPod        time.Duration `split_words:"true",default:"5m"`
+	ResourceCheckInterval  time.Duration `split_words:"true",default:"3s"`
+	ResourceCheckTimeout   time.Duration `split_words:"true",default:"10m"`
+	PodLabelWaitTimeout    time.Duration `split_words:"true",default:"10m"`
+	PodDeletionWaitTimeout time.Duration `split_words:"true",default:"10m"`
+}
+
+type Auth struct {
+	PamRoleName          string `split_words:"true",default:"zalandos"`
+	PamConfiguration     string `split_words:"true",default:"https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"`
+	TeamsAPIUrl          string `envconfig:"teams_api_url",default:"https://teams.example.com/api/"`
+	OAuthTokenSecretName string `envconfig:"oauth_token_secret_name",default:"postgresql-operator"`
+	SuperUsername        string `split_words:"true",default:"postgres"`
+	ReplicationUsername  string `split_words:"true",default:"replication"`
+}
+
+type Config struct {
+	TPR
+	Resources
+	Auth
+	EtcdHost           string `split_words:"true",default:"etcd-client.default.svc.cluster.local:2379"`
+	DockerImage        string `split_words:"true",default:"registry.opensource.zalan.do/acid/spilo-9.6:1.2-p12"`
+	ServiceAccountName string `split_words:"true",default:"operator"`
+	DbHostedZone       string `split_words:"true",default:"db.example.com"`
+}
+
+func LoadFromEnv() *Config {
+	var cfg Config
+	err := envconfig.Process("PGOP", &cfg)
+	if err != nil {
+		panic(fmt.Errorf("Can't read config: %v", err))
+	}
+
+	return &cfg
+}
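Usage sketch for the new package, assuming the types above: because Config embeds TPR, Resources and Auth, their fields are promoted, so callers read cfg.SuperUsername directly — exactly how the cluster code accesses c.OpConfig. One caveat worth knowing: Go struct tags are space-separated, so envconfig only honors defaults written as `split_words:"true" default:"3s"`; with a comma between them, as in the diff above, the default tag is not parsed and unset variables keep their zero values.

package main

import (
	"fmt"
	"os"

	"github.bus.zalan.do/acid/postgres-operator/pkg/util/config"
)

func main() {
	// The "PGOP" prefix plus split_words turns SuperUsername into
	// PGOP_SUPER_USERNAME and ReadyWaitInterval into PGOP_READY_WAIT_INTERVAL.
	os.Setenv("PGOP_SUPER_USERNAME", "postgres")
	os.Setenv("PGOP_READY_WAIT_INTERVAL", "4s")

	cfg := config.LoadFromEnv()
	fmt.Println(cfg.SuperUsername)         // promoted from the embedded Auth struct
	fmt.Println(cfg.TPR.ReadyWaitInterval) // also reachable via the embedded type
}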
@@ -1,41 +1,14 @@
 package constants
 
-import "time"
-
 const (
-	TPRName                = "postgresql"
-	TPRVendor              = "acid.zalan.do"
-	TPRDescription         = "Managed PostgreSQL clusters"
-	TPRReadyWaitInterval   = 3 * time.Second
-	TPRReadyWaitTimeout    = 30 * time.Second
-	TPRApiVersion          = "v1"
-	ResourceCheckInterval  = 3 * time.Second
-	ResourceCheckTimeout   = 10 * time.Minute
-
-	PodLabelWaitTimeout    = 10 * time.Minute
-	PodDeletionWaitTimeout = 10 * time.Minute
-
-	ResourceName    = TPRName + "s"
-	ResyncPeriodTPR = 5 * time.Minute
-	ResyncPeriodPod = 5 * time.Minute
-
-	SuperuserName       = "postgres"
-	ReplicationUsername = "replication"
-
-	//TODO: move to the operator spec
-	EtcdHost           = "etcd-client.default.svc.cluster.local:2379"
-	SpiloImage         = "registry.opensource.zalan.do/acid/spilo-9.6:1.2-p12"
-	PamRoleName        = "zalandos"
-	PamConfiguration   = "https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"
-	PasswordLength     = 64
-	TeamsAPIUrl        = "https://teams.example.com/api/"
-	UserSecretTemplate = "%s.%s.credentials.%s.%s" // Username, ClusterName, TPRName, TPRVendor
-
-	OAuthTokenSecretName = "postgresql-operator"
-	ServiceAccountName   = "operator"
+	//Constants
+	TPRName                  = "postgresql"
+	TPRVendor                = "acid.zalan.do"
+	TPRDescription           = "Managed PostgreSQL clusters"
+	TPRApiVersion            = "v1"
+	DataVolumeName           = "pgdata"
+	PasswordLength           = 64
+	UserSecretTemplate       = "%s.%s.credentials.%s.%s" // Username, ClusterName, TPRName, TPRVendor
+	ZalandoDnsNameAnnotation = "zalando.org/dnsname"
+
+	// TODO: move DbHostedZone to operator configuration
+	DbHostedZone = "db.example.com"
+	ResourceName = TPRName + "s"
 )