Merge branch 'master' into client-go-v4

# Conflicts:
#	cmd/main.go
#	glide.lock
#	pkg/cluster/cluster.go
#	pkg/cluster/exec.go
#	pkg/cluster/k8sres.go
#	pkg/cluster/pod.go
#	pkg/cluster/resources.go
#	pkg/cluster/util.go
#	pkg/cluster/volumes.go
#	pkg/controller/controller.go
#	pkg/controller/pod.go
#	pkg/controller/postgresql.go
#	pkg/controller/util.go
#	pkg/spec/postgresql.go
#	pkg/spec/postgresql_test.go
#	pkg/spec/types.go
#	pkg/util/k8sutil/k8sutil.go
#	pkg/util/util.go
#	pkg/util/util_test.go
Author: Murat Kabilov
Date:   2017-07-27 10:55:07 +02:00
Commit: 726c8b7f49
30 changed files with 604 additions and 436 deletions


@@ -8,67 +8,42 @@ import (
 	"sync"
 	"syscall"
-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"github.com/zalando-incubator/postgres-operator/pkg/controller"
-	"github.com/zalando-incubator/postgres-operator/pkg/spec"
-	"github.com/zalando-incubator/postgres-operator/pkg/util/config"
 	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
 )
 var (
 	KubeConfigFile string
-	podNamespace     string
-	configMapName    spec.NamespacedName
 	OutOfCluster     bool
-	noTeamsAPI       bool
-	noDatabaseAccess bool
 	version          string
+	config controller.Config
 )
 func init() {
 	flag.StringVar(&KubeConfigFile, "kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
 	flag.BoolVar(&OutOfCluster, "outofcluster", false, "Whether the operator runs in- our outside of the Kubernetes cluster.")
-	flag.BoolVar(&noDatabaseAccess, "nodatabaseaccess", false, "Disable all access to the database from the operator side.")
-	flag.BoolVar(&noTeamsAPI, "noteamsapi", false, "Disable all access to the teams API")
+	flag.BoolVar(&config.NoDatabaseAccess, "nodatabaseaccess", false, "Disable all access to the database from the operator side.")
+	flag.BoolVar(&config.NoTeamsAPI, "noteamsapi", false, "Disable all access to the teams API")
 	flag.Parse()
-	podNamespace = os.Getenv("MY_POD_NAMESPACE")
-	if podNamespace == "" {
-		podNamespace = "default"
+	config.Namespace = os.Getenv("MY_POD_NAMESPACE")
+	if config.Namespace == "" {
+		config.Namespace = "default"
 	}
 	configMap := os.Getenv("CONFIG_MAP_NAME")
 	if configMap != "" {
-		configMapName.Decode(configMap)
-	}
-}
-func ControllerConfig() *controller.Config {
-	restConfig, err := k8sutil.RestConfig(KubeConfigFile, OutOfCluster)
-	if err != nil {
-		log.Fatalf("Can't get REST config: %s", err)
-	}
-	client, err := k8sutil.KubernetesClient(restConfig)
-	if err != nil {
-		log.Fatalf("Can't create client: %s", err)
-	}
-	restClient, err := k8sutil.KubernetesRestClient(restConfig)
-	if err != nil {
-		log.Fatalf("Can't create rest client: %s", err)
-	}
-	return &controller.Config{
-		RestConfig: restConfig,
-		KubeClient: client,
-		RestClient: restClient,
+		err := config.ConfigMapName.Decode(configMap)
+		if err != nil {
+			log.Fatalf("incorrect config map name")
+		}
 	}
 }
 func main() {
-	configMapData := make(map[string]string)
+	var err error
 	log.SetOutput(os.Stdout)
 	log.Printf("Spilo operator %s\n", version)
@@ -78,33 +53,13 @@ func main() {
 	wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on
-	controllerConfig := ControllerConfig()
-	if configMapName != (spec.NamespacedName{}) {
-		configMap, err := controllerConfig.KubeClient.ConfigMaps(configMapName.Namespace).Get(configMapName.Name, meta_v1.GetOptions{})
-		if err != nil {
-			panic(err)
-		}
-		configMapData = configMap.Data
-	} else {
-		log.Printf("No ConfigMap specified. Loading default values")
-	}
-	if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var
-		configMapData["namespace"] = podNamespace
-	}
-	if noDatabaseAccess {
-		configMapData["enable_database_access"] = "false"
-	}
-	if noTeamsAPI {
-		configMapData["enable_teams_api"] = "false"
-	}
-	cfg := config.NewFromMap(configMapData)
-	log.Printf("Config: %s", cfg.MustMarshal())
-	c := controller.New(controllerConfig, cfg)
+	config.RestConfig, err = k8sutil.RestConfig(KubeConfigFile, OutOfCluster)
+	if err != nil {
+		log.Fatalf("couldn't get REST config: %v", err)
+	}
+	c := controller.NewController(&config)
 	c.Run(stop, wg)
 	sig := <-sigs
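Editor's note: after this change, main.go no longer builds a config map by hand; flags and environment variables land directly in a controller.Config, and CONFIG_MAP_NAME is decoded into a namespaced name. The following is a minimal, self-contained sketch of that wiring pattern only; Config and NamespacedName here are simplified stand-ins for controller.Config and spec.NamespacedName, not the real types.

// Sketch of the flag/env-to-config wiring used in the new main.go.
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"strings"
)

// NamespacedName mimics the "namespace/name" decoding applied to CONFIG_MAP_NAME.
type NamespacedName struct {
	Namespace string
	Name      string
}

// Decode splits "namespace/name"; a bare "name" falls back to the default namespace.
func (n *NamespacedName) Decode(value string) error {
	parts := strings.Split(value, "/")
	switch len(parts) {
	case 1:
		n.Namespace, n.Name = "default", parts[0]
	case 2:
		n.Namespace, n.Name = parts[0], parts[1]
	default:
		return fmt.Errorf("incorrect namespaced name: %q", value)
	}
	if n.Name == "" {
		return fmt.Errorf("empty name in %q", value)
	}
	return nil
}

// Config is a stand-in for controller.Config: flags and env vars are bound to it directly.
type Config struct {
	Namespace        string
	ConfigMapName    NamespacedName
	NoDatabaseAccess bool
	NoTeamsAPI       bool
}

var config Config

func init() {
	flag.BoolVar(&config.NoDatabaseAccess, "nodatabaseaccess", false, "Disable all access to the database from the operator side.")
	flag.BoolVar(&config.NoTeamsAPI, "noteamsapi", false, "Disable all access to the teams API")
	flag.Parse()

	config.Namespace = os.Getenv("MY_POD_NAMESPACE")
	if config.Namespace == "" {
		config.Namespace = "default"
	}
	if cm := os.Getenv("CONFIG_MAP_NAME"); cm != "" {
		if err := config.ConfigMapName.Decode(cm); err != nil {
			log.Fatalf("incorrect config map name: %v", err)
		}
	}
}

func main() {
	log.Printf("operator config: %+v", config)
}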

glide.lock (generated)

@@ -1,8 +1,8 @@
 hash: 140e0c8a606d18ca405e9c50359cc673e4aa0cc88bcae5d7f83791e7002bd6a1
-updated: 2017-07-12T12:52:55.896264+02:00
+updated: 2017-07-24T19:24:17.604824235+02:00
 imports:
 - name: github.com/aws/aws-sdk-go
-  version: b1a7b51924b90a6ecdbaeb17e96418740ff07a1e
+  version: afd601335e2a72d43caa3af6bd2abe512fcc3bfd
   subpackages:
   - aws
   - aws/awserr
@@ -30,7 +30,7 @@ imports:
   - service/ec2
   - service/sts
 - name: github.com/davecgh/go-spew
-  version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
+  version: 782f4967f2dc4564575ca782fe2d04090b5faca8
   subpackages:
   - spew
 - name: github.com/docker/distribution
@@ -104,7 +104,7 @@ imports:
 - name: github.com/PuerkitoBio/urlesc
   version: 5bd2802263f21d8788851d5305584c82a5c75d7e
 - name: github.com/Sirupsen/logrus
-  version: 7f976d3a76720c4c27af2ba716b85d2e0a7e38b1
+  version: a3f95b5c423586578a4e099b11a46c2479628cac
 - name: github.com/spf13/pflag
   version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7
 - name: github.com/ugorji/go
@@ -118,12 +118,15 @@ imports:
 - name: golang.org/x/net
   version: f2499483f923065a842d38eb4c7f1927e6fc6e6d
   subpackages:
+  - html
+  - html/atom
   - http2
   - http2/hpack
   - idna
   - lex/httplex
+  - websocket
 - name: golang.org/x/sys
-  version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9
+  version: c4489faa6e5ab84c0ef40d6ee878f7a030281f0f
   subpackages:
   - unix
 - name: golang.org/x/text


@@ -11,9 +11,8 @@ import (
 	"sync"
 	"github.com/Sirupsen/logrus"
-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
 	"k8s.io/client-go/rest"
@@ -36,11 +35,8 @@ var (
 // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
 type Config struct {
-	KubeClient     *kubernetes.Clientset //TODO: move clients to the better place?
-	RestClient     rest.Interface
-	RestConfig     *rest.Config
-	TeamsAPIClient *teams.API
 	OpConfig   config.Config
+	RestConfig *rest.Config
 	InfrastructureRoles map[string]spec.PgUser // inherited from the controller
 }
@@ -66,8 +62,11 @@ type Cluster struct {
 	mu               sync.Mutex
 	masterLess       bool
 	userSyncStrategy spec.UserSyncer
-	deleteOptions    *meta_v1.DeleteOptions
+	deleteOptions    *metav1.DeleteOptions
 	podEventsQueue   *cache.FIFO
+	teamsAPIClient   *teams.API
+	KubeClient       k8sutil.KubernetesClient //TODO: move clients to the better place?
 }
 type compareStatefulsetResult struct {
@@ -78,8 +77,8 @@ type compareStatefulsetResult struct {
 }
 // New creates a new cluster. This function should be called from a controller.
-func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
-	lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name)
+func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
+	lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Name)
 	kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)}
 	orphanDependents := true
@@ -102,15 +101,17 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
 		kubeResources:    kubeResources,
 		masterLess:       false,
 		userSyncStrategy: users.DefaultUserSyncStrategy{},
-		deleteOptions:    &meta_v1.DeleteOptions{OrphanDependents: &orphanDependents},
+		deleteOptions:    &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
 		podEventsQueue:   podEventsQueue,
+		KubeClient:       kubeClient,
+		teamsAPIClient:   teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger.Logger),
 	}
 	return cluster
 }
 func (c *Cluster) clusterName() spec.NamespacedName {
-	return util.NameFromMeta(c.Metadata)
+	return util.NameFromMeta(c.ObjectMeta)
 }
 func (c *Cluster) teamName() string {
@@ -126,8 +127,8 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) {
 	}
 	request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods
-	_, err = c.RestClient.Patch(types.MergePatchType).
-		RequestURI(c.Metadata.GetSelfLink()).
+	_, err = c.KubeClient.RESTClient.Patch(types.MergePatchType).
+		RequestURI(c.GetSelfLink()).
 		Body(request).
 		DoRaw()
@@ -137,7 +138,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) {
 	}
 	if err != nil {
-		c.logger.Warningf("could not set status for cluster '%s': %s", c.clusterName(), err)
+		c.logger.Warningf("could not set status for cluster %q: %v", c.clusterName(), err)
 	}
 }
@@ -180,7 +181,7 @@ func (c *Cluster) Create() error {
 	if err != nil {
 		return fmt.Errorf("could not create endpoint: %v", err)
 	}
-	c.logger.Infof("endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta))
+	c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
 	for _, role := range []PostgresRole{Master, Replica} {
 		if role == Replica && !c.Spec.ReplicaLoadBalancer {
@@ -190,7 +191,7 @@ func (c *Cluster) Create() error {
 		if err != nil {
 			return fmt.Errorf("could not create %s service: %v", role, err)
 		}
-		c.logger.Infof("%s service '%s' has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
+		c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta))
 	}
 	if err = c.initUsers(); err != nil {
@@ -207,12 +208,12 @@ func (c *Cluster) Create() error {
 	if err != nil {
 		return fmt.Errorf("could not create statefulset: %v", err)
 	}
-	c.logger.Infof("statefulset '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta))
+	c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta))
 	c.logger.Info("Waiting for cluster being ready")
 	if err = c.waitStatefulsetPodsReady(); err != nil {
-		c.logger.Errorf("Failed to create cluster: %s", err)
+		c.logger.Errorf("Failed to create cluster: %v", err)
 		return err
 	}
 	c.logger.Infof("pods are ready")
@@ -233,7 +234,7 @@ func (c *Cluster) Create() error {
 	err = c.listResources()
 	if err != nil {
-		c.logger.Errorf("could not list resources: %s", err)
+		c.logger.Errorf("could not list resources: %v", err)
 	}
 	return nil
@@ -243,7 +244,7 @@ func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match
 	//TODO: improve comparison
 	match = true
 	if c.Service[role].Spec.Type != service.Spec.Type {
-		return false, fmt.Sprintf("new %s service's type %s doesn't match the current one %s",
+		return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q",
 			role, service.Spec.Type, c.Service[role].Spec.Type)
 	}
 	oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges
@@ -259,7 +260,7 @@ func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match
 	oldDNSAnnotation := c.Service[role].Annotations[constants.ZalandoDNSNameAnnotation]
 	newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation]
 	if oldDNSAnnotation != newDNSAnnotation {
-		return false, fmt.Sprintf("new %s service's '%s' annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation)
+		return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation)
 	}
 	return true, ""
@@ -290,7 +291,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
 	}
 	if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 {
-		c.logger.Warnf("statefulset '%s' has no container", util.NameFromMeta(c.Statefulset.ObjectMeta))
+		c.logger.Warnf("statefulset %q has no container", util.NameFromMeta(c.Statefulset.ObjectMeta))
 		return &compareStatefulsetResult{}
 	}
 	// In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through
@@ -333,12 +334,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
 		}
 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
 			needsReplace = true
-			reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %s doesn't match the current one", name))
+			reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q doesn't match the current one", name))
 		}
 		if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
 			name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
 			needsReplace = true
-			reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name))
+			reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %q doesn't match the current one", name))
 		}
 	}
@@ -405,8 +406,8 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 	defer c.mu.Unlock()
 	c.setStatus(spec.ClusterStatusUpdating)
-	c.logger.Debugf("Cluster update from version %s to %s",
-		c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion)
+	c.logger.Debugf("Cluster update from version %q to %q",
+		c.ResourceVersion, newSpec.ResourceVersion)
 	/* Make sure we update when this function exists */
 	defer func() {
@@ -431,7 +432,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 			if err != nil {
 				return fmt.Errorf("could not create new %s service: %v", role, err)
 			}
-			c.logger.Infof("%s service '%s' has been created", role, util.NameFromMeta(service.ObjectMeta))
+			c.logger.Infof("%s service %q has been created", role, util.NameFromMeta(service.ObjectMeta))
 		}
 	}
 	// only proceed further if both old and new load balancer were present
@@ -446,7 +447,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 			c.setStatus(spec.ClusterStatusUpdateFailed)
 			return fmt.Errorf("could not update %s service: %v", role, err)
 		}
-		c.logger.Infof("%s service '%s' has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta))
+		c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta))
 		}
 	}
@@ -471,11 +472,11 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
 		}
 		}
 		//TODO: if there is a change in numberOfInstances, make sure Pods have been created/deleted
-		c.logger.Infof("statefulset '%s' has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta))
+		c.logger.Infof("statefulset %q has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta))
 	}
 	if c.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison
-		c.logger.Warnf("Postgresql version change(%s -> %s) is not allowed",
+		c.logger.Warnf("Postgresql version change(%q -> %q) is not allowed",
 			c.Spec.PgVersion, newSpec.Spec.PgVersion)
 		//TODO: rewrite pg version in tpr spec
 	}
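Editor's note: the recurring pattern in this file is the switch from c.Metadata.Name / c.Metadata.Namespace to c.Name / c.Namespace, which only works if the Postgresql type embeds metav1.ObjectMeta after the client-go v4 migration. A minimal sketch of that embedding, using a simplified stand-in for spec.Postgresql (the field set is illustrative, not the real definition):

// Sketch: embedding ObjectMeta promotes Name, Namespace, GetSelfLink(), etc.
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type PostgresSpec struct {
	TeamID    string
	PgVersion string
}

// Postgresql embeds ObjectMeta instead of holding it in a Metadata field,
// which is what lets the cluster code call c.Name and c.Namespace directly.
type Postgresql struct {
	metav1.TypeMeta
	metav1.ObjectMeta

	Spec PostgresSpec
}

func main() {
	pg := Postgresql{
		ObjectMeta: metav1.ObjectMeta{Name: "acid-test", Namespace: "default"},
		Spec:       PostgresSpec{TeamID: "acid", PgVersion: "9.6"},
	}
	// Promoted fields from the embedded ObjectMeta:
	fmt.Println(pg.Name, pg.Namespace)
}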


@@ -4,9 +4,10 @@ import (
 	"bytes"
 	"fmt"
-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
-	"k8s.io/client-go/pkg/api"
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/tools/remotecommand"
 	"github.com/zalando-incubator/postgres-operator/pkg/spec"
@@ -18,7 +19,7 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (
 		execErr bytes.Buffer
 	)
-	pod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, meta_v1.GetOptions{})
+	pod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
 	if err != nil {
 		return "", fmt.Errorf("could not get pod info: %v", err)
 	}
@@ -27,17 +28,17 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (
 		return "", fmt.Errorf("could not determine which container to use")
 	}
-	req := c.RestClient.Post().
+	req := c.KubeClient.RESTClient.Post().
 		Resource("pods").
 		Name(podName.Name).
 		Namespace(podName.Namespace).
 		SubResource("exec")
-	req.VersionedParams(&api.PodExecOptions{
+	req.VersionedParams(&v1.PodExecOptions{
 		Container: pod.Spec.Containers[0].Name,
 		Command:   command,
 		Stdout:    true,
 		Stderr:    true,
-	}, api.ParameterCodec)
+	}, scheme.ParameterCodec)
 	exec, err := remotecommand.NewExecutor(c.RestConfig, "POST", req.URL())
 	if err != nil {
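Editor's note: the exec request now encodes v1.PodExecOptions with scheme.ParameterCodec instead of the old api.ParameterCodec. A standalone sketch of the full call sequence follows; the function name, parameters, and the trailing Stream call are assumptions consistent with the client-go generation used here (newer client-go replaces NewExecutor with NewSPDYExecutor), not lines taken from the diff.

// Sketch: exec into a pod container through the REST client, client-go v4 style.
package cluster

import (
	"bytes"
	"fmt"

	remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/pkg/api/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/remotecommand"
)

func execInPod(restClient rest.Interface, restConfig *rest.Config, namespace, pod, container string, command ...string) (string, error) {
	var execOut, execErr bytes.Buffer

	// Build the "exec" subresource request against the pod.
	req := restClient.Post().
		Resource("pods").
		Name(pod).
		Namespace(namespace).
		SubResource("exec")
	req.VersionedParams(&v1.PodExecOptions{
		Container: container,
		Command:   command,
		Stdout:    true,
		Stderr:    true,
	}, scheme.ParameterCodec)

	exec, err := remotecommand.NewExecutor(restConfig, "POST", req.URL())
	if err != nil {
		return "", fmt.Errorf("could not create executor: %v", err)
	}
	// Stream the command output back into local buffers.
	if err := exec.Stream(remotecommand.StreamOptions{
		SupportedProtocols: remotecommandconsts.SupportedStreamingProtocols,
		Stdout:             &execOut,
		Stderr:             &execErr,
	}); err != nil {
		return "", fmt.Errorf("could not stream exec: %v (stderr: %q)", err, execErr.String())
	}
	return execOut.String(), nil
}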


@@ -39,5 +39,5 @@ func (c *Cluster) resizePostgresFilesystem(podName *spec.NamespacedName, resizer
 		return err
 	}
-	return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %s", fsType)
+	return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %q", fsType)
 }


@@ -6,7 +6,7 @@ import (
 	"sort"
 	"k8s.io/apimachinery/pkg/api/resource"
-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
@@ -199,7 +199,7 @@ PATRONI_INITDB_PARAMS:
 	}
 	result, err := json.Marshal(config)
 	if err != nil {
-		c.logger.Errorf("Cannot convert spilo configuration into JSON: %s", err)
+		c.logger.Errorf("Cannot convert spilo configuration into JSON: %v", err)
 		return ""
 	}
 	return string(result)
@@ -211,7 +211,7 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme
 	envVars := []v1.EnvVar{
 		{
 			Name:  "SCOPE",
-			Value: c.Metadata.Name,
+			Value: c.Name,
 		},
 		{
 			Name: "PGROOT",
@@ -274,7 +274,7 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme
 	}
 	privilegedMode := bool(true)
 	container := v1.Container{
-		Name:            c.Metadata.Name,
+		Name:            c.Name,
 		Image:           c.OpConfig.DockerImage,
 		ImagePullPolicy: v1.PullAlways,
 		Resources:       *resourceRequirements,
@@ -312,9 +312,9 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme
 	}
 	template := v1.PodTemplateSpec{
-		ObjectMeta: meta_v1.ObjectMeta{
+		ObjectMeta: metav1.ObjectMeta{
 			Labels:    c.labelsSet(),
-			Namespace: c.Metadata.Name,
+			Namespace: c.Name,
 		},
 		Spec: podSpec,
 	}
@@ -338,14 +338,14 @@ func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.Stateful
 	}
 	statefulSet := &v1beta1.StatefulSet{
-		ObjectMeta: meta_v1.ObjectMeta{
-			Name:      c.Metadata.Name,
-			Namespace: c.Metadata.Namespace,
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      c.Name,
+			Namespace: c.Namespace,
 			Labels:    c.labelsSet(),
 		},
 		Spec: v1beta1.StatefulSetSpec{
 			Replicas:    &spec.NumberOfInstances,
-			ServiceName: c.Metadata.Name,
+			ServiceName: c.Name,
 			Template:    *podTemplate,
 			VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate},
 		},
@@ -355,7 +355,7 @@ func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.Stateful
 }
 func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) {
-	metadata := meta_v1.ObjectMeta{
+	metadata := metav1.ObjectMeta{
 		Name: constants.DataVolumeName,
 	}
 	if volumeStorageClass != "" {
@@ -387,7 +387,7 @@ func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string
 func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) {
 	secrets = make(map[string]*v1.Secret, len(c.pgUsers))
-	namespace := c.Metadata.Namespace
+	namespace := c.Namespace
 	for username, pgUser := range c.pgUsers {
 		//Skip users with no password i.e. human users (they'll be authenticated using pam)
 		secret := c.generateSingleUserSecret(namespace, pgUser)
@@ -413,7 +413,7 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser)
 	}
 	username := pgUser.Name
 	secret := v1.Secret{
-		ObjectMeta: meta_v1.ObjectMeta{
+		ObjectMeta: metav1.ObjectMeta{
 			Name:      c.credentialSecretName(username),
 			Namespace: namespace,
 			Labels:    c.labelsSet(),
@@ -430,7 +430,7 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser)
 func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) *v1.Service {
 	dnsNameFunction := c.masterDnsName
-	name := c.Metadata.Name
+	name := c.Name
 	if role == Replica {
 		dnsNameFunction = c.replicaDnsName
 		name = name + "-repl"
@@ -451,7 +451,7 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec)
 	if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) ||
 		(newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) {
-		// safe default value: lock load balancer to only local address unless overriden explicitely.
+		// safe default value: lock load balancer to only local address unless overridden explicitly.
 		sourceRanges := []string{localHost}
 		allowedSourceRanges := newSpec.AllowedSourceRanges
 		if len(allowedSourceRanges) >= 0 {
@@ -469,9 +469,9 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec)
 	}
 	service := &v1.Service{
-		ObjectMeta: meta_v1.ObjectMeta{
+		ObjectMeta: metav1.ObjectMeta{
 			Name:        name,
-			Namespace:   c.Metadata.Namespace,
+			Namespace:   c.Namespace,
 			Labels:      c.roleLabelsSet(role),
 			Annotations: annotations,
 		},
@@ -483,9 +483,9 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec)
 func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpoints {
 	endpoints := &v1.Endpoints{
-		ObjectMeta: meta_v1.ObjectMeta{
-			Name:      c.Metadata.Name,
-			Namespace: c.Metadata.Namespace,
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      c.Name,
+			Namespace: c.Namespace,
 			Labels:    c.roleLabelsSet(Master),
 		},
 	}


@@ -22,7 +22,7 @@ var getUserSQL = `SELECT a.rolname, COALESCE(a.rolpassword, ''), a.rolsuper, a.r
 	ORDER BY 1;`
 func (c *Cluster) pgConnectionString() string {
-	hostname := fmt.Sprintf("%s.%s.svc.cluster.local", c.Metadata.Name, c.Metadata.Namespace)
+	hostname := fmt.Sprintf("%s.%s.svc.cluster.local", c.Name, c.Namespace)
 	username := c.systemUsers[constants.SuperuserKeyName].Name
 	password := c.systemUsers[constants.SuperuserKeyName].Password


@@ -3,7 +3,7 @@ package cluster
 import (
 	"fmt"
-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/pkg/api/v1"
 	"github.com/zalando-incubator/postgres-operator/pkg/spec"
@@ -12,8 +12,8 @@ import (
 )
 func (c *Cluster) listPods() ([]v1.Pod, error) {
-	ns := c.Metadata.Namespace
-	listOptions := meta_v1.ListOptions{
+	ns := c.Namespace
+	listOptions := metav1.ListOptions{
 		LabelSelector: c.labelsSet().String(),
 	}
@@ -35,11 +35,11 @@ func (c *Cluster) deletePods() error {
 	for _, obj := range pods {
 		podName := util.NameFromMeta(obj.ObjectMeta)
-		c.logger.Debugf("Deleting pod '%s'", podName)
+		c.logger.Debugf("Deleting pod %q", podName)
 		if err := c.deletePod(podName); err != nil {
-			c.logger.Errorf("could not delete pod '%s': %s", podName, err)
+			c.logger.Errorf("could not delete pod %q: %v", podName, err)
 		} else {
-			c.logger.Infof("pod '%s' has been deleted", podName)
+			c.logger.Infof("pod %q has been deleted", podName)
 		}
 	}
 	if len(pods) > 0 {
@@ -107,16 +107,16 @@ func (c *Cluster) recreatePod(pod v1.Pod) error {
 	if err := c.waitForPodLabel(ch); err != nil {
 		return err
 	}
-	c.logger.Infof("pod '%s' is ready", podName)
+	c.logger.Infof("pod %q is ready", podName)
 	return nil
 }
 func (c *Cluster) recreatePods() error {
 	ls := c.labelsSet()
-	namespace := c.Metadata.Namespace
-	listOptions := meta_v1.ListOptions{
+	namespace := c.Namespace
+	listOptions := metav1.ListOptions{
 		LabelSelector: ls.String(),
 	}
@@ -136,7 +136,7 @@ func (c *Cluster) recreatePods() error {
 		}
 		if err := c.recreatePod(pod); err != nil {
-			return fmt.Errorf("could not recreate replica pod '%s': %v", util.NameFromMeta(pod.ObjectMeta), err)
+			return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
 		}
 	}
 	if masterPod.Name == "" {
@@ -144,10 +144,10 @@ func (c *Cluster) recreatePods() error {
 	} else {
 		//TODO: do manual failover
 		//TODO: specify master, leave new master empty
-		c.logger.Infof("Recreating master pod '%s'", util.NameFromMeta(masterPod.ObjectMeta))
+		c.logger.Infof("Recreating master pod %q", util.NameFromMeta(masterPod.ObjectMeta))
 		if err := c.recreatePod(masterPod); err != nil {
-			return fmt.Errorf("could not recreate master pod '%s': %v", util.NameFromMeta(masterPod.ObjectMeta), err)
+			return fmt.Errorf("could not recreate master pod %q: %v", util.NameFromMeta(masterPod.ObjectMeta), err)
 		}
 	}


@@ -3,7 +3,7 @@ package cluster
 import (
 	"fmt"
-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
@@ -16,8 +16,8 @@ import (
 )
 func (c *Cluster) loadResources() error {
-	ns := c.Metadata.Namespace
-	listOptions := meta_v1.ListOptions{
+	ns := c.Namespace
+	listOptions := metav1.ListOptions{
 		LabelSelector: c.labelsSet().String(),
 	}
@@ -61,7 +61,7 @@ func (c *Cluster) loadResources() error {
 			continue
 		}
 		c.Secrets[secret.UID] = &secrets.Items[i]
-		c.logger.Debugf("secret loaded, uid: %s", secret.UID)
+		c.logger.Debugf("secret loaded, uid: %q", secret.UID)
 	}
 	statefulSets, err := c.KubeClient.StatefulSets(ns).List(listOptions)
@@ -80,19 +80,19 @@ func (c *Cluster) loadResources() error {
 func (c *Cluster) listResources() error {
 	if c.Statefulset != nil {
-		c.logger.Infof("Found statefulset: %s (uid: %s)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID)
+		c.logger.Infof("Found statefulset: %q (uid: %q)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID)
 	}
 	for _, obj := range c.Secrets {
-		c.logger.Infof("Found secret: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
+		c.logger.Infof("Found secret: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
 	}
 	if c.Endpoint != nil {
-		c.logger.Infof("Found endpoint: %s (uid: %s)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID)
+		c.logger.Infof("Found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID)
 	}
 	for role, service := range c.Service {
-		c.logger.Infof("Found %s service: %s (uid: %s)", role, util.NameFromMeta(service.ObjectMeta), service.UID)
+		c.logger.Infof("Found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID)
 	}
 	pods, err := c.listPods()
@@ -101,7 +101,7 @@ func (c *Cluster) listResources() error {
 	}
 	for _, obj := range pods {
-		c.logger.Infof("Found pod: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
+		c.logger.Infof("Found pod: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
 	}
 	pvcs, err := c.listPersistentVolumeClaims()
@@ -110,7 +110,7 @@ func (c *Cluster) listResources() error {
 	}
 	for _, obj := range pvcs {
-		c.logger.Infof("Found PVC: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
+		c.logger.Infof("Found PVC: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
 	}
 	return nil
@@ -129,7 +129,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
 		return nil, err
 	}
 	c.Statefulset = statefulSet
-	c.logger.Debugf("Created new statefulset '%s', uid: %s", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID)
+	c.logger.Debugf("Created new statefulset %q, uid: %q", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID)
 	return statefulSet, nil
 }
@@ -144,7 +144,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
 	patchData, err := specPatch(newStatefulSet.Spec)
 	if err != nil {
-		return fmt.Errorf("could not form patch for the statefulset '%s': %v", statefulSetName, err)
+		return fmt.Errorf("could not form patch for the statefulset %q: %v", statefulSetName, err)
 	}
 	statefulSet, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch(
@@ -152,7 +152,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
 		types.MergePatchType,
 		patchData, "")
 	if err != nil {
-		return fmt.Errorf("could not patch statefulset '%s': %v", statefulSetName, err)
+		return fmt.Errorf("could not patch statefulset %q: %v", statefulSetName, err)
 	}
 	c.Statefulset = statefulSet
@@ -172,9 +172,9 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error
 	orphanDepencies := true
 	oldStatefulset := c.Statefulset
-	options := meta_v1.DeleteOptions{OrphanDependents: &orphanDepencies}
+	options := metav1.DeleteOptions{OrphanDependents: &orphanDepencies}
 	if err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(oldStatefulset.Name, &options); err != nil {
-		return fmt.Errorf("could not delete statefulset '%s': %v", statefulSetName, err)
+		return fmt.Errorf("could not delete statefulset %q: %v", statefulSetName, err)
 	}
 	// make sure we clear the stored statefulset status if the subsequent create fails.
 	c.Statefulset = nil
@@ -183,7 +183,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error
 	err := retryutil.Retry(constants.StatefulsetDeletionInterval, constants.StatefulsetDeletionTimeout,
 		func() (bool, error) {
-			_, err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(oldStatefulset.Name, meta_v1.GetOptions{})
+			_, err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(oldStatefulset.Name, metav1.GetOptions{})
 			return err != nil, nil
 		})
@@ -194,7 +194,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error
 	// create the new statefulset with the desired spec. It would take over the remaining pods.
 	createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(newStatefulSet)
 	if err != nil {
-		return fmt.Errorf("could not create statefulset '%s': %v", statefulSetName, err)
+		return fmt.Errorf("could not create statefulset %q: %v", statefulSetName, err)
 	}
 	// check that all the previous replicas were picked up.
 	if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas &&
@@ -216,7 +216,7 @@ func (c *Cluster) deleteStatefulSet() error {
 	if err != nil {
 		return err
 	}
-	c.logger.Infof("statefulset '%s' has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta))
+	c.logger.Infof("statefulset %q has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta))
 	c.Statefulset = nil
 	if err := c.deletePods(); err != nil {
@@ -263,19 +263,19 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
 	if role == Master {
 		// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
 		// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
-		currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name, meta_v1.GetOptions{})
+		currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name, metav1.GetOptions{})
 		if err != nil {
 			return fmt.Errorf("could not get current cluster endpoints: %v", err)
 		}
 	}
 	err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions)
 	if err != nil {
-		return fmt.Errorf("could not delete service '%s': '%v'", serviceName, err)
+		return fmt.Errorf("could not delete service %q: %v", serviceName, err)
 	}
 	c.Endpoint = nil
 	svc, err := c.KubeClient.Services(newService.Namespace).Create(newService)
 	if err != nil {
-		return fmt.Errorf("could not create service '%s': '%v'", serviceName, err)
+		return fmt.Errorf("could not create service %q: %v", serviceName, err)
 	}
 	c.Service[role] = svc
 	if role == Master {
@@ -283,7 +283,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
 		endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets)
 		ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec)
 		if err != nil {
-			return fmt.Errorf("could not create endpoint '%s': '%v'", endpointName, err)
+			return fmt.Errorf("could not create endpoint %q: %v", endpointName, err)
 		}
 		c.Endpoint = ep
 	}
@@ -299,13 +299,13 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
 			[]byte(annotationsPatchData), "")
 		if err != nil {
-			return fmt.Errorf("could not replace annotations for the service '%s': %v", serviceName, err)
+			return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err)
 		}
 	}
 	patchData, err := specPatch(newService.Spec)
 	if err != nil {
-		return fmt.Errorf("could not form patch for the service '%s': %v", serviceName, err)
+		return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
 	}
 	svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch(
@@ -313,7 +313,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
 		types.MergePatchType,
 		patchData, "")
 	if err != nil {
-		return fmt.Errorf("could not patch service '%s': %v", serviceName, err)
+		return fmt.Errorf("could not patch service %q: %v", serviceName, err)
 	}
 	c.Service[role] = svc
@@ -330,7 +330,7 @@ func (c *Cluster) deleteService(role PostgresRole) error {
 	if err != nil {
 		return err
 	}
-	c.logger.Infof("%s service '%s' has been deleted", role, util.NameFromMeta(service.ObjectMeta))
+	c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta))
 	c.Service[role] = nil
 	return nil
 }
@@ -359,7 +359,7 @@ func (c *Cluster) deleteEndpoint() error {
 	if err != nil {
 		return err
 	}
-	c.logger.Infof("endpoint '%s' has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta))
+	c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta))
 	c.Endpoint = nil
 	return nil
@@ -372,11 +372,11 @@ func (c *Cluster) applySecrets() error {
 		secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec)
 		if k8sutil.ResourceAlreadyExists(err) {
 			var userMap map[string]spec.PgUser
-			curSecret, err := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, meta_v1.GetOptions{})
+			curSecret, err := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{})
 			if err != nil {
 				return fmt.Errorf("could not get current secret: %v", err)
 			}
-			c.logger.Debugf("secret '%s' already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta))
+			c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta))
 			if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name {
 				secretUsername = constants.SuperuserKeyName
 				userMap = c.systemUsers
@@ -393,10 +393,10 @@ func (c *Cluster) applySecrets() error {
 			continue
 		} else {
 			if err != nil {
-				return fmt.Errorf("could not create secret for user '%s': %v", secretUsername, err)
+				return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err)
 			}
 			c.Secrets[secret.UID] = secret
-			c.logger.Debugf("Created new secret '%s', uid: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID)
+			c.logger.Debugf("Created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID)
 		}
 	}
@@ -404,12 +404,12 @@ func (c *Cluster) applySecrets() error {
 }
 func (c *Cluster) deleteSecret(secret *v1.Secret) error {
-	c.logger.Debugf("Deleting secret '%s'", util.NameFromMeta(secret.ObjectMeta))
+	c.logger.Debugf("Deleting secret %q", util.NameFromMeta(secret.ObjectMeta))
 	err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions)
 	if err != nil {
 		return err
 	}
-	c.logger.Infof("secret '%s' has been deleted", util.NameFromMeta(secret.ObjectMeta))
+	c.logger.Infof("secret %q has been deleted", util.NameFromMeta(secret.ObjectMeta))
 	delete(c.Secrets, secret.UID)
 	return err
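Editor's note: updateStatefulSet and updateService above both patch only the object's spec via Patch(..., types.MergePatchType, patchData, ""). A plausible shape for the specPatch helper they call is sketched below; the real helper lives elsewhere in this package, so treat this as an assumption about its contract rather than the actual implementation.

// Sketch: wrap a spec in a {"spec": ...} document suitable for a merge patch.
package cluster

import (
	"encoding/json"
	"fmt"
)

// specPatch marshals an object's spec into a merge-patch body of the form
// {"spec": {...}}, which the Kubernetes API applies on top of the live object.
func specPatch(spec interface{}) ([]byte, error) {
	b, err := json.Marshal(struct {
		Spec interface{} `json:"spec"`
	}{spec})
	if err != nil {
		return nil, fmt.Errorf("could not marshal spec: %v", err)
	}
	return b, nil
}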


@@ -95,7 +95,7 @@ func (c *Cluster) syncService(role PostgresRole) error {
 		if err != nil {
 			return fmt.Errorf("could not create missing %s service: %v", role, err)
 		}
-		c.logger.Infof("Created missing %s service '%s'", role, util.NameFromMeta(svc.ObjectMeta))
+		c.logger.Infof("Created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta))
 		return nil
 	}
@@ -110,7 +110,7 @@ func (c *Cluster) syncService(role PostgresRole) error {
 	if err := c.updateService(role, desiredSvc); err != nil {
 		return fmt.Errorf("could not update %s service to match desired state: %v", role, err)
 	}
-	c.logger.Infof("%s service '%s' is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
+	c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
 	return nil
 }
@@ -122,7 +122,7 @@ func (c *Cluster) syncEndpoint() error {
 		if err != nil {
 			return fmt.Errorf("could not create missing endpoint: %v", err)
 		}
-		c.logger.Infof("Created missing endpoint '%s'", util.NameFromMeta(ep.ObjectMeta))
+		c.logger.Infof("Created missing endpoint %q", util.NameFromMeta(ep.ObjectMeta))
 		return nil
 	}
@@ -151,7 +151,7 @@ func (c *Cluster) syncStatefulSet() error {
 		if err != nil {
 			return fmt.Errorf("cluster is not ready: %v", err)
 		}
-		c.logger.Infof("Created missing statefulset '%s'", util.NameFromMeta(ss.ObjectMeta))
+		c.logger.Infof("Created missing statefulset %q", util.NameFromMeta(ss.ObjectMeta))
 		if !rollUpdate {
 			return nil
 		}


@@ -6,7 +6,7 @@ import (
 	"strings"
 	"time"
-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/pkg/apis/apps/v1beta1"
@@ -77,11 +77,11 @@ func metadataAnnotationsPatch(annotations map[string]string) string {
 func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) {
 	if isUpdate {
-		c.logger.Infof("statefulset '%s' has been changed",
+		c.logger.Infof("statefulset %q has been changed",
 			util.NameFromMeta(old.ObjectMeta),
 		)
 	} else {
-		c.logger.Infof("statefulset '%s' is not in the desired state and needs to be updated",
+		c.logger.Infof("statefulset %q is not in the desired state and needs to be updated",
 			util.NameFromMeta(old.ObjectMeta),
 		)
 	}
@@ -89,18 +89,18 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate
 	if len(reasons) > 0 {
 		for _, reason := range reasons {
-			c.logger.Infof("Reason: %s", reason)
+			c.logger.Infof("Reason: %q", reason)
 		}
 	}
 }
 func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isUpdate bool, reason string) {
 	if isUpdate {
-		c.logger.Infof("%s service '%s' has been changed",
+		c.logger.Infof("%s service %q has been changed",
 			role, util.NameFromMeta(old.ObjectMeta),
 		)
 	} else {
-		c.logger.Infof("%s service '%s is not in the desired state and needs to be updated",
+		c.logger.Infof("%s service %q is not in the desired state and needs to be updated",
 			role, util.NameFromMeta(old.ObjectMeta),
 		)
 	}
@@ -124,10 +124,10 @@ func (c *Cluster) getOAuthToken() (string, error) {
 	// Temporary getting postgresql-operator secret from the NamespaceDefault
 	credentialsSecret, err := c.KubeClient.
 		Secrets(c.OpConfig.OAuthTokenSecretName.Namespace).
-		Get(c.OpConfig.OAuthTokenSecretName.Name, meta_v1.GetOptions{})
+		Get(c.OpConfig.OAuthTokenSecretName.Name, metav1.GetOptions{})
 	if err != nil {
-		c.logger.Debugf("Oauth token secret name: %s", c.OpConfig.OAuthTokenSecretName)
+		c.logger.Debugf("Oauth token secret name: %q", c.OpConfig.OAuthTokenSecretName)
 		return "", fmt.Errorf("could not get credentials secret: %v", err)
 	}
 	data := credentialsSecret.Data
@@ -153,7 +153,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) {
 		return []string{}, fmt.Errorf("could not get oauth token: %v", err)
 	}
-	teamInfo, err := c.TeamsAPIClient.TeamInfo(c.Spec.TeamID, token)
+	teamInfo, err := c.teamsAPIClient.TeamInfo(c.Spec.TeamID, token)
 	if err != nil {
 		return nil, fmt.Errorf("could not get team info: %v", err)
 	}
@@ -194,10 +194,10 @@ func (c *Cluster) waitForPodDeletion(podEvents chan spec.PodEvent) error {
 func (c *Cluster) waitStatefulsetReady() error {
 	return retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
 		func() (bool, error) {
-			listOptions := meta_v1.ListOptions{
+			listOptions := metav1.ListOptions{
 				LabelSelector: c.labelsSet().String(),
 			}
-			ss, err := c.KubeClient.StatefulSets(c.Metadata.Namespace).List(listOptions)
+			ss, err := c.KubeClient.StatefulSets(c.Namespace).List(listOptions)
 			if err != nil {
 				return false, err
 			}
@@ -212,17 +212,17 @@ func (c *Cluster) waitStatefulsetReady() error {
 func (c *Cluster) waitPodLabelsReady() error {
 	ls := c.labelsSet()
-	namespace := c.Metadata.Namespace
-	listOptions := meta_v1.ListOptions{
+	namespace := c.Namespace
+	listOptions := metav1.ListOptions{
 		LabelSelector: ls.String(),
 	}
-	masterListOption := meta_v1.ListOptions{
+	masterListOption := metav1.ListOptions{
 		LabelSelector: labels.Merge(ls, labels.Set{
 			c.OpConfig.PodRoleLabel: constants.PodRoleMaster,
 		}).String(),
 	}
-	replicaListOption := meta_v1.ListOptions{
+	replicaListOption := metav1.ListOptions{
 		LabelSelector: labels.Merge(ls, labels.Set{
 			c.OpConfig.PodRoleLabel: constants.PodRoleReplica,
 		}).String(),
@@ -278,7 +278,7 @@ func (c *Cluster) labelsSet() labels.Set {
 	for k, v := range c.OpConfig.ClusterLabels {
 		lbls[k] = v
 	}
-	lbls[c.OpConfig.ClusterNameLabel] = c.Metadata.Name
+	lbls[c.OpConfig.ClusterNameLabel] = c.Name
 	return labels.Set(lbls)
 }
@@ -308,7 +308,7 @@ func (c *Cluster) credentialSecretName(username string) string {
 	// and must start and end with an alphanumeric character
 	return fmt.Sprintf(constants.UserSecretTemplate,
 		strings.Replace(username, "_", "-", -1),
-		c.Metadata.Name)
+		c.Name)
 }
 func (c *Cluster) podSpiloRole(pod *v1.Pod) string {
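Editor's note: the wait helpers above poll through retryutil.Retry(interval, timeout, fn), where fn reports (done, err). A minimal sketch of that polling contract follows; it is an assumption about the helper's behaviour, not the repository's actual retryutil implementation.

// Sketch: call fn every interval until it reports done, errors out, or the timeout passes.
package retryutil

import (
	"fmt"
	"time"
)

// Retry polls fn at the given interval and gives up once timeout has elapsed.
func Retry(interval, timeout time.Duration, fn func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := fn()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("still not ready after %v", timeout)
		}
		time.Sleep(interval)
	}
}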


@ -6,7 +6,7 @@ import (
"strings" "strings"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
@ -17,8 +17,8 @@ import (
) )
func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) { func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) {
ns := c.Metadata.Namespace ns := c.Namespace
listOptions := meta_v1.ListOptions{ listOptions := metav1.ListOptions{
LabelSelector: c.labelsSet().String(), LabelSelector: c.labelsSet().String(),
} }
@ -36,7 +36,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error {
return err return err
} }
for _, pvc := range pvcs { for _, pvc := range pvcs {
c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta)) c.logger.Debugf("Deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta))
if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil { if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil {
c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err)
} }
@ -63,14 +63,14 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) {
if lastDash > 0 && lastDash < len(pvc.Name)-1 { if lastDash > 0 && lastDash < len(pvc.Name)-1 {
pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]) pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %s to a number", pvc.Name) return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %q to a number", pvc.Name)
} }
if int32(pvcNumber) > lastPodIndex { if int32(pvcNumber) > lastPodIndex {
c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pods", pvc.Name) c.logger.Debugf("Skipping persistent volume %q corresponding to a non-running pods", pvc.Name)
continue continue
} }
} }
pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName, meta_v1.GetOptions{}) pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get PersistentVolume: %v", err) return nil, fmt.Errorf("could not get PersistentVolume: %v", err)
} }
@ -119,22 +119,22 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume
if err != nil { if err != nil {
return err return err
} }
c.logger.Debugf("updating persistent volume %s to %d", pv.Name, newSize) c.logger.Debugf("updating persistent volume %q to %d", pv.Name, newSize)
if err := resizer.ResizeVolume(awsVolumeId, newSize); err != nil { if err := resizer.ResizeVolume(awsVolumeId, newSize); err != nil {
return fmt.Errorf("could not resize EBS volume %s: %v", awsVolumeId, err) return fmt.Errorf("could not resize EBS volume %q: %v", awsVolumeId, err)
} }
c.logger.Debugf("resizing the filesystem on the volume %s", pv.Name) c.logger.Debugf("resizing the filesystem on the volume %q", pv.Name)
podName := getPodNameFromPersistentVolume(pv) podName := getPodNameFromPersistentVolume(pv)
if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil {
return fmt.Errorf("could not resize the filesystem on pod '%s': %v", podName, err) return fmt.Errorf("could not resize the filesystem on pod %q: %v", podName, err)
} }
c.logger.Debugf("filesystem resize successful on volume %s", pv.Name) c.logger.Debugf("filesystem resize successful on volume %q", pv.Name)
pv.Spec.Capacity[v1.ResourceStorage] = newQuantity pv.Spec.Capacity[v1.ResourceStorage] = newQuantity
c.logger.Debugf("updating persistent volume definition for volume %s", pv.Name) c.logger.Debugf("updating persistent volume definition for volume %q", pv.Name)
if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil {
return fmt.Errorf("could not update persistent volume: %s", err) return fmt.Errorf("could not update persistent volume: %q", err)
} }
c.logger.Debugf("successfully updated persistent volume %s", pv.Name) c.logger.Debugf("successfully updated persistent volume %q", pv.Name)
} }
} }
if len(pvs) > 0 && totalCompatible == 0 { if len(pvs) > 0 && totalCompatible == 0 {


@ -5,7 +5,7 @@ import (
"sync" "sync"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"k8s.io/client-go/kubernetes" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
@ -14,21 +14,26 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/config"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/teams" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
) )
type Config struct { type Config struct {
RestConfig *rest.Config RestConfig *rest.Config
KubeClient *kubernetes.Clientset
RestClient rest.Interface
TeamsAPIClient *teams.API
InfrastructureRoles map[string]spec.PgUser InfrastructureRoles map[string]spec.PgUser
NoDatabaseAccess bool
NoTeamsAPI bool
ConfigMapName spec.NamespacedName
Namespace string
} }
type Controller struct { type Controller struct {
Config config Config
opConfig *config.Config opConfig *config.Config
logger *logrus.Entry
logger *logrus.Entry
KubeClient k8sutil.KubernetesClient
RestClient rest.Interface // kubernetes API group REST client
clustersMu sync.RWMutex clustersMu sync.RWMutex
clusters map[spec.NamespacedName]*cluster.Cluster clusters map[spec.NamespacedName]*cluster.Cluster
@ -38,23 +43,16 @@ type Controller struct {
podInformer cache.SharedIndexInformer podInformer cache.SharedIndexInformer
podCh chan spec.PodEvent podCh chan spec.PodEvent
clusterEventQueues []*cache.FIFO clusterEventQueues []*cache.FIFO
lastClusterSyncTime int64 lastClusterSyncTime int64
} }
func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { func NewController(controllerConfig *Config) *Controller {
logger := logrus.New() logger := logrus.New()
if operatorConfig.DebugLogging {
logger.Level = logrus.DebugLevel
}
controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger)
return &Controller{ return &Controller{
Config: *controllerConfig, config: *controllerConfig,
opConfig: operatorConfig, opConfig: &config.Config{},
logger: logger.WithField("pkg", "controller"), logger: logger.WithField("pkg", "controller"),
clusters: make(map[spec.NamespacedName]*cluster.Cluster), clusters: make(map[spec.NamespacedName]*cluster.Cluster),
stopChs: make(map[spec.NamespacedName]chan struct{}), stopChs: make(map[spec.NamespacedName]chan struct{}),
@ -62,42 +60,76 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller {
} }
} }
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (c *Controller) initClients() {
defer wg.Done() client, err := k8sutil.ClientSet(c.config.RestConfig)
wg.Add(1) if err != nil {
c.logger.Fatalf("couldn't create client: %v", err)
}
c.KubeClient = k8sutil.NewFromKubernetesInterface(client)
c.initController() c.RestClient, err = k8sutil.KubernetesRestClient(*c.config.RestConfig)
if err != nil {
c.logger.Fatalf("couldn't create rest client: %v", err)
}
}
go c.runInformers(stopCh) func (c *Controller) initOperatorConfig() {
configMapData := make(map[string]string)
for i := range c.clusterEventQueues { if c.config.ConfigMapName != (spec.NamespacedName{}) {
go c.processClusterEventsQueue(i) configMap, err := c.KubeClient.ConfigMaps(c.config.ConfigMapName.Namespace).
Get(c.config.ConfigMapName.Name, metav1.GetOptions{})
if err != nil {
panic(err)
}
configMapData = configMap.Data
} else {
c.logger.Infoln("No ConfigMap specified. Loading default values")
} }
c.logger.Info("Started working in background") if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var
configMapData["namespace"] = c.config.Namespace
}
if c.config.NoDatabaseAccess {
configMapData["enable_database_access"] = "false"
}
if c.config.NoTeamsAPI {
configMapData["enable_teams_api"] = "false"
}
c.opConfig = config.NewFromMap(configMapData)
} }
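
To spell out the precedence implemented above: ConfigMap values win, the namespace falls back to the value passed in through the controller Config only when the ConfigMap leaves it empty, and the NoDatabaseAccess/NoTeamsAPI switches force their keys to "false" regardless of the ConfigMap. A rough sketch with hypothetical data:

package main

import "fmt"

func main() {
	// hypothetical ConfigMap payload; "namespace" is deliberately missing
	configMapData := map[string]string{"resync_period": "5m"}
	envNamespace := "operator-ns" // stand-in for the namespace handed to the controller

	if configMapData["namespace"] == "" {
		configMapData["namespace"] = envNamespace
	}
	// the kill switches always win over the ConfigMap
	configMapData["enable_database_access"] = "false"
	configMapData["enable_teams_api"] = "false"

	fmt.Println(configMapData)
}
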
func (c *Controller) initController() { func (c *Controller) initController() {
c.initClients()
c.initOperatorConfig()
c.logger.Infof("Config: %s", c.opConfig.MustMarshal())
if c.opConfig.DebugLogging {
c.logger.Level = logrus.DebugLevel
}
if err := c.createTPR(); err != nil { if err := c.createTPR(); err != nil {
c.logger.Fatalf("could not register ThirdPartyResource: %v", err) c.logger.Fatalf("could not register ThirdPartyResource: %v", err)
} }
if infraRoles, err := c.getInfrastructureRoles(); err != nil { if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil {
c.logger.Warningf("could not get infrastructure roles: %v", err) c.logger.Warningf("could not get infrastructure roles: %v", err)
} else { } else {
c.InfrastructureRoles = infraRoles c.config.InfrastructureRoles = infraRoles
} }
// Postgresqls // Postgresqls
clusterLw := &cache.ListWatch{
ListFunc: c.clusterListFunc,
WatchFunc: c.clusterWatchFunc,
}
c.postgresqlInformer = cache.NewSharedIndexInformer( c.postgresqlInformer = cache.NewSharedIndexInformer(
clusterLw, &cache.ListWatch{
ListFunc: c.clusterListFunc,
WatchFunc: c.clusterWatchFunc,
},
&spec.Postgresql{}, &spec.Postgresql{},
constants.QueueResyncPeriodTPR, constants.QueueResyncPeriodTPR,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) cache.Indexers{})
c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.postgresqlAdd, AddFunc: c.postgresqlAdd,
@ -136,6 +168,21 @@ func (c *Controller) initController() {
} }
} }
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
wg.Add(1)
c.initController()
go c.runInformers(stopCh)
for i := range c.clusterEventQueues {
go c.processClusterEventsQueue(i)
}
c.logger.Info("Started working in background")
}
func (c *Controller) runInformers(stopCh <-chan struct{}) { func (c *Controller) runInformers(stopCh <-chan struct{}) {
go c.postgresqlInformer.Run(stopCh) go c.postgresqlInformer.Run(stopCh)
go c.podInformer.Run(stopCh) go c.podInformer.Run(stopCh)


@ -1,7 +1,7 @@
package controller package controller
import ( import (
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
@ -10,11 +10,11 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util"
) )
func (c *Controller) podListFunc(options meta_v1.ListOptions) (runtime.Object, error) { func (c *Controller) podListFunc(options metav1.ListOptions) (runtime.Object, error) {
var labelSelector string var labelSelector string
var fieldSelector string var fieldSelector string
opts := meta_v1.ListOptions{ opts := metav1.ListOptions{
LabelSelector: labelSelector, LabelSelector: labelSelector,
FieldSelector: fieldSelector, FieldSelector: fieldSelector,
Watch: options.Watch, Watch: options.Watch,
@ -22,14 +22,14 @@ func (c *Controller) podListFunc(options meta_v1.ListOptions) (runtime.Object, e
TimeoutSeconds: options.TimeoutSeconds, TimeoutSeconds: options.TimeoutSeconds,
} }
return c.KubeClient.CoreV1().Pods(c.opConfig.Namespace).List(opts) return c.KubeClient.Pods(c.opConfig.Namespace).List(opts)
} }
func (c *Controller) podWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) { func (c *Controller) podWatchFunc(options metav1.ListOptions) (watch.Interface, error) {
var labelSelector string var labelSelector string
var fieldSelector string var fieldSelector string
opts := meta_v1.ListOptions{ opts := metav1.ListOptions{
LabelSelector: labelSelector, LabelSelector: labelSelector,
FieldSelector: fieldSelector, FieldSelector: fieldSelector,
Watch: options.Watch, Watch: options.Watch,
@ -37,7 +37,7 @@ func (c *Controller) podWatchFunc(options meta_v1.ListOptions) (watch.Interface,
TimeoutSeconds: options.TimeoutSeconds, TimeoutSeconds: options.TimeoutSeconds,
} }
return c.KubeClient.CoreV1Client.Pods(c.opConfig.Namespace).Watch(opts) return c.KubeClient.Pods(c.opConfig.Namespace).Watch(opts)
} }
func (c *Controller) podAdd(obj interface{}) { func (c *Controller) podAdd(obj interface{}) {
@ -107,7 +107,7 @@ func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) {
c.clustersMu.RUnlock() c.clustersMu.RUnlock()
if ok { if ok {
c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, event.ClusterName)
cluster.ReceivePodEvent(event) cluster.ReceivePodEvent(event)
} }
case <-stopCh: case <-stopCh:


@ -1,18 +1,16 @@
package controller package controller
import ( import (
"encoding/json"
"fmt" "fmt"
"reflect" "reflect"
"sync/atomic" "sync/atomic"
"time" "time"
"k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/cluster"
@ -27,52 +25,43 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}) {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
c.clusterListFunc(meta_v1.ListOptions{ResourceVersion: "0"}) c.clusterListFunc(metav1.ListOptions{ResourceVersion: "0"})
case <-stopCh: case <-stopCh:
return return
} }
} }
} }
func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Object, error) { func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) {
c.logger.Info("Getting list of currently running clusters") var list spec.PostgresqlList
var activeClustersCnt, failedClustersCnt int
req := c.RestClient.Get(). req := c.RestClient.
RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). Get().
VersionedParams(&options, scheme.ParameterCodec). Namespace(c.opConfig.Namespace).
FieldsSelectorParam(fields.Everything()) Resource(constants.ResourceName).
VersionedParams(&options, metav1.ParameterCodec)
object, err := req.Do().Get()
b, err := req.DoRaw()
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get list of postgresql objects: %v", err) return nil, err
}
objList, err := meta.ExtractList(object)
if err != nil {
return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err)
} }
err = json.Unmarshal(b, &list)
if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) {
c.logger.Debugln("skipping resync of clusters") c.logger.Debugln("skipping resync of clusters")
return object, err return &list, err
} }
var activeClustersCnt, failedClustersCnt int for _, pg := range list.Items {
for _, obj := range objList {
pg, ok := obj.(*spec.Postgresql)
if !ok {
return nil, fmt.Errorf("could not cast object to postgresql")
}
if pg.Error != nil { if pg.Error != nil {
failedClustersCnt++ failedClustersCnt++
continue continue
} }
c.queueClusterEvent(nil, pg, spec.EventSync) c.queueClusterEvent(nil, &pg, spec.EventSync)
activeClustersCnt++ activeClustersCnt++
} }
if len(objList) > 0 { if len(list.Items) > 0 {
if failedClustersCnt > 0 && activeClustersCnt == 0 { if failedClustersCnt > 0 && activeClustersCnt == 0 {
c.logger.Infof("There are no clusters running. %d are in the failed state", failedClustersCnt) c.logger.Infof("There are no clusters running. %d are in the failed state", failedClustersCnt)
} else if failedClustersCnt == 0 && activeClustersCnt > 0 { } else if failedClustersCnt == 0 && activeClustersCnt > 0 {
@ -86,16 +75,48 @@ func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Objec
atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
return object, err return &list, err
} }
func (c *Controller) clusterWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) { type tprDecoder struct {
dec *json.Decoder
close func() error
}
func (d *tprDecoder) Close() {
d.close()
}
func (d *tprDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var e struct {
Type watch.EventType
Object spec.Postgresql
}
if err := d.dec.Decode(&e); err != nil {
return watch.Error, nil, err
}
return e.Type, &e.Object, nil
}
func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true options.Watch = true
req := c.RestClient.Get(). r, err := c.RestClient.
RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). Get().
VersionedParams(&options, scheme.ParameterCodec). Namespace(c.opConfig.Namespace).
FieldsSelectorParam(fields.Everything()) Resource(constants.ResourceName).
return req.Watch() VersionedParams(&options, metav1.ParameterCodec).
FieldsSelectorParam(nil).
Stream()
if err != nil {
return nil, err
}
return watch.NewStreamWatcher(&tprDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
} }
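
For intuition about the decoder above: each JSON document on the watch stream carries an event type plus a Postgresql object, and watch.NewStreamWatcher simply calls Decode in a loop and forwards the results on a channel. The sketch below uses simplified stand-in types and a fabricated payload rather than the operator's real ones.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// simplified stand-in for the event envelope decoded by tprDecoder
type event struct {
	Type   string
	Object struct {
		Kind     string `json:"kind"`
		Metadata struct {
			Name string `json:"name"`
		} `json:"metadata"`
	} `json:"object"`
}

func main() {
	// fabricated two-event stream shaped like the TPR watch endpoint output
	stream := `{"type":"ADDED","object":{"kind":"Postgresql","metadata":{"name":"acid-testcluster42"}}}
{"type":"MODIFIED","object":{"kind":"Postgresql","metadata":{"name":"acid-testcluster42"}}}`

	dec := json.NewDecoder(strings.NewReader(stream))
	for {
		var e event
		if err := dec.Decode(&e); err != nil {
			break // io.EOF ends the stream, mirroring the watcher shutting down
		}
		fmt.Println(e.Type, e.Object.Metadata.Name)
	}
}
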
func (c *Controller) processEvent(obj interface{}) error { func (c *Controller) processEvent(obj interface{}) error {
@ -108,9 +129,9 @@ func (c *Controller) processEvent(obj interface{}) error {
logger := c.logger.WithField("worker", event.WorkerID) logger := c.logger.WithField("worker", event.WorkerID)
if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { if event.EventType == spec.EventAdd || event.EventType == spec.EventSync {
clusterName = util.NameFromMeta(event.NewSpec.Metadata) clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta)
} else { } else {
clusterName = util.NameFromMeta(event.OldSpec.Metadata) clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta)
} }
c.clustersMu.RLock() c.clustersMu.RLock()
@ -120,14 +141,14 @@ func (c *Controller) processEvent(obj interface{}) error {
switch event.EventType { switch event.EventType {
case spec.EventAdd: case spec.EventAdd:
if clusterFound { if clusterFound {
logger.Debugf("Cluster '%s' already exists", clusterName) logger.Debugf("Cluster %q already exists", clusterName)
return nil return nil
} }
logger.Infof("Creation of the '%s' cluster started", clusterName) logger.Infof("Creation of the %q cluster started", clusterName)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger)
cl.Run(stopCh) cl.Run(stopCh)
c.clustersMu.Lock() c.clustersMu.Lock()
@ -142,31 +163,31 @@ func (c *Controller) processEvent(obj interface{}) error {
return nil return nil
} }
logger.Infof("Cluster '%s' has been created", clusterName) logger.Infof("Cluster %q has been created", clusterName)
case spec.EventUpdate: case spec.EventUpdate:
logger.Infof("Update of the '%s' cluster started", clusterName) logger.Infof("Update of the %q cluster started", clusterName)
if !clusterFound { if !clusterFound {
logger.Warnf("Cluster '%s' does not exist", clusterName) logger.Warnf("Cluster %q does not exist", clusterName)
return nil return nil
} }
if err := cl.Update(event.NewSpec); err != nil { if err := cl.Update(event.NewSpec); err != nil {
cl.Error = fmt.Errorf("could not update cluster: %s", err) cl.Error = fmt.Errorf("could not update cluster: %v", err)
logger.Errorf("%v", cl.Error) logger.Errorf("%v", cl.Error)
return nil return nil
} }
cl.Error = nil cl.Error = nil
logger.Infof("Cluster '%s' has been updated", clusterName) logger.Infof("Cluster %q has been updated", clusterName)
case spec.EventDelete: case spec.EventDelete:
logger.Infof("Deletion of the '%s' cluster started", clusterName) logger.Infof("Deletion of the %q cluster started", clusterName)
if !clusterFound { if !clusterFound {
logger.Errorf("Unknown cluster: %s", clusterName) logger.Errorf("Unknown cluster: %q", clusterName)
return nil return nil
} }
if err := cl.Delete(); err != nil { if err := cl.Delete(); err != nil {
logger.Errorf("could not delete cluster '%s': %s", clusterName, err) logger.Errorf("could not delete cluster %q: %v", clusterName, err)
return nil return nil
} }
close(c.stopChs[clusterName]) close(c.stopChs[clusterName])
@ -176,14 +197,14 @@ func (c *Controller) processEvent(obj interface{}) error {
delete(c.stopChs, clusterName) delete(c.stopChs, clusterName)
c.clustersMu.Unlock() c.clustersMu.Unlock()
logger.Infof("Cluster '%s' has been deleted", clusterName) logger.Infof("Cluster %q has been deleted", clusterName)
case spec.EventSync: case spec.EventSync:
logger.Infof("Syncing of the '%s' cluster started", clusterName) logger.Infof("Syncing of the %q cluster started", clusterName)
// no race condition because a cluster is always processed by a single worker // no race condition because a cluster is always processed by a single worker
if !clusterFound { if !clusterFound {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger)
cl.Run(stopCh) cl.Run(stopCh)
c.clustersMu.Lock() c.clustersMu.Lock()
@ -193,13 +214,13 @@ func (c *Controller) processEvent(obj interface{}) error {
} }
if err := cl.Sync(); err != nil { if err := cl.Sync(); err != nil {
cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err) cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err)
logger.Errorf("%v", cl.Error) logger.Errorf("%v", cl.Error)
return nil return nil
} }
cl.Error = nil cl.Error = nil
logger.Infof("Cluster '%s' has been synced", clusterName) logger.Infof("Cluster %q has been synced", clusterName)
} }
return nil return nil
@ -221,8 +242,8 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec
) )
if old != nil { //update, delete if old != nil { //update, delete
uid = old.Metadata.GetUID() uid = old.GetUID()
clusterName = util.NameFromMeta(old.Metadata) clusterName = util.NameFromMeta(old.ObjectMeta)
if eventType == spec.EventUpdate && new.Error == nil && old.Error != nil { if eventType == spec.EventUpdate && new.Error == nil && old.Error != nil {
eventType = spec.EventSync eventType = spec.EventSync
clusterError = new.Error clusterError = new.Error
@ -230,13 +251,13 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec
clusterError = old.Error clusterError = old.Error
} }
} else { //add, sync } else { //add, sync
uid = new.Metadata.GetUID() uid = new.GetUID()
clusterName = util.NameFromMeta(new.Metadata) clusterName = util.NameFromMeta(new.ObjectMeta)
clusterError = new.Error clusterError = new.Error
} }
if clusterError != nil && eventType != spec.EventDelete { if clusterError != nil && eventType != spec.EventDelete {
c.logger.Debugf("Skipping %s event for invalid cluster %s (reason: %v)", eventType, clusterName, clusterError) c.logger.Debugf("Skipping %q event for invalid cluster %q (reason: %v)", eventType, clusterName, clusterError)
return return
} }
@ -253,7 +274,7 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec
if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil {
c.logger.WithField("worker", workerID).Errorf("error when queueing cluster event: %v", clusterEvent) c.logger.WithField("worker", workerID).Errorf("error when queueing cluster event: %v", clusterEvent)
} }
c.logger.WithField("worker", workerID).Infof("%s of the '%s' cluster has been queued", eventType, clusterName) c.logger.WithField("worker", workerID).Infof("%q of the %q cluster has been queued", eventType, clusterName)
} }
func (c *Controller) postgresqlAdd(obj interface{}) { func (c *Controller) postgresqlAdd(obj interface{}) {
@ -276,7 +297,7 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
if !ok { if !ok {
c.logger.Errorf("could not cast to postgresql spec") c.logger.Errorf("could not cast to postgresql spec")
} }
if pgOld.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion { if pgOld.ResourceVersion == pgNew.ResourceVersion {
return return
} }
if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {


@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"hash/crc32" "hash/crc32"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1" extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1"
@ -17,15 +17,12 @@ import (
func (c *Controller) makeClusterConfig() cluster.Config { func (c *Controller) makeClusterConfig() cluster.Config {
infrastructureRoles := make(map[string]spec.PgUser) infrastructureRoles := make(map[string]spec.PgUser)
for k, v := range c.InfrastructureRoles { for k, v := range c.config.InfrastructureRoles {
infrastructureRoles[k] = v infrastructureRoles[k] = v
} }
return cluster.Config{ return cluster.Config{
KubeClient: c.KubeClient, RestConfig: c.config.RestConfig,
RestClient: c.RestClient,
RestConfig: c.RestConfig,
TeamsAPIClient: c.TeamsAPIClient,
OpConfig: config.Copy(c.opConfig), OpConfig: config.Copy(c.opConfig),
InfrastructureRoles: infrastructureRoles, InfrastructureRoles: infrastructureRoles,
} }
@ -33,7 +30,7 @@ func (c *Controller) makeClusterConfig() cluster.Config {
func thirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource { func thirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource {
return &extv1beta.ThirdPartyResource{ return &extv1beta.ThirdPartyResource{
ObjectMeta: meta_v1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
//ThirdPartyResources are cluster-wide //ThirdPartyResources are cluster-wide
Name: TPRName, Name: TPRName,
}, },
@ -49,33 +46,32 @@ func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 {
} }
func (c *Controller) createTPR() error { func (c *Controller) createTPR() error {
TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor) tpr := thirdPartyResource(constants.TPRName)
tpr := thirdPartyResource(TPRName)
_, err := c.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr) _, err := c.KubeClient.ThirdPartyResources().Create(tpr)
if err != nil { if err != nil {
if !k8sutil.ResourceAlreadyExists(err) { if !k8sutil.ResourceAlreadyExists(err) {
return err return err
} }
c.logger.Infof("ThirdPartyResource '%s' is already registered", TPRName) c.logger.Infof("ThirdPartyResource %q is already registered", constants.TPRName)
} else { } else {
c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName) c.logger.Infof("ThirdPartyResource %q' has been registered", constants.TPRName)
} }
return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace)
} }
func (c *Controller) getInfrastructureRoles() (result map[string]spec.PgUser, err error) { func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (result map[string]spec.PgUser, err error) {
if c.opConfig.InfrastructureRolesSecretName == (spec.NamespacedName{}) { if *rolesSecret == (spec.NamespacedName{}) {
// we don't have infrastructure roles defined, bail out // we don't have infrastructure roles defined, bail out
return nil, nil return nil, nil
} }
infraRolesSecret, err := c.KubeClient. infraRolesSecret, err := c.KubeClient.
Secrets(c.opConfig.InfrastructureRolesSecretName.Namespace). Secrets(rolesSecret.Namespace).
Get(c.opConfig.InfrastructureRolesSecretName.Name, meta_v1.GetOptions{}) Get(rolesSecret.Name, metav1.GetOptions{})
if err != nil { if err != nil {
c.logger.Debugf("Infrastructure roles secret name: %s", c.opConfig.InfrastructureRolesSecretName) c.logger.Debugf("Infrastructure roles secret name: %q", *rolesSecret)
return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err)
} }
@ -103,7 +99,7 @@ Users:
case "inrole": case "inrole":
t.MemberOf = append(t.MemberOf, s) t.MemberOf = append(t.MemberOf, s)
default: default:
c.logger.Warnf("Unknown key %s", p) c.logger.Warnf("Unknown key %q", p)
} }
} }
} }

pkg/controller/util_test.go (new file, 154 lines)

@ -0,0 +1,154 @@
package controller
import (
"fmt"
"reflect"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/pkg/api/v1"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
)
const (
testInfrastructureRolesSecretName = "infrastructureroles-test"
)
type mockSecret struct {
v1core.SecretInterface
}
func (c *mockSecret) Get(name string, options metav1.GetOptions) (*v1.Secret, error) {
if name != testInfrastructureRolesSecretName {
return nil, fmt.Errorf("NotFound")
}
secret := &v1.Secret{}
secret.Name = mockController.opConfig.ClusterNameLabel
secret.Data = map[string][]byte{
"user1": []byte("testrole"),
"password1": []byte("testpassword"),
"inrole1": []byte("testinrole"),
}
return secret, nil
}
type MockSecretGetter struct {
}
func (c *MockSecretGetter) Secrets(namespace string) v1core.SecretInterface {
return &mockSecret{}
}
func newMockKubernetesClient() k8sutil.KubernetesClient {
return k8sutil.KubernetesClient{SecretsGetter: &MockSecretGetter{}}
}
func newMockController() *Controller {
controller := NewController(&Config{})
controller.opConfig.ClusterNameLabel = "cluster-name"
controller.opConfig.InfrastructureRolesSecretName =
spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName}
controller.opConfig.Workers = 4
controller.KubeClient = newMockKubernetesClient()
return controller
}
var mockController = newMockController()
func TestPodClusterName(t *testing.T) {
var testTable = []struct {
in *v1.Pod
expected spec.NamespacedName
}{
{
&v1.Pod{},
spec.NamespacedName{},
},
{
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: v1.NamespaceDefault,
Labels: map[string]string{
mockController.opConfig.ClusterNameLabel: "testcluster",
},
},
},
spec.NamespacedName{v1.NamespaceDefault, "testcluster"},
},
}
for _, test := range testTable {
resp := mockController.podClusterName(test.in)
if resp != test.expected {
t.Errorf("expected response %v does not match the actual %v", test.expected, resp)
}
}
}
func TestClusterWorkerID(t *testing.T) {
var testTable = []struct {
in spec.NamespacedName
expected uint32
}{
{
in: spec.NamespacedName{"foo", "bar"},
expected: 2,
},
{
in: spec.NamespacedName{"default", "testcluster"},
expected: 3,
},
}
for _, test := range testTable {
resp := mockController.clusterWorkerID(test.in)
if resp != test.expected {
t.Errorf("expected response %v does not match the actual %v", test.expected, resp)
}
}
}
func TestGetInfrastructureRoles(t *testing.T) {
var testTable = []struct {
secretName spec.NamespacedName
expectedRoles map[string]spec.PgUser
expectedError error
}{
{
spec.NamespacedName{},
nil,
nil,
},
{
spec.NamespacedName{v1.NamespaceDefault, "null"},
nil,
fmt.Errorf(`could not get infrastructure roles secret: NotFound`),
},
{
spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName},
map[string]spec.PgUser{
"testrole": {
"testrole",
"testpassword",
nil,
[]string{"testinrole"},
},
},
nil,
},
}
for _, test := range testTable {
roles, err := mockController.getInfrastructureRoles(&test.secretName)
if err != test.expectedError {
if err != nil && test.expectedError != nil && err.Error() == test.expectedError.Error() {
continue
}
t.Errorf("expected error '%v' does not match the actual error '%v'", test.expectedError, err)
}
if !reflect.DeepEqual(roles, test.expectedRoles) {
t.Errorf("expected roles output %v does not match the actual %v", test.expectedRoles, roles)
}
}
}


@ -6,8 +6,7 @@ import (
"strings" "strings"
"time" "time"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
) )
// MaintenanceWindow describes the time window when the operator is allowed to do maintenance on a cluster. // MaintenanceWindow describes the time window when the operator is allowed to do maintenance on a cluster.
@ -70,8 +69,8 @@ const (
// Postgresql defines PostgreSQL Third Party (resource) Object. // Postgresql defines PostgreSQL Third Party (resource) Object.
type Postgresql struct { type Postgresql struct {
meta_v1.TypeMeta `json:",inline"` metav1.TypeMeta `json:",inline"`
Metadata meta_v1.ObjectMeta `json:"metadata"` metav1.ObjectMeta `json:"metadata"`
Spec PostgresSpec `json:"spec"` Spec PostgresSpec `json:"spec"`
Status PostgresStatus `json:"status,omitempty"` Status PostgresStatus `json:"status,omitempty"`
@ -87,7 +86,7 @@ type PostgresSpec struct {
TeamID string `json:"teamId"` TeamID string `json:"teamId"`
AllowedSourceRanges []string `json:"allowedSourceRanges"` AllowedSourceRanges []string `json:"allowedSourceRanges"`
// EnableLoadBalancer is a pointer, since it is importat to know if that parameters is omited from the manifest // UseLoadBalancer is a pointer, since it is important to know whether the parameter was omitted from the manifest
UseLoadBalancer *bool `json:"useLoadBalancer,omitempty"` UseLoadBalancer *bool `json:"useLoadBalancer,omitempty"`
ReplicaLoadBalancer bool `json:"replicaLoadBalancer,omitempty"` ReplicaLoadBalancer bool `json:"replicaLoadBalancer,omitempty"`
NumberOfInstances int32 `json:"numberOfInstances"` NumberOfInstances int32 `json:"numberOfInstances"`
@ -98,8 +97,8 @@ type PostgresSpec struct {
// PostgresqlList defines a list of PostgreSQL clusters. // PostgresqlList defines a list of PostgreSQL clusters.
type PostgresqlList struct { type PostgresqlList struct {
meta_v1.TypeMeta `json:",inline"` metav1.TypeMeta `json:",inline"`
Metadata meta_v1.ListMeta `json:"metadata"` metav1.ListMeta `json:"metadata"`
Items []Postgresql `json:"items"` Items []Postgresql `json:"items"`
} }
@ -190,25 +189,6 @@ func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error {
return nil return nil
} }
// GetObject implements Object interface for PostgreSQL TPR spec object.
func (p *Postgresql) GetObjectKind() schema.ObjectKind {
return &p.TypeMeta
}
// GetObjectMeta implements ObjectMetaAccessor interface for PostgreSQL TPR spec object.
func (p *Postgresql) GetObjectMeta() meta_v1.Object {
return &p.Metadata
}
func (pl *PostgresqlList) GetObjectKind() schema.ObjectKind {
return &pl.TypeMeta
}
// GetListMeta implements ListMetaAccessor interface for PostgreSQL TPR List spec object.
func (pl *PostgresqlList) GetListMeta() meta_v1.List {
return &pl.Metadata
}
func extractClusterName(clusterName string, teamName string) (string, error) { func extractClusterName(clusterName string, teamName string) (string, error) {
teamNameLen := len(teamName) teamNameLen := len(teamName)
if len(clusterName) < teamNameLen+2 { if len(clusterName) < teamNameLen+2 {
@ -226,10 +206,6 @@ func extractClusterName(clusterName string, teamName string) (string, error) {
return clusterName[teamNameLen+1:], nil return clusterName[teamNameLen+1:], nil
} }
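
The convention encoded here (and exercised by the teapot-testcluster1 test case below) is that a manifest name must have the form "<teamId>-<clusterName>". A rough stand-in for the behaviour with hypothetical inputs; the real function returns an error instead of an empty string when the name does not follow the convention.

package main

import (
	"fmt"
	"strings"
)

// simplified stand-in for extractClusterName
func clusterNameFromManifest(clusterName, teamName string) string {
	prefix := teamName + "-"
	if !strings.HasPrefix(clusterName, prefix) {
		return "" // the real code reports an error here
	}
	return clusterName[len(prefix):]
}

func main() {
	fmt.Println(clusterNameFromManifest("acid-testcluster42", "acid"))  // "testcluster42"
	fmt.Println(clusterNameFromManifest("teapot-testcluster1", "acid")) // "" (rejected)
}
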
// The code below is used only to work around a known problem with third-party
// resources and ugorji. If/when these issues are resolved, the code below
// should no longer be required.
//
type postgresqlListCopy PostgresqlList type postgresqlListCopy PostgresqlList
type postgresqlCopy Postgresql type postgresqlCopy Postgresql
@ -239,7 +215,7 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error {
err := json.Unmarshal(data, &tmp) err := json.Unmarshal(data, &tmp)
if err != nil { if err != nil {
metaErr := json.Unmarshal(data, &tmp.Metadata) metaErr := json.Unmarshal(data, &tmp.ObjectMeta)
if metaErr != nil { if metaErr != nil {
return err return err
} }
@ -253,7 +229,7 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error {
} }
tmp2 := Postgresql(tmp) tmp2 := Postgresql(tmp)
clusterName, err := extractClusterName(tmp2.Metadata.Name, tmp2.Spec.TeamID) clusterName, err := extractClusterName(tmp2.ObjectMeta.Name, tmp2.Spec.TeamID)
if err == nil { if err == nil {
tmp2.Spec.ClusterName = clusterName tmp2.Spec.ClusterName = clusterName
} else { } else {


@ -8,7 +8,7 @@ import (
"testing" "testing"
"time" "time"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
var parseTimeTests = []struct { var parseTimeTests = []struct {
@ -103,11 +103,11 @@ var unmarshalCluster = []struct {
"kind": "Postgresql","apiVersion": "acid.zalan.do/v1", "kind": "Postgresql","apiVersion": "acid.zalan.do/v1",
"metadata": {"name": "acid-testcluster1"}, "spec": {"teamId": 100}}`), "metadata": {"name": "acid-testcluster1"}, "spec": {"teamId": 100}}`),
Postgresql{ Postgresql{
TypeMeta: meta_v1.TypeMeta{ TypeMeta: metav1.TypeMeta{
Kind: "Postgresql", Kind: "Postgresql",
APIVersion: "acid.zalan.do/v1", APIVersion: "acid.zalan.do/v1",
}, },
Metadata: meta_v1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "acid-testcluster1", Name: "acid-testcluster1",
}, },
Status: ClusterStatusInvalid, Status: ClusterStatusInvalid,
@ -183,11 +183,11 @@ var unmarshalCluster = []struct {
} }
}`), }`),
Postgresql{ Postgresql{
TypeMeta: meta_v1.TypeMeta{ TypeMeta: metav1.TypeMeta{
Kind: "Postgresql", Kind: "Postgresql",
APIVersion: "acid.zalan.do/v1", APIVersion: "acid.zalan.do/v1",
}, },
Metadata: meta_v1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "acid-testcluster1", Name: "acid-testcluster1",
}, },
Spec: PostgresSpec{ Spec: PostgresSpec{
@ -249,11 +249,11 @@ var unmarshalCluster = []struct {
{ {
[]byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1","metadata": {"name": "teapot-testcluster1"}, "spec": {"teamId": "acid"}}`), []byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1","metadata": {"name": "teapot-testcluster1"}, "spec": {"teamId": "acid"}}`),
Postgresql{ Postgresql{
TypeMeta: meta_v1.TypeMeta{ TypeMeta: metav1.TypeMeta{
Kind: "Postgresql", Kind: "Postgresql",
APIVersion: "acid.zalan.do/v1", APIVersion: "acid.zalan.do/v1",
}, },
Metadata: meta_v1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "teapot-testcluster1", Name: "teapot-testcluster1",
}, },
Spec: PostgresSpec{TeamID: "acid"}, Spec: PostgresSpec{TeamID: "acid"},
@ -277,16 +277,16 @@ var postgresqlList = []struct {
}{ }{
{[]byte(`{"apiVersion":"v1","items":[{"apiVersion":"acid.zalan.do/v1","kind":"Postgresql","metadata":{"labels":{"team":"acid"},"name":"acid-testcluster42","namespace":"default","resourceVersion":"30446957","selfLink":"/apis/acid.zalan.do/v1/namespaces/default/postgresqls/acid-testcluster42","uid":"857cd208-33dc-11e7-b20a-0699041e4b03"},"spec":{"allowedSourceRanges":["185.85.220.0/22"],"numberOfInstances":1,"postgresql":{"version":"9.6"},"teamId":"acid","volume":{"size":"10Gi"}},"status":"Running"}],"kind":"List","metadata":{},"resourceVersion":"","selfLink":""}`), {[]byte(`{"apiVersion":"v1","items":[{"apiVersion":"acid.zalan.do/v1","kind":"Postgresql","metadata":{"labels":{"team":"acid"},"name":"acid-testcluster42","namespace":"default","resourceVersion":"30446957","selfLink":"/apis/acid.zalan.do/v1/namespaces/default/postgresqls/acid-testcluster42","uid":"857cd208-33dc-11e7-b20a-0699041e4b03"},"spec":{"allowedSourceRanges":["185.85.220.0/22"],"numberOfInstances":1,"postgresql":{"version":"9.6"},"teamId":"acid","volume":{"size":"10Gi"}},"status":"Running"}],"kind":"List","metadata":{},"resourceVersion":"","selfLink":""}`),
PostgresqlList{ PostgresqlList{
TypeMeta: meta_v1.TypeMeta{ TypeMeta: metav1.TypeMeta{
Kind: "List", Kind: "List",
APIVersion: "v1", APIVersion: "v1",
}, },
Items: []Postgresql{{ Items: []Postgresql{{
TypeMeta: meta_v1.TypeMeta{ TypeMeta: metav1.TypeMeta{
Kind: "Postgresql", Kind: "Postgresql",
APIVersion: "acid.zalan.do/v1", APIVersion: "acid.zalan.do/v1",
}, },
Metadata: meta_v1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "acid-testcluster42", Name: "acid-testcluster42",
Namespace: "default", Namespace: "default",
Labels: map[string]string{"team": "acid"}, Labels: map[string]string{"team": "acid"},
@ -362,7 +362,7 @@ func TestClusterName(t *testing.T) {
continue continue
} }
if name != tt.clusterName { if name != tt.clusterName {
t.Errorf("Expected cluserName: %s, got: %s", tt.clusterName, name) t.Errorf("Expected cluserName: %q, got: %q", tt.clusterName, name)
} }
} }
} }
@ -399,7 +399,7 @@ func TestMarshalMaintenanceWindow(t *testing.T) {
} }
if !bytes.Equal(s, tt.in) { if !bytes.Equal(s, tt.in) {
t.Errorf("Expected Marshal: %s, got: %s", string(tt.in), string(s)) t.Errorf("Expected Marshal: %q, got: %q", string(tt.in), string(s))
} }
} }
} }
@ -434,7 +434,7 @@ func TestMarshal(t *testing.T) {
continue continue
} }
if !bytes.Equal(m, tt.marshal) { if !bytes.Equal(m, tt.marshal) {
t.Errorf("Marshal Postgresql expected: %s, got: %s", string(tt.marshal), string(m)) t.Errorf("Marshal Postgresql expected: %q, got: %q", string(tt.marshal), string(m))
} }
} }
} }
@ -445,8 +445,8 @@ func TestPostgresMeta(t *testing.T) {
t.Errorf("GetObjectKindMeta expected: %v, got: %v", tt.out.TypeMeta, a) t.Errorf("GetObjectKindMeta expected: %v, got: %v", tt.out.TypeMeta, a)
} }
if a := tt.out.GetObjectMeta(); reflect.DeepEqual(a, tt.out.Metadata) { if a := tt.out.GetObjectMeta(); reflect.DeepEqual(a, tt.out.ObjectMeta) {
t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.Metadata, a) t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.ObjectMeta, a)
} }
} }
} }
@ -475,8 +475,8 @@ func TestPostgresListMeta(t *testing.T) {
t.Errorf("GetObjectKindMeta expected: %v, got: %v", tt.out.TypeMeta, a) t.Errorf("GetObjectKindMeta expected: %v, got: %v", tt.out.TypeMeta, a)
} }
if a := tt.out.GetListMeta(); reflect.DeepEqual(a, tt.out.Metadata) { if a := tt.out.GetListMeta(); reflect.DeepEqual(a, tt.out.ListMeta) {
t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.Metadata, a) t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.ListMeta, a)
} }
return return

View File

@ -1,11 +1,10 @@
package spec package spec
import ( import (
"database/sql"
"fmt" "fmt"
"strings" "strings"
"database/sql"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
) )


@ -49,7 +49,7 @@ func TestNamespacedNameError(t *testing.T) {
var actual NamespacedName var actual NamespacedName
err := actual.Decode(tt) err := actual.Decode(tt)
if err == nil { if err == nil {
t.Errorf("Error expected for '%s', got: %#v", tt, actual) t.Errorf("Error expected for %q, got: %#v", tt, actual)
} }
} }
} }


@ -4,10 +4,7 @@ import "time"
// General kubernetes-related constants // General kubernetes-related constants
const ( const (
ListClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace K8sAPIPath = "/apis"
WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace
K8sVersion = "v1"
K8sAPIPath = "/api"
StatefulsetDeletionInterval = 1 * time.Second StatefulsetDeletionInterval = 1 * time.Second
StatefulsetDeletionTimeout = 30 * time.Second StatefulsetDeletionTimeout = 30 * time.Second


@ -2,7 +2,7 @@ package constants
const ( const (
PasswordLength = 64 PasswordLength = 64
UserSecretTemplate = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName UserSecretTemplate = "%s.%s.credentials." + TPRKind + "." + TPRGroup // Username, ClusterName
SuperuserKeyName = "superuser" SuperuserKeyName = "superuser"
ReplicationUserKeyName = "replication" ReplicationUserKeyName = "replication"
RoleFlagSuperuser = "SUPERUSER" RoleFlagSuperuser = "SUPERUSER"


@ -2,9 +2,10 @@ package constants
// Different properties of the PostgreSQL Third Party Resources // Different properties of the PostgreSQL Third Party Resources
const ( const (
TPRName = "postgresql" TPRKind = "postgresql"
TPRVendor = "acid.zalan.do" TPRGroup = "acid.zalan.do"
TPRDescription = "Managed PostgreSQL clusters" TPRDescription = "Managed PostgreSQL clusters"
TPRApiVersion = "v1" TPRApiVersion = "v1"
ResourceName = TPRName + "s" TPRName = TPRKind + "." + TPRGroup
ResourceName = TPRKind + "s"
) )


@ -37,5 +37,5 @@ func (c *Ext234Resize) ResizeFilesystem(deviceName string, commandExecutor func(
(strings.Contains(out, "on-line resizing required") && ext2fsSuccessRegexp.MatchString(out)) { (strings.Contains(out, "on-line resizing required") && ext2fsSuccessRegexp.MatchString(out)) {
return nil return nil
} }
return fmt.Errorf("unrecognized output: %s, assuming error", out) return fmt.Errorf("unrecognized output: %q, assuming error", out)
} }


@ -1,32 +1,60 @@
package k8sutil package k8sutil
import ( import (
"fmt"
"time" "time"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
extensions "k8s.io/client-go/kubernetes/typed/extensions/v1beta1"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
) )
type KubernetesClient struct {
v1core.SecretsGetter
v1core.ServicesGetter
v1core.EndpointsGetter
v1core.PodsGetter
v1core.PersistentVolumesGetter
v1core.PersistentVolumeClaimsGetter
v1core.ConfigMapsGetter
v1beta1.StatefulSetsGetter
extensions.ThirdPartyResourcesGetter
RESTClient rest.Interface
}
func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) {
c = KubernetesClient{}
c.PodsGetter = src.CoreV1()
c.ServicesGetter = src.CoreV1()
c.EndpointsGetter = src.CoreV1()
c.SecretsGetter = src.CoreV1()
c.ConfigMapsGetter = src.CoreV1()
c.PersistentVolumeClaimsGetter = src.CoreV1()
c.PersistentVolumesGetter = src.CoreV1()
c.StatefulSetsGetter = src.AppsV1beta1()
c.ThirdPartyResourcesGetter = src.ExtensionsV1beta1()
c.RESTClient = src.CoreV1().RESTClient()
return
}
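
A minimal sketch of how the pieces above are meant to be wired together when running outside the cluster; the kubeconfig path and namespace are placeholders.

package main

import (
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
)

func main() {
	cfg, err := k8sutil.RestConfig("/path/to/kubeconfig", true) // placeholder path
	if err != nil {
		log.Fatalf("could not build REST config: %v", err)
	}
	clientSet, err := k8sutil.ClientSet(cfg)
	if err != nil {
		log.Fatalf("could not create clientset: %v", err)
	}
	// the facade exposes only the typed getters the operator actually needs
	client := k8sutil.NewFromKubernetesInterface(clientSet)

	pods, err := client.Pods("default").List(metav1.ListOptions{})
	if err != nil {
		log.Fatalf("could not list pods: %v", err)
	}
	log.Printf("found %d pods", len(pods.Items))
}
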
func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) {
if outOfCluster { if outOfCluster {
return clientcmd.BuildConfigFromFlags("", kubeConfig) return clientcmd.BuildConfigFromFlags("", kubeConfig)
} }
return rest.InClusterConfig() return rest.InClusterConfig()
} }
func KubernetesClient(config *rest.Config) (client *kubernetes.Clientset, err error) { func ClientSet(config *rest.Config) (client *kubernetes.Clientset, err error) {
return kubernetes.NewForConfig(config) return kubernetes.NewForConfig(config)
} }
@ -38,35 +66,24 @@ func ResourceNotFound(err error) bool {
return apierrors.IsNotFound(err) return apierrors.IsNotFound(err)
} }
func KubernetesRestClient(c *rest.Config) (rest.Interface, error) { func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) {
c.GroupVersion = &schema.GroupVersion{Version: constants.K8sVersion} cfg.GroupVersion = &schema.GroupVersion{
c.APIPath = constants.K8sAPIPath Group: constants.TPRGroup,
c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} Version: constants.TPRApiVersion,
schemeBuilder := runtime.NewSchemeBuilder(
func(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(
schema.GroupVersion{
Group: constants.TPRVendor,
Version: constants.TPRApiVersion,
},
&spec.Postgresql{},
&spec.PostgresqlList{},
&meta_v1.ListOptions{},
&meta_v1.DeleteOptions{},
)
return nil
})
if err := schemeBuilder.AddToScheme(scheme.Scheme); err != nil {
return nil, fmt.Errorf("could not apply functions to register PostgreSQL TPR type: %v", err)
} }
cfg.APIPath = constants.K8sAPIPath
cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
return rest.RESTClientFor(c) return rest.RESTClientFor(&cfg)
} }
func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error { func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error {
return retryutil.Retry(interval, timeout, func() (bool, error) { return retryutil.Retry(interval, timeout, func() (bool, error) {
_, err := restclient.Get().RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, ns)).DoRaw() _, err := restclient.
Get().
Namespace(ns).
Resource(constants.ResourceName).
DoRaw()
if err != nil { if err != nil {
if ResourceNotFound(err) { // not set up yet. wait more. if ResourceNotFound(err) { // not set up yet. wait more.
return false, nil return false, nil


@ -66,11 +66,11 @@ func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserReque
switch r.Kind { switch r.Kind {
case spec.PGSyncUserAdd: case spec.PGSyncUserAdd:
if err := s.createPgUser(r.User, db); err != nil { if err := s.createPgUser(r.User, db); err != nil {
return fmt.Errorf("could not create user '%s': %v", r.User.Name, err) return fmt.Errorf("could not create user %q: %v", r.User.Name, err)
} }
case spec.PGsyncUserAlter: case spec.PGsyncUserAlter:
if err := s.alterPgUser(r.User, db); err != nil { if err := s.alterPgUser(r.User, db); err != nil {
return fmt.Errorf("could not alter user '%s': %v", r.User.Name, err) return fmt.Errorf("could not alter user %q: %v", r.User.Name, err)
} }
default: default:
return fmt.Errorf("unrecognized operation: %v", r.Kind) return fmt.Errorf("unrecognized operation: %v", r.Kind)
@ -100,7 +100,7 @@ func (s DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.DB) (err
_, err = db.Query(query) // TODO: Try several times _, err = db.Query(query) // TODO: Try several times
if err != nil { if err != nil {
err = fmt.Errorf("dB error: %s, query: %v", err, query) err = fmt.Errorf("dB error: %v, query: %q", err, query)
return return
} }
@ -122,7 +122,7 @@ func (s DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB) (err
_, err = db.Query(query) // TODO: Try several times _, err = db.Query(query) // TODO: Try several times
if err != nil { if err != nil {
err = fmt.Errorf("dB error: %s query %v", err, query) err = fmt.Errorf("dB error: %v query %q", err, query)
return return
} }


@ -8,7 +8,7 @@ import (
"time" "time"
"github.com/motomux/pretty" "github.com/motomux/pretty"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
) )
@ -34,7 +34,7 @@ func RandomPassword(n int) string {
} }
// NameFromMeta converts a metadata object to the NamespacedName name representation. // NameFromMeta converts a metadata object to the NamespacedName name representation.
func NameFromMeta(meta meta_v1.ObjectMeta) spec.NamespacedName { func NameFromMeta(meta metav1.ObjectMeta) spec.NamespacedName {
return spec.NamespacedName{ return spec.NamespacedName{
Namespace: meta.Namespace, Namespace: meta.Namespace,
Name: meta.Name, Name: meta.Name,


@ -4,7 +4,7 @@ import (
"reflect" "reflect"
"testing" "testing"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
) )
@ -53,7 +53,7 @@ func TestRandomPassword(t *testing.T) {
} }
func TestNameFromMeta(t *testing.T) { func TestNameFromMeta(t *testing.T) {
meta := meta_v1.ObjectMeta{ meta := metav1.ObjectMeta{
Name: "testcluster", Name: "testcluster",
Namespace: "default", Namespace: "default",
} }
@ -73,7 +73,7 @@ func TestPGUserPassword(t *testing.T) {
for _, tt := range pgUsers { for _, tt := range pgUsers {
pwd := PGUserPassword(tt.in) pwd := PGUserPassword(tt.in)
if pwd != tt.out { if pwd != tt.out {
t.Errorf("PgUserPassword expected: %s, got: %s", tt.out, pwd) t.Errorf("PgUserPassword expected: %q, got: %q", tt.out, pwd)
} }
} }
} }
@ -81,7 +81,7 @@ func TestPGUserPassword(t *testing.T) {
func TestPrettyDiff(t *testing.T) { func TestPrettyDiff(t *testing.T) {
for _, tt := range prettyDiffTest { for _, tt := range prettyDiffTest {
if actual := PrettyDiff(tt.inA, tt.inB); actual != tt.out { if actual := PrettyDiff(tt.inA, tt.inB); actual != tt.out {
t.Errorf("PrettyDiff expected: %s, got: %s", tt.out, actual) t.Errorf("PrettyDiff expected: %q, got: %q", tt.out, actual)
} }
} }
} }


@ -42,11 +42,11 @@ func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool
func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) {
volumeID := pv.Spec.AWSElasticBlockStore.VolumeID volumeID := pv.Spec.AWSElasticBlockStore.VolumeID
if volumeID == "" { if volumeID == "" {
return "", fmt.Errorf("volume id is empty for volume %s", pv.Name) return "", fmt.Errorf("volume id is empty for volume %q", pv.Name)
} }
idx := strings.LastIndex(volumeID, constants.EBSVolumeIDStart) + 1 idx := strings.LastIndex(volumeID, constants.EBSVolumeIDStart) + 1
if idx == 0 { if idx == 0 {
return "", fmt.Errorf("malfored EBS volume id %s", volumeID) return "", fmt.Errorf("malfored EBS volume id %q", volumeID)
} }
return volumeID[idx:], nil return volumeID[idx:], nil
} }
@ -60,7 +60,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error {
} }
vol := volumeOutput.Volumes[0] vol := volumeOutput.Volumes[0]
if *vol.VolumeId != volumeId { if *vol.VolumeId != volumeId {
return fmt.Errorf("describe volume %s returned information about a non-matching volume %s", volumeId, *vol.VolumeId) return fmt.Errorf("describe volume %q returned information about a non-matching volume %q", volumeId, *vol.VolumeId)
} }
if *vol.Size == newSize { if *vol.Size == newSize {
// nothing to do // nothing to do
@ -74,7 +74,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error {
state := *output.VolumeModification.ModificationState state := *output.VolumeModification.ModificationState
if state == constants.EBSVolumeStateFailed { if state == constants.EBSVolumeStateFailed {
return fmt.Errorf("could not modify persistent volume %s: modification state failed", volumeId) return fmt.Errorf("could not modify persistent volume %q: modification state failed", volumeId)
} }
if state == "" { if state == "" {
return fmt.Errorf("received empty modification status") return fmt.Errorf("received empty modification status")
@ -91,10 +91,10 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error {
return false, fmt.Errorf("could not describe volume modification: %v", err) return false, fmt.Errorf("could not describe volume modification: %v", err)
} }
if len(out.VolumesModifications) != 1 { if len(out.VolumesModifications) != 1 {
return false, fmt.Errorf("describe volume modification didn't return one record for volume \"%s\"", volumeId) return false, fmt.Errorf("describe volume modification didn't return one record for volume %q", volumeId)
} }
if *out.VolumesModifications[0].VolumeId != volumeId { if *out.VolumesModifications[0].VolumeId != volumeId {
return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"", return false, fmt.Errorf("non-matching volume id when describing modifications: %q is different from %q",
*out.VolumesModifications[0].VolumeId, volumeId) *out.VolumesModifications[0].VolumeId, volumeId)
} }
return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil