From 4455f1b639ecda689cf9777e8345a9db350a8376 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Mon, 24 Jul 2017 16:56:46 +0200 Subject: [PATCH 1/2] Feature/unit tests (#53) - Avoid relying on Clientset structure to call Kubernetes API functions. While Clientset is a convinient "catch-all" abstraction for calling REST API related to different Kubernetes objects, it's impossible to mock. Replacing it wih the kubernetes.Interface would be quite straightforward, but would require an exra level of mocked interfaces, because of the versioning. Instead, a new interface is defined, which contains only the objects we need of the pre-defined versions. - Move KubernetesClient to k8sutil package. - Add more tests. --- cmd/main.go | 6 +- pkg/cluster/cluster.go | 3 +- pkg/controller/controller.go | 9 +- pkg/controller/pod.go | 4 +- pkg/controller/util.go | 12 +-- pkg/controller/util_test.go | 154 +++++++++++++++++++++++++++++++++++ pkg/spec/types.go | 1 - pkg/util/k8sutil/k8sutil.go | 31 ++++++- 8 files changed, 201 insertions(+), 19 deletions(-) create mode 100644 pkg/controller/util_test.go diff --git a/cmd/main.go b/cmd/main.go index 2d53f4b82..c38eb2e91 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -48,7 +48,7 @@ func ControllerConfig() *controller.Config { log.Fatalf("Can't get REST config: %s", err) } - client, err := k8sutil.KubernetesClient(restConfig) + client, err := k8sutil.ClientSet(restConfig) if err != nil { log.Fatalf("Can't create client: %s", err) } @@ -60,7 +60,7 @@ func ControllerConfig() *controller.Config { return &controller.Config{ RestConfig: restConfig, - KubeClient: client, + KubeClient: k8sutil.NewFromKubernetesInterface(client), RestClient: restClient, } } @@ -101,7 +101,7 @@ func main() { log.Printf("Config: %s", cfg.MustMarshal()) - c := controller.New(controllerConfig, cfg) + c := controller.NewController(controllerConfig, cfg) c.Run(stop, wg) sig := <-sigs diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index e62f0b16e..8683803b9 
100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -11,7 +11,6 @@ import ( "sync" "github.com/Sirupsen/logrus" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" @@ -36,7 +35,7 @@ var ( // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication. type Config struct { - KubeClient *kubernetes.Clientset //TODO: move clients to the better place? + KubeClient k8sutil.KubernetesClient RestClient *rest.RESTClient RestConfig *rest.Config TeamsAPIClient *teams.API diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b0e0962bd..dc62504d3 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/Sirupsen/logrus" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -14,12 +13,13 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" + "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/teams" ) type Config struct { RestConfig *rest.Config - KubeClient *kubernetes.Clientset + KubeClient k8sutil.KubernetesClient RestClient *rest.RESTClient TeamsAPIClient *teams.API InfrastructureRoles map[string]spec.PgUser @@ -27,6 +27,7 @@ type Config struct { type Controller struct { Config + opConfig *config.Config logger *logrus.Entry @@ -43,7 +44,7 @@ type Controller struct { lastClusterSyncTime int64 } -func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { +func NewController(controllerConfig *Config, operatorConfig *config.Config) *Controller { logger := logrus.New() if operatorConfig.DebugLogging { @@ -82,7 +83,7 @@ func (c *Controller) 
initController() { c.logger.Fatalf("could not register ThirdPartyResource: %v", err) } - if infraRoles, err := c.getInfrastructureRoles(); err != nil { + if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil { c.logger.Warningf("could not get infrastructure roles: %v", err) } else { c.InfrastructureRoles = infraRoles diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 3b31d439f..c3fb2a5e8 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -29,7 +29,7 @@ func (c *Controller) podListFunc(options api.ListOptions) (runtime.Object, error TimeoutSeconds: options.TimeoutSeconds, } - return c.KubeClient.CoreV1().Pods(c.opConfig.Namespace).List(opts) + return c.KubeClient.Pods(c.opConfig.Namespace).List(opts) } func (c *Controller) podWatchFunc(options api.ListOptions) (watch.Interface, error) { @@ -52,7 +52,7 @@ func (c *Controller) podWatchFunc(options api.ListOptions) (watch.Interface, err TimeoutSeconds: options.TimeoutSeconds, } - return c.KubeClient.CoreV1Client.Pods(c.opConfig.Namespace).Watch(opts) + return c.KubeClient.Pods(c.opConfig.Namespace).Watch(opts) } func (c *Controller) podAdd(obj interface{}) { diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 76b33168c..d7ea55e3a 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -51,7 +51,7 @@ func (c *Controller) createTPR() error { TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor) tpr := thirdPartyResource(TPRName) - _, err := c.KubeClient.ExtensionsV1beta1().ThirdPartyResources().Create(tpr) + _, err := c.KubeClient.ThirdPartyResources().Create(tpr) if err != nil { if !k8sutil.ResourceAlreadyExists(err) { return err @@ -64,17 +64,17 @@ func (c *Controller) createTPR() error { return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) } -func (c *Controller) getInfrastructureRoles() (result map[string]spec.PgUser, 
err error) { - if c.opConfig.InfrastructureRolesSecretName == (spec.NamespacedName{}) { +func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (result map[string]spec.PgUser, err error) { + if *rolesSecret == (spec.NamespacedName{}) { // we don't have infrastructure roles defined, bail out return nil, nil } infraRolesSecret, err := c.KubeClient. - Secrets(c.opConfig.InfrastructureRolesSecretName.Namespace). - Get(c.opConfig.InfrastructureRolesSecretName.Name) + Secrets(rolesSecret.Namespace). + Get(rolesSecret.Name) if err != nil { - c.logger.Debugf("Infrastructure roles secret name: %s", c.opConfig.InfrastructureRolesSecretName) + c.logger.Debugf("Infrastructure roles secret name: %s", *rolesSecret) return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) } diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go new file mode 100644 index 000000000..bd7d7d049 --- /dev/null +++ b/pkg/controller/util_test.go @@ -0,0 +1,154 @@ +package controller + +import ( + "fmt" + "reflect" + "testing" + + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/pkg/api/v1" + + "github.com/zalando-incubator/postgres-operator/pkg/spec" + "github.com/zalando-incubator/postgres-operator/pkg/util/config" + "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" +) + +const ( + testInfrastructureRolesSecretName = "infrastructureroles-test" +) + +type mockSecret struct { + v1core.SecretInterface +} + +func (c *mockSecret) Get(name string) (*v1.Secret, error) { + if name != testInfrastructureRolesSecretName { + return nil, fmt.Errorf("NotFound") + } + secret := &v1.Secret{} + secret.Name = mockController.opConfig.ClusterNameLabel + secret.Data = map[string][]byte{ + "user1": []byte("testrole"), + "password1": []byte("testpassword"), + "inrole1": []byte("testinrole"), + } + return secret, nil + +} + +type MockSecretGetter struct { +} + +func (c *MockSecretGetter) Secrets(namespace string) 
v1core.SecretInterface { + return &mockSecret{} +} + +func newMockKubernetesClient() k8sutil.KubernetesClient { + return k8sutil.KubernetesClient{SecretsGetter: &MockSecretGetter{}} +} + +func newMockController() *Controller { + controller := NewController(&Config{}, &config.Config{}) + controller.opConfig.ClusterNameLabel = "cluster-name" + controller.opConfig.InfrastructureRolesSecretName = + spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName} + controller.opConfig.Workers = 4 + controller.KubeClient = newMockKubernetesClient() + return controller +} + +var mockController = newMockController() + +func TestPodClusterName(t *testing.T) { + var testTable = []struct { + in *v1.Pod + expected spec.NamespacedName + }{ + { + &v1.Pod{}, + spec.NamespacedName{}, + }, + { + &v1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Namespace: v1.NamespaceDefault, + Labels: map[string]string{ + mockController.opConfig.ClusterNameLabel: "testcluster", + }, + }, + }, + spec.NamespacedName{v1.NamespaceDefault, "testcluster"}, + }, + } + for _, test := range testTable { + resp := mockController.podClusterName(test.in) + if resp != test.expected { + t.Errorf("expected response %v does not match the actual %v", test.expected, resp) + } + } +} + +func TestClusterWorkerID(t *testing.T) { + var testTable = []struct { + in spec.NamespacedName + expected uint32 + }{ + { + in: spec.NamespacedName{"foo", "bar"}, + expected: 2, + }, + { + in: spec.NamespacedName{"default", "testcluster"}, + expected: 3, + }, + } + for _, test := range testTable { + resp := mockController.clusterWorkerID(test.in) + if resp != test.expected { + t.Errorf("expected response %v does not match the actual %v", test.expected, resp) + } + } +} + +func TestGetInfrastructureRoles(t *testing.T) { + var testTable = []struct { + secretName spec.NamespacedName + expectedRoles map[string]spec.PgUser + expectedError error + }{ + { + spec.NamespacedName{}, + nil, + nil, + }, + { + 
spec.NamespacedName{v1.NamespaceDefault, "null"}, + nil, + fmt.Errorf(`could not get infrastructure roles secret: NotFound`), + }, + { + spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName}, + map[string]spec.PgUser{ + "testrole": { + "testrole", + "testpassword", + nil, + []string{"testinrole"}, + }, + }, + nil, + }, + } + for _, test := range testTable { + roles, err := mockController.getInfrastructureRoles(&test.secretName) + if err != test.expectedError { + if err != nil && test.expectedError != nil && err.Error() == test.expectedError.Error() { + continue + } + t.Errorf("expected error '%v' does not match the actual error '%v'", test.expectedError, err) + } + if !reflect.DeepEqual(roles, test.expectedRoles) { + t.Errorf("expected roles output %v does not match the actual %v", test.expectedRoles, roles) + } + } +} diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 398003841..822395ce9 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -3,7 +3,6 @@ package spec import ( "fmt" "strings" - "database/sql" "k8s.io/client-go/pkg/api/v1" diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index f0ea50dd4..981083cb9 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -5,6 +5,9 @@ import ( "time" "k8s.io/client-go/kubernetes" + v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + extensions "k8s.io/client-go/kubernetes/typed/extensions/v1beta1" "k8s.io/client-go/pkg/api" apierrors "k8s.io/client-go/pkg/api/errors" "k8s.io/client-go/pkg/api/unversioned" @@ -18,6 +21,32 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" ) +type KubernetesClient struct { + v1core.SecretsGetter + v1core.ServicesGetter + v1core.EndpointsGetter + v1core.PodsGetter + v1core.PersistentVolumesGetter + v1core.PersistentVolumeClaimsGetter + v1core.ConfigMapsGetter + v1beta1.StatefulSetsGetter + extensions.ThirdPartyResourcesGetter 
+} + +func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) { + c = KubernetesClient{} + c.PodsGetter = src.CoreV1() + c.ServicesGetter = src.CoreV1() + c.EndpointsGetter = src.CoreV1() + c.SecretsGetter = src.CoreV1() + c.ConfigMapsGetter = src.CoreV1() + c.PersistentVolumeClaimsGetter = src.CoreV1() + c.PersistentVolumesGetter = src.CoreV1() + c.StatefulSetsGetter = src.AppsV1beta1() + c.ThirdPartyResourcesGetter = src.ExtensionsV1beta1() + return +} + func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { if outOfCluster { return clientcmd.BuildConfigFromFlags("", kubeConfig) @@ -25,7 +54,7 @@ func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { return rest.InClusterConfig() } -func KubernetesClient(config *rest.Config) (client *kubernetes.Clientset, err error) { +func ClientSet(config *rest.Config) (client *kubernetes.Clientset, err error) { return kubernetes.NewForConfig(config) } From 1f8b37f33d703116ffb2f305999c8e8e8664e88a Mon Sep 17 00:00:00 2001 From: Murat Kabilov Date: Tue, 25 Jul 2017 15:25:17 +0200 Subject: [PATCH 2/2] Make use of kubernetes client-go v4 * client-go v4.0.0-beta0 * remove unnecessary methods for tpr object * rest client: use interface instead of structure pointer * proper names for constants; some clean up for log messages * remove teams api client from controller and make it per cluster --- cmd/main.go | 82 ++------- glide.lock | 225 ++++++++--------------- glide.yaml | 55 +++--- pkg/cluster/cluster.go | 62 ++++--- pkg/cluster/exec.go | 14 +- pkg/cluster/filesystems.go | 2 +- pkg/cluster/k8sres.go | 45 ++--- pkg/cluster/pg.go | 2 +- pkg/cluster/pod.go | 23 +-- pkg/cluster/resources.go | 73 ++++---- pkg/cluster/sync.go | 8 +- pkg/cluster/util.go | 35 ++-- pkg/cluster/volumes.go | 31 ++-- pkg/controller/controller.go | 204 ++++++++++++-------- pkg/controller/pod.go | 31 +--- pkg/controller/postgresql.go | 149 ++++++++------- pkg/controller/util.go | 23 +-- 
pkg/controller/util_test.go | 8 +- pkg/spec/postgresql.go | 37 +--- pkg/spec/postgresql_test.go | 35 ++-- pkg/spec/types.go | 4 +- pkg/spec/types_test.go | 2 +- pkg/util/constants/kubernetes.go | 5 +- pkg/util/constants/roles.go | 2 +- pkg/util/constants/thirdpartyresource.go | 7 +- pkg/util/filesystems/ext234.go | 2 +- pkg/util/k8sutil/k8sutil.go | 47 ++--- pkg/util/users/users.go | 8 +- pkg/util/util.go | 4 +- pkg/util/util_test.go | 8 +- pkg/util/volumes/ebs.go | 12 +- 31 files changed, 579 insertions(+), 666 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index c38eb2e91..3093cedb8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -9,64 +9,41 @@ import ( "syscall" "github.com/zalando-incubator/postgres-operator/pkg/controller" - "github.com/zalando-incubator/postgres-operator/pkg/spec" - "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" ) var ( - KubeConfigFile string - podNamespace string - configMapName spec.NamespacedName - OutOfCluster bool - noTeamsAPI bool - noDatabaseAccess bool - version string + KubeConfigFile string + OutOfCluster bool + version string + + config controller.Config ) func init() { flag.StringVar(&KubeConfigFile, "kubeconfig", "", "Path to kubeconfig file with authorization and master location information.") flag.BoolVar(&OutOfCluster, "outofcluster", false, "Whether the operator runs in- our outside of the Kubernetes cluster.") - flag.BoolVar(&noDatabaseAccess, "nodatabaseaccess", false, "Disable all access to the database from the operator side.") - flag.BoolVar(&noTeamsAPI, "noteamsapi", false, "Disable all access to the teams API") + flag.BoolVar(&config.NoDatabaseAccess, "nodatabaseaccess", false, "Disable all access to the database from the operator side.") + flag.BoolVar(&config.NoTeamsAPI, "noteamsapi", false, "Disable all access to the teams API") flag.Parse() - podNamespace = os.Getenv("MY_POD_NAMESPACE") - if podNamespace == "" { - podNamespace = 
"default" + config.Namespace = os.Getenv("MY_POD_NAMESPACE") + if config.Namespace == "" { + config.Namespace = "default" } configMap := os.Getenv("CONFIG_MAP_NAME") if configMap != "" { - configMapName.Decode(configMap) - } -} - -func ControllerConfig() *controller.Config { - restConfig, err := k8sutil.RestConfig(KubeConfigFile, OutOfCluster) - if err != nil { - log.Fatalf("Can't get REST config: %s", err) - } - - client, err := k8sutil.ClientSet(restConfig) - if err != nil { - log.Fatalf("Can't create client: %s", err) - } - - restClient, err := k8sutil.KubernetesRestClient(restConfig) - if err != nil { - log.Fatalf("Can't create rest client: %s", err) - } - - return &controller.Config{ - RestConfig: restConfig, - KubeClient: k8sutil.NewFromKubernetesInterface(client), - RestClient: restClient, + err := config.ConfigMapName.Decode(configMap) + if err != nil { + log.Fatalf("incorrect config map name") + } } } func main() { - configMapData := make(map[string]string) + var err error + log.SetOutput(os.Stdout) log.Printf("Spilo operator %s\n", version) @@ -76,32 +53,13 @@ func main() { wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on - controllerConfig := ControllerConfig() - - if configMapName != (spec.NamespacedName{}) { - configMap, err := controllerConfig.KubeClient.ConfigMaps(configMapName.Namespace).Get(configMapName.Name) - if err != nil { - panic(err) - } - - configMapData = configMap.Data - } else { - log.Printf("No ConfigMap specified. 
Loading default values") + config.RestConfig, err = k8sutil.RestConfig(KubeConfigFile, OutOfCluster) + if err != nil { + log.Fatalf("couldn't get REST config: %v", err) } - if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var - configMapData["namespace"] = podNamespace - } - if noDatabaseAccess { - configMapData["enable_database_access"] = "false" - } - if noTeamsAPI { - configMapData["enable_teams_api"] = "false" - } - cfg := config.NewFromMap(configMapData) - log.Printf("Config: %s", cfg.MustMarshal()) + c := controller.NewController(&config) - c := controller.NewController(controllerConfig, cfg) c.Run(stop, wg) sig := <-sigs diff --git a/glide.lock b/glide.lock index 24db2b0d8..f9a3aa4b1 100644 --- a/glide.lock +++ b/glide.lock @@ -1,13 +1,8 @@ -hash: 427db08c70ab32596f9230f0111e24996f73b1b66ddd7365dd0b1b38c0ae367f -updated: 2017-05-19T17:11:37.120200516+02:00 +hash: 140e0c8a606d18ca405e9c50359cc673e4aa0cc88bcae5d7f83791e7002bd6a1 +updated: 2017-07-24T19:24:17.604824235+02:00 imports: -- name: cloud.google.com/go - version: 3b1ae45394a234c385be014e9a488f2bb6eef821 - subpackages: - - compute/metadata - - internal - name: github.com/aws/aws-sdk-go - version: e766cfe96ef7320817087fa4cd92c09abdb87310 + version: afd601335e2a72d43caa3af6bd2abe512fcc3bfd subpackages: - aws - aws/awserr @@ -25,40 +20,17 @@ imports: - aws/request - aws/session - aws/signer/v4 + - internal/shareddefaults - private/protocol - private/protocol/ec2query - - private/protocol/json/jsonutil - - private/protocol/jsonrpc - private/protocol/query - private/protocol/query/queryutil - private/protocol/rest - - private/protocol/restxml - private/protocol/xml/xmlutil - - private/waiter - - service/autoscaling - service/ec2 - - service/ecr - - service/elb - - service/route53 - service/sts -- name: github.com/blang/semver - version: 31b736133b98f26d5e078ec9eb591666edfd091f -- name: github.com/coreos/go-oidc - version: 5644a2f50e2d2d5ba0b474bc5bc55fea1925936d - 
subpackages: - - http - - jose - - key - - oauth2 - - oidc -- name: github.com/coreos/pkg - version: fa29b1d70f0beaddd4c7021607cc3c3be8ce94b8 - subpackages: - - health - - httputil - - timeutil - name: github.com/davecgh/go-spew - version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d + version: 782f4967f2dc4564575ca782fe2d04090b5faca8 subpackages: - spew - name: github.com/docker/distribution @@ -71,49 +43,52 @@ imports: subpackages: - spdy - name: github.com/emicklei/go-restful - version: 89ef8af493ab468a45a42bb0d89a06fccdd2fb22 + version: ff4f55a206334ef123e4f79bbf348980da81ca46 subpackages: - log - - swagger +- name: github.com/emicklei/go-restful-swagger12 + version: dcef7f55730566d41eae5db10e7d6981829720f6 - name: github.com/ghodss/yaml version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee - name: github.com/go-ini/ini - version: 2e44421e256d82ebbf3d4d4fcabe8930b905eff3 + version: 3d73f4b845efdf9989fffd4b4e562727744a34ba +- name: github.com/go-openapi/analysis + version: b44dc874b601d9e4e2f6e19140e794ba24bead3b - name: github.com/go-openapi/jsonpointer version: 46af16f9f7b149af66e5d1bd010e3574dc06de98 - name: github.com/go-openapi/jsonreference version: 13c6e3589ad90f49bd3e3bbe2c2cb3d7a4142272 +- name: github.com/go-openapi/loads + version: 18441dfa706d924a39a030ee2c3b1d8d81917b38 - name: github.com/go-openapi/spec version: 6aced65f8501fe1217321abf0749d354824ba2ff - name: github.com/go-openapi/swag version: 1d0bd113de87027671077d3c71eb3ac5d7dbba72 - name: github.com/gogo/protobuf - version: 100ba4e885062801d56799d78530b73b178a78f3 + version: c0656edd0d9eab7c66d1eb0c568f9039345796f7 subpackages: - proto - sortkeys - name: github.com/golang/glog version: 44145f04b68cf362d9c4df2182967c2275eaefed -- name: github.com/golang/protobuf - version: 8616e8ee5e20a1704615e6c8d7afcdac06087a67 - subpackages: - - proto - name: github.com/google/gofuzz - version: bbcb9da2d746f8bdbd6a936686a0a6067ada0ec5 + version: 44d81051d367757e1c7c6a5a86423ece9afcf63c +- name: 
github.com/hashicorp/golang-lru + version: a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4 + subpackages: + - simplelru - name: github.com/howeyc/gopass - version: 3ca23474a7c7203e0a0a070fd33508f6efdb9b3d + version: bf9dde6d0d2c004a008c27aaee91170c786f6db8 - name: github.com/imdario/mergo version: 6633656539c1639d9d78127b7d47c622b5d7b6dc - name: github.com/jmespath/go-jmespath - version: 3433f3ea46d9f8019119e7dd41274e112a2359a9 -- name: github.com/jonboulle/clockwork - version: 72f9bd7c4e0c2a40055ab3d0f09654f730cce982 + version: bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d - name: github.com/juju/ratelimit - version: 77ed1c8a01217656d2080ad51981f6e99adaa177 + version: 5b9ff866471762aa2ab2dced63c9fb6f53921342 - name: github.com/kr/text version: 7cafcd837844e784b526369c9bce262804aebc60 - name: github.com/lib/pq - version: 2704adc878c21e1329f46f6e56a1c387d788ff94 + version: dd1fe2071026ce53f36a39112e645b4d4f5793a4 subpackages: - oid - name: github.com/mailru/easyjson @@ -124,44 +99,34 @@ imports: - jwriter - name: github.com/motomux/pretty version: b2aad2c9a95d14eb978f29baa6e3a5c3c20eef30 -- name: github.com/pborman/uuid - version: ca53cad383cad2479bbba7f7a1a05797ec1386e4 - name: github.com/PuerkitoBio/purell version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 - name: github.com/PuerkitoBio/urlesc version: 5bd2802263f21d8788851d5305584c82a5c75d7e - name: github.com/Sirupsen/logrus - version: ba1b36c82c5e05c4f912a88eab0dcd91a171688f - subpackages: - - client + version: a3f95b5c423586578a4e099b11a46c2479628cac - name: github.com/spf13/pflag - version: 5ccb023bc27df288a957c5e994cd44fd19619465 + version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7 - name: github.com/ugorji/go - version: f1f1a805ed361a0e078bb537e4ea78cd37dcf065 + version: ded73eae5db7e7a0ef6f55aace87a2873c5d2b74 subpackages: - codec - name: golang.org/x/crypto - version: 1f22c0103821b9390939b6776727195525381532 + version: d172538b2cfce0c13cee31e647d0367aa8cd2486 subpackages: - ssh/terminal - name: golang.org/x/net - 
version: e90d6d0afc4c315a0d87a568ae68577cc15149a0 + version: f2499483f923065a842d38eb4c7f1927e6fc6e6d subpackages: - - context - - context/ctxhttp + - html + - html/atom - http2 - http2/hpack - idna - lex/httplex -- name: golang.org/x/oauth2 - version: 3c3a985cb79f52a3190fbc056984415ca6763d01 - subpackages: - - google - - internal - - jws - - jwt + - websocket - name: golang.org/x/sys - version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 + version: c4489faa6e5ab84c0ef40d6ee878f7a030281f0f subpackages: - unix - name: golang.org/x/text @@ -177,25 +142,14 @@ imports: - unicode/bidi - unicode/norm - width -- name: google.golang.org/appengine - version: 4f7eeb5305a4ba1966344836ba4af9996b7b4e05 - subpackages: - - internal - - internal/app_identity - - internal/base - - internal/datastore - - internal/log - - internal/modules - - internal/remote_api - - internal/urlfetch - - urlfetch - name: gopkg.in/inf.v0 version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 - name: gopkg.in/yaml.v2 version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 - name: k8s.io/apimachinery - version: 84c15da65eb86243c295d566203d7689cc6ac04b + version: abe34e4f5b4413c282a83011892cbeea5b32223b subpackages: + - pkg/api/equality - pkg/api/errors - pkg/api/meta - pkg/api/resource @@ -204,8 +158,10 @@ imports: - pkg/apimachinery/registered - pkg/apis/meta/v1 - pkg/apis/meta/v1/unstructured + - pkg/apis/meta/v1alpha1 - pkg/conversion - pkg/conversion/queryparams + - pkg/conversion/unstructured - pkg/fields - pkg/labels - pkg/openapi @@ -215,9 +171,13 @@ imports: - pkg/runtime/serializer/json - pkg/runtime/serializer/protobuf - pkg/runtime/serializer/recognizer + - pkg/runtime/serializer/streaming - pkg/runtime/serializer/versioning - pkg/selection - pkg/types + - pkg/util/cache + - pkg/util/clock + - pkg/util/diff - pkg/util/errors - pkg/util/framer - pkg/util/httpstream @@ -225,6 +185,7 @@ imports: - pkg/util/intstr - pkg/util/json - pkg/util/net + - pkg/util/rand - pkg/util/remotecommand - pkg/util/runtime - 
pkg/util/sets @@ -232,116 +193,76 @@ imports: - pkg/util/validation/field - pkg/util/wait - pkg/util/yaml + - pkg/version - pkg/watch - third_party/forked/golang/netutil - third_party/forked/golang/reflect - name: k8s.io/client-go - version: e121606b0d09b2e1c467183ee46217fa85a6b672 + version: df46f7f13b3da19b90b8b4f0d18b8adc6fbf28dc subpackages: - discovery - kubernetes + - kubernetes/scheme + - kubernetes/typed/admissionregistration/v1alpha1 - kubernetes/typed/apps/v1beta1 + - kubernetes/typed/authentication/v1 - kubernetes/typed/authentication/v1beta1 + - kubernetes/typed/authorization/v1 - kubernetes/typed/authorization/v1beta1 - kubernetes/typed/autoscaling/v1 + - kubernetes/typed/autoscaling/v2alpha1 - kubernetes/typed/batch/v1 - kubernetes/typed/batch/v2alpha1 - - kubernetes/typed/certificates/v1alpha1 + - kubernetes/typed/certificates/v1beta1 - kubernetes/typed/core/v1 - kubernetes/typed/extensions/v1beta1 + - kubernetes/typed/networking/v1 - kubernetes/typed/policy/v1beta1 - kubernetes/typed/rbac/v1alpha1 + - kubernetes/typed/rbac/v1beta1 + - kubernetes/typed/settings/v1alpha1 + - kubernetes/typed/storage/v1 - kubernetes/typed/storage/v1beta1 - pkg/api - - pkg/api/errors - - pkg/api/install - - pkg/api/meta - - pkg/api/meta/metatypes - - pkg/api/resource - - pkg/api/unversioned - pkg/api/v1 - - pkg/api/validation/path - - pkg/apimachinery - - pkg/apimachinery/announced - - pkg/apimachinery/registered + - pkg/api/v1/ref + - pkg/apis/admissionregistration + - pkg/apis/admissionregistration/v1alpha1 - pkg/apis/apps - - pkg/apis/apps/install - pkg/apis/apps/v1beta1 - pkg/apis/authentication - - pkg/apis/authentication/install + - pkg/apis/authentication/v1 - pkg/apis/authentication/v1beta1 - pkg/apis/authorization - - pkg/apis/authorization/install + - pkg/apis/authorization/v1 - pkg/apis/authorization/v1beta1 - pkg/apis/autoscaling - - pkg/apis/autoscaling/install - pkg/apis/autoscaling/v1 + - pkg/apis/autoscaling/v2alpha1 - pkg/apis/batch - - 
pkg/apis/batch/install - pkg/apis/batch/v1 - pkg/apis/batch/v2alpha1 - pkg/apis/certificates - - pkg/apis/certificates/install - - pkg/apis/certificates/v1alpha1 + - pkg/apis/certificates/v1beta1 - pkg/apis/extensions - - pkg/apis/extensions/install - pkg/apis/extensions/v1beta1 + - pkg/apis/networking + - pkg/apis/networking/v1 - pkg/apis/policy - - pkg/apis/policy/install - pkg/apis/policy/v1beta1 - pkg/apis/rbac - - pkg/apis/rbac/install - pkg/apis/rbac/v1alpha1 + - pkg/apis/rbac/v1beta1 + - pkg/apis/settings + - pkg/apis/settings/v1alpha1 - pkg/apis/storage - - pkg/apis/storage/install + - pkg/apis/storage/v1 - pkg/apis/storage/v1beta1 - - pkg/auth/user - - pkg/conversion - - pkg/conversion/queryparams - - pkg/fields - - pkg/genericapiserver/openapi/common - - pkg/labels - - pkg/runtime - - pkg/runtime/serializer - - pkg/runtime/serializer/json - - pkg/runtime/serializer/protobuf - - pkg/runtime/serializer/recognizer - - pkg/runtime/serializer/streaming - - pkg/runtime/serializer/versioning - - pkg/selection - - pkg/third_party/forked/golang/reflect - - pkg/third_party/forked/golang/template - - pkg/types - pkg/util - - pkg/util/cert - - pkg/util/clock - - pkg/util/diff - - pkg/util/errors - - pkg/util/flowcontrol - - pkg/util/framer - - pkg/util/homedir - - pkg/util/integer - - pkg/util/intstr - - pkg/util/json - - pkg/util/jsonpath - - pkg/util/labels - - pkg/util/net - pkg/util/parsers - - pkg/util/rand - - pkg/util/runtime - - pkg/util/sets - - pkg/util/uuid - - pkg/util/validation - - pkg/util/validation/field - - pkg/util/wait - - pkg/util/yaml - pkg/version - - pkg/watch - - pkg/watch/versioned - - plugin/pkg/client/auth - - plugin/pkg/client/auth/gcp - - plugin/pkg/client/auth/oidc - rest + - rest/watch - tools/auth - tools/cache - tools/clientcmd @@ -349,11 +270,11 @@ imports: - tools/clientcmd/api/latest - tools/clientcmd/api/v1 - tools/metrics + - tools/remotecommand - transport -- name: k8s.io/kubernetes - version: 
ee39d359dd0896c4c0eccf23f033f158ad3d3bd7 - subpackages: - - pkg/api - - pkg/client/unversioned/remotecommand - - pkg/util/exec + - util/cert + - util/exec + - util/flowcontrol + - util/homedir + - util/integer testImports: [] diff --git a/glide.yaml b/glide.yaml index 12ad3c548..fead69587 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,26 +1,39 @@ package: github.com/zalando-incubator/postgres-operator import: -- package: github.com/gogo/protobuf - version: ^0.3.0 - package: github.com/Sirupsen/logrus - version: ^0.11.5 - subpackages: - - client -- package: github.com/lib/pq -- package: github.com/motomux/pretty -- package: golang.org/x/net - subpackages: - - context -- package: k8s.io/apimachinery - version: 84c15da65eb86243c295d566203d7689cc6ac04b - subpackages: - - pkg/util/json - - pkg/util/remotecommand -- package: k8s.io/client-go - version: ^2.0.0 -- package: k8s.io/kubernetes - version: ee39d359dd0896c4c0eccf23f033f158ad3d3bd7 - subpackages: - - pkg/client/unversioned/remotecommand + version: ^1.0.1 - package: github.com/aws/aws-sdk-go version: ^1.8.24 + subpackages: + - aws + - aws/session + - service/ec2 +- package: github.com/lib/pq +- package: github.com/motomux/pretty +- package: k8s.io/apimachinery + subpackages: + - pkg/api/errors + - pkg/api/meta + - pkg/api/resource + - pkg/apis/meta/v1 + - pkg/fields + - pkg/labels + - pkg/runtime + - pkg/runtime/schema + - pkg/runtime/serializer + - pkg/types + - pkg/util/intstr + - pkg/util/remotecommand + - pkg/watch +- package: k8s.io/client-go + version: ^4.0.0-beta.0 + subpackages: + - kubernetes + - pkg/api + - pkg/api/v1 + - pkg/apis/apps/v1beta1 + - pkg/apis/extensions/v1beta1 + - rest + - tools/cache + - tools/clientcmd + - tools/remotecommand diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 8683803b9..1d7ffef95 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -11,10 +11,10 @@ import ( "sync" "github.com/Sirupsen/logrus" - "k8s.io/client-go/pkg/api" + metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" - "k8s.io/client-go/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -35,11 +35,8 @@ var ( // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication. type Config struct { - KubeClient k8sutil.KubernetesClient - RestClient *rest.RESTClient - RestConfig *rest.Config - TeamsAPIClient *teams.API OpConfig config.Config + RestConfig *rest.Config InfrastructureRoles map[string]spec.PgUser // inherited from the controller } @@ -65,8 +62,11 @@ type Cluster struct { mu sync.Mutex masterLess bool userSyncStrategy spec.UserSyncer - deleteOptions *v1.DeleteOptions + deleteOptions *metav1.DeleteOptions podEventsQueue *cache.FIFO + + teamsAPIClient *teams.API + KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? } type compareStatefulsetResult struct { @@ -77,8 +77,8 @@ type compareStatefulsetResult struct { } // New creates a new cluster. This function should be called from a controller. 
-func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { - lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) +func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { + lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Name) kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} orphanDependents := true @@ -101,15 +101,17 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { kubeResources: kubeResources, masterLess: false, userSyncStrategy: users.DefaultUserSyncStrategy{}, - deleteOptions: &v1.DeleteOptions{OrphanDependents: &orphanDependents}, + deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, podEventsQueue: podEventsQueue, + KubeClient: kubeClient, + teamsAPIClient: teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger.Logger), } return cluster } func (c *Cluster) clusterName() spec.NamespacedName { - return util.NameFromMeta(c.Metadata) + return util.NameFromMeta(c.ObjectMeta) } func (c *Cluster) teamName() string { @@ -125,8 +127,8 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { } request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods - _, err = c.RestClient.Patch(api.MergePatchType). - RequestURI(c.Metadata.GetSelfLink()). + _, err = c.KubeClient.RESTClient.Patch(types.MergePatchType). + RequestURI(c.GetSelfLink()). Body(request). 
DoRaw() @@ -136,7 +138,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { } if err != nil { - c.logger.Warningf("could not set status for cluster '%s': %s", c.clusterName(), err) + c.logger.Warningf("could not set status for cluster %q: %v", c.clusterName(), err) } } @@ -179,7 +181,7 @@ func (c *Cluster) Create() error { if err != nil { return fmt.Errorf("could not create endpoint: %v", err) } - c.logger.Infof("endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta)) + c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta)) for _, role := range []PostgresRole{Master, Replica} { if role == Replica && !c.Spec.ReplicaLoadBalancer { @@ -189,7 +191,7 @@ func (c *Cluster) Create() error { if err != nil { return fmt.Errorf("could not create %s service: %v", role, err) } - c.logger.Infof("%s service '%s' has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) + c.logger.Infof("%s service %q has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) } if err = c.initUsers(); err != nil { @@ -206,12 +208,12 @@ func (c *Cluster) Create() error { if err != nil { return fmt.Errorf("could not create statefulset: %v", err) } - c.logger.Infof("statefulset '%s' has been successfully created", util.NameFromMeta(ss.ObjectMeta)) + c.logger.Infof("statefulset %q has been successfully created", util.NameFromMeta(ss.ObjectMeta)) c.logger.Info("Waiting for cluster being ready") if err = c.waitStatefulsetPodsReady(); err != nil { - c.logger.Errorf("Failed to create cluster: %s", err) + c.logger.Errorf("Failed to create cluster: %v", err) return err } c.logger.Infof("pods are ready") @@ -232,7 +234,7 @@ func (c *Cluster) Create() error { err = c.listResources() if err != nil { - c.logger.Errorf("could not list resources: %s", err) + c.logger.Errorf("could not list resources: %v", err) } return nil @@ -242,7 +244,7 @@ func (c *Cluster) sameServiceWith(role PostgresRole, service 
*v1.Service) (match //TODO: improve comparison match = true if c.Service[role].Spec.Type != service.Spec.Type { - return false, fmt.Sprintf("new %s service's type %s doesn't match the current one %s", + return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q", role, service.Spec.Type, c.Service[role].Spec.Type) } oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges @@ -258,7 +260,7 @@ func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match oldDNSAnnotation := c.Service[role].Annotations[constants.ZalandoDNSNameAnnotation] newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation] if oldDNSAnnotation != newDNSAnnotation { - return false, fmt.Sprintf("new %s service's '%s' annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) + return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation) } return true, "" @@ -289,7 +291,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp } if len(c.Statefulset.Spec.Template.Spec.Containers) == 0 { - c.logger.Warnf("statefulset '%s' has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) + c.logger.Warnf("statefulset %q has no container", util.NameFromMeta(c.Statefulset.ObjectMeta)) return &compareStatefulsetResult{} } // In the comparisons below, the needsReplace and needsRollUpdate flags are never reset, since checks fall through @@ -332,12 +334,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp } if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) { needsReplace = true - reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %s doesn't match the current one", name)) + reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q doesn't match the 
current one", name)) } if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) { name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name needsReplace = true - reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %s doesn't match the current one", name)) + reasons = append(reasons, fmt.Sprintf("new statefulset's volumeClaimTemplates specification for volume %q doesn't match the current one", name)) } } @@ -404,8 +406,8 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { defer c.mu.Unlock() c.setStatus(spec.ClusterStatusUpdating) - c.logger.Debugf("Cluster update from version %s to %s", - c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion) + c.logger.Debugf("Cluster update from version %q to %q", + c.ResourceVersion, newSpec.ResourceVersion) /* Make sure we update when this function exists */ defer func() { @@ -430,7 +432,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { if err != nil { return fmt.Errorf("could not create new %s service: %v", role, err) } - c.logger.Infof("%s service '%s' has been created", role, util.NameFromMeta(service.ObjectMeta)) + c.logger.Infof("%s service %q has been created", role, util.NameFromMeta(service.ObjectMeta)) } } // only proceed further if both old and new load balancer were present @@ -445,7 +447,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { c.setStatus(spec.ClusterStatusUpdateFailed) return fmt.Errorf("could not update %s service: %v", role, err) } - c.logger.Infof("%s service '%s' has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) + c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) } } @@ -470,11 +472,11 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { } } //TODO: if there is a change in numberOfInstances, make sure Pods have been created/deleted - c.logger.Infof("statefulset '%s' 
has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta)) + c.logger.Infof("statefulset %q has been updated", util.NameFromMeta(c.Statefulset.ObjectMeta)) } if c.Spec.PgVersion != newSpec.Spec.PgVersion { // PG versions comparison - c.logger.Warnf("Postgresql version change(%s -> %s) is not allowed", + c.logger.Warnf("Postgresql version change(%q -> %q) is not allowed", c.Spec.PgVersion, newSpec.Spec.PgVersion) //TODO: rewrite pg version in tpr spec } diff --git a/pkg/cluster/exec.go b/pkg/cluster/exec.go index fbd913c21..e00715a5d 100644 --- a/pkg/cluster/exec.go +++ b/pkg/cluster/exec.go @@ -4,9 +4,11 @@ import ( "bytes" "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" - "k8s.io/client-go/pkg/api" - "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/remotecommand" "github.com/zalando-incubator/postgres-operator/pkg/spec" ) @@ -17,7 +19,7 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) ( execErr bytes.Buffer ) - pod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name) + pod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{}) if err != nil { return "", fmt.Errorf("could not get pod info: %v", err) } @@ -26,17 +28,17 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) ( return "", fmt.Errorf("could not determine which container to use") } - req := c.RestClient.Post(). + req := c.KubeClient.RESTClient.Post(). Resource("pods"). Name(podName.Name). Namespace(podName.Namespace). 
SubResource("exec") - req.VersionedParams(&api.PodExecOptions{ + req.VersionedParams(&v1.PodExecOptions{ Container: pod.Spec.Containers[0].Name, Command: command, Stdout: true, Stderr: true, - }, api.ParameterCodec) + }, scheme.ParameterCodec) exec, err := remotecommand.NewExecutor(c.RestConfig, "POST", req.URL()) if err != nil { diff --git a/pkg/cluster/filesystems.go b/pkg/cluster/filesystems.go index e68e5614c..65e7048c7 100644 --- a/pkg/cluster/filesystems.go +++ b/pkg/cluster/filesystems.go @@ -39,5 +39,5 @@ func (c *Cluster) resizePostgresFilesystem(podName *spec.NamespacedName, resizer return err } - return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %s", fsType) + return fmt.Errorf("could not resize filesystem: no compatible resizers for the filesystem of type %q", fsType) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 0cba32837..3ddefbb90 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1,14 +1,15 @@ package cluster import ( + "encoding/json" "fmt" "sort" - "encoding/json" - "k8s.io/client-go/pkg/api/resource" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" - "k8s.io/client-go/pkg/util/intstr" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" @@ -198,7 +199,7 @@ PATRONI_INITDB_PARAMS: } result, err := json.Marshal(config) if err != nil { - c.logger.Errorf("Cannot convert spilo configuration into JSON: %s", err) + c.logger.Errorf("Cannot convert spilo configuration into JSON: %v", err) return "" } return string(result) @@ -210,7 +211,7 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme envVars := []v1.EnvVar{ { Name: "SCOPE", - Value: c.Metadata.Name, + Value: c.Name, }, { Name: "PGROOT", @@ -273,7 +274,7 @@ func (c 
*Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme } privilegedMode := bool(true) container := v1.Container{ - Name: c.Metadata.Name, + Name: c.Name, Image: c.OpConfig.DockerImage, ImagePullPolicy: v1.PullAlways, Resources: *resourceRequirements, @@ -311,9 +312,9 @@ func (c *Cluster) generatePodTemplate(resourceRequirements *v1.ResourceRequireme } template := v1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Labels: c.labelsSet(), - Namespace: c.Metadata.Name, + Namespace: c.Name, }, Spec: podSpec, } @@ -337,14 +338,14 @@ func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.Stateful } statefulSet := &v1beta1.StatefulSet{ - ObjectMeta: v1.ObjectMeta{ - Name: c.Metadata.Name, - Namespace: c.Metadata.Namespace, + ObjectMeta: metav1.ObjectMeta{ + Name: c.Name, + Namespace: c.Namespace, Labels: c.labelsSet(), }, Spec: v1beta1.StatefulSetSpec{ Replicas: &spec.NumberOfInstances, - ServiceName: c.Metadata.Name, + ServiceName: c.Name, Template: *podTemplate, VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate}, }, @@ -354,7 +355,7 @@ func (c *Cluster) generateStatefulSet(spec spec.PostgresSpec) (*v1beta1.Stateful } func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) { - metadata := v1.ObjectMeta{ + metadata := metav1.ObjectMeta{ Name: constants.DataVolumeName, } if volumeStorageClass != "" { @@ -386,7 +387,7 @@ func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) { secrets = make(map[string]*v1.Secret, len(c.pgUsers)) - namespace := c.Metadata.Namespace + namespace := c.Namespace for username, pgUser := range c.pgUsers { //Skip users with no password i.e. 
human users (they'll be authenticated using pam) secret := c.generateSingleUserSecret(namespace, pgUser) @@ -412,7 +413,7 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) } username := pgUser.Name secret := v1.Secret{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: c.credentialSecretName(username), Namespace: namespace, Labels: c.labelsSet(), @@ -429,7 +430,7 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) *v1.Service { dnsNameFunction := c.masterDnsName - name := c.Metadata.Name + name := c.Name if role == Replica { dnsNameFunction = c.replicaDnsName name = name + "-repl" @@ -450,7 +451,7 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) if (newSpec.UseLoadBalancer != nil && *newSpec.UseLoadBalancer) || (newSpec.UseLoadBalancer == nil && c.OpConfig.EnableLoadBalancer) { - // safe default value: lock load balancer to only local address unless overriden explicitely. + // safe default value: lock load balancer to only local address unless overridden explicitly. 
sourceRanges := []string{localHost} allowedSourceRanges := newSpec.AllowedSourceRanges if len(allowedSourceRanges) >= 0 { @@ -468,9 +469,9 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) } service := &v1.Service{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: c.Metadata.Namespace, + Namespace: c.Namespace, Labels: c.roleLabelsSet(role), Annotations: annotations, }, @@ -482,9 +483,9 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpoints { endpoints := &v1.Endpoints{ - ObjectMeta: v1.ObjectMeta{ - Name: c.Metadata.Name, - Namespace: c.Metadata.Namespace, + ObjectMeta: metav1.ObjectMeta{ + Name: c.Name, + Namespace: c.Namespace, Labels: c.roleLabelsSet(Master), }, } diff --git a/pkg/cluster/pg.go b/pkg/cluster/pg.go index fd677006c..d06da2b81 100644 --- a/pkg/cluster/pg.go +++ b/pkg/cluster/pg.go @@ -22,7 +22,7 @@ var getUserSQL = `SELECT a.rolname, COALESCE(a.rolpassword, ''), a.rolsuper, a.r ORDER BY 1;` func (c *Cluster) pgConnectionString() string { - hostname := fmt.Sprintf("%s.%s.svc.cluster.local", c.Metadata.Name, c.Metadata.Namespace) + hostname := fmt.Sprintf("%s.%s.svc.cluster.local", c.Name, c.Namespace) username := c.systemUsers[constants.SuperuserKeyName].Name password := c.systemUsers[constants.SuperuserKeyName].Password diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index 52ecdfffe..2b7ed438e 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -3,6 +3,7 @@ package cluster import ( "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" @@ -11,8 +12,8 @@ import ( ) func (c *Cluster) listPods() ([]v1.Pod, error) { - ns := c.Metadata.Namespace - listOptions := v1.ListOptions{ + ns := c.Namespace + listOptions := metav1.ListOptions{ LabelSelector: c.labelsSet().String(), 
} @@ -34,11 +35,11 @@ func (c *Cluster) deletePods() error { for _, obj := range pods { podName := util.NameFromMeta(obj.ObjectMeta) - c.logger.Debugf("Deleting pod '%s'", podName) + c.logger.Debugf("Deleting pod %q", podName) if err := c.deletePod(podName); err != nil { - c.logger.Errorf("could not delete pod '%s': %s", podName, err) + c.logger.Errorf("could not delete pod %q: %v", podName, err) } else { - c.logger.Infof("pod '%s' has been deleted", podName) + c.logger.Infof("pod %q has been deleted", podName) } } if len(pods) > 0 { @@ -106,16 +107,16 @@ func (c *Cluster) recreatePod(pod v1.Pod) error { if err := c.waitForPodLabel(ch); err != nil { return err } - c.logger.Infof("pod '%s' is ready", podName) + c.logger.Infof("pod %q is ready", podName) return nil } func (c *Cluster) recreatePods() error { ls := c.labelsSet() - namespace := c.Metadata.Namespace + namespace := c.Namespace - listOptions := v1.ListOptions{ + listOptions := metav1.ListOptions{ LabelSelector: ls.String(), } @@ -135,7 +136,7 @@ func (c *Cluster) recreatePods() error { } if err := c.recreatePod(pod); err != nil { - return fmt.Errorf("could not recreate replica pod '%s': %v", util.NameFromMeta(pod.ObjectMeta), err) + return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err) } } if masterPod.Name == "" { @@ -143,10 +144,10 @@ func (c *Cluster) recreatePods() error { } else { //TODO: do manual failover //TODO: specify master, leave new master empty - c.logger.Infof("Recreating master pod '%s'", util.NameFromMeta(masterPod.ObjectMeta)) + c.logger.Infof("Recreating master pod %q", util.NameFromMeta(masterPod.ObjectMeta)) if err := c.recreatePod(masterPod); err != nil { - return fmt.Errorf("could not recreate master pod '%s': %v", util.NameFromMeta(masterPod.ObjectMeta), err) + return fmt.Errorf("could not recreate master pod %q: %v", util.NameFromMeta(masterPod.ObjectMeta), err) } } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 
b71f8355f..426890314 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -3,7 +3,8 @@ package cluster import ( "fmt" - "k8s.io/client-go/pkg/api" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" @@ -15,8 +16,8 @@ import ( ) func (c *Cluster) loadResources() error { - ns := c.Metadata.Namespace - listOptions := v1.ListOptions{ + ns := c.Namespace + listOptions := metav1.ListOptions{ LabelSelector: c.labelsSet().String(), } @@ -60,7 +61,7 @@ func (c *Cluster) loadResources() error { continue } c.Secrets[secret.UID] = &secrets.Items[i] - c.logger.Debugf("secret loaded, uid: %s", secret.UID) + c.logger.Debugf("secret loaded, uid: %q", secret.UID) } statefulSets, err := c.KubeClient.StatefulSets(ns).List(listOptions) @@ -79,19 +80,19 @@ func (c *Cluster) loadResources() error { func (c *Cluster) listResources() error { if c.Statefulset != nil { - c.logger.Infof("Found statefulset: %s (uid: %s)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) + c.logger.Infof("Found statefulset: %q (uid: %q)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) } for _, obj := range c.Secrets { - c.logger.Infof("Found secret: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) + c.logger.Infof("Found secret: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } if c.Endpoint != nil { - c.logger.Infof("Found endpoint: %s (uid: %s)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) + c.logger.Infof("Found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) } for role, service := range c.Service { - c.logger.Infof("Found %s service: %s (uid: %s)", role, util.NameFromMeta(service.ObjectMeta), service.UID) + c.logger.Infof("Found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) } pods, err := c.listPods() @@ -100,7 +101,7 @@ func (c *Cluster) 
listResources() error { } for _, obj := range pods { - c.logger.Infof("Found pod: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) + c.logger.Infof("Found pod: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } pvcs, err := c.listPersistentVolumeClaims() @@ -109,7 +110,7 @@ func (c *Cluster) listResources() error { } for _, obj := range pvcs { - c.logger.Infof("Found PVC: %s (uid: %s)", util.NameFromMeta(obj.ObjectMeta), obj.UID) + c.logger.Infof("Found PVC: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID) } return nil @@ -128,7 +129,7 @@ func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) { return nil, err } c.Statefulset = statefulSet - c.logger.Debugf("Created new statefulset '%s', uid: %s", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID) + c.logger.Debugf("Created new statefulset %q, uid: %q", util.NameFromMeta(statefulSet.ObjectMeta), statefulSet.UID) return statefulSet, nil } @@ -143,15 +144,15 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { patchData, err := specPatch(newStatefulSet.Spec) if err != nil { - return fmt.Errorf("could not form patch for the statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not form patch for the statefulset %q: %v", statefulSetName, err) } statefulSet, err := c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch( c.Statefulset.Name, - api.MergePatchType, + types.MergePatchType, patchData, "") if err != nil { - return fmt.Errorf("could not patch statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not patch statefulset %q: %v", statefulSetName, err) } c.Statefulset = statefulSet @@ -171,9 +172,9 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error orphanDepencies := true oldStatefulset := c.Statefulset - options := v1.DeleteOptions{OrphanDependents: &orphanDepencies} + options := metav1.DeleteOptions{OrphanDependents: &orphanDepencies} if err := 
c.KubeClient.StatefulSets(oldStatefulset.Namespace).Delete(oldStatefulset.Name, &options); err != nil { - return fmt.Errorf("could not delete statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not delete statefulset %q: %v", statefulSetName, err) } // make sure we clear the stored statefulset status if the subsequent create fails. c.Statefulset = nil @@ -182,7 +183,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error err := retryutil.Retry(constants.StatefulsetDeletionInterval, constants.StatefulsetDeletionTimeout, func() (bool, error) { - _, err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(oldStatefulset.Name) + _, err := c.KubeClient.StatefulSets(oldStatefulset.Namespace).Get(oldStatefulset.Name, metav1.GetOptions{}) return err != nil, nil }) @@ -193,7 +194,7 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error // create the new statefulset with the desired spec. It would take over the remaining pods. createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(newStatefulSet) if err != nil { - return fmt.Errorf("could not create statefulset '%s': %v", statefulSetName, err) + return fmt.Errorf("could not create statefulset %q: %v", statefulSetName, err) } // check that all the previous replicas were picked up. if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas && @@ -215,7 +216,7 @@ func (c *Cluster) deleteStatefulSet() error { if err != nil { return err } - c.logger.Infof("statefulset '%s' has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta)) + c.logger.Infof("statefulset %q has been deleted", util.NameFromMeta(c.Statefulset.ObjectMeta)) c.Statefulset = nil if err := c.deletePods(); err != nil { @@ -262,19 +263,19 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error if role == Master { // for the master service we need to re-create the endpoint as well. 
Get the up-to-date version of // the addresses stored in it before the service is deleted (deletion of the service removes the endpooint) - currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name) + currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("could not get current cluster endpoints: %v", err) } } err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions) if err != nil { - return fmt.Errorf("could not delete service '%s': '%v'", serviceName, err) + return fmt.Errorf("could not delete service %q: %v", serviceName, err) } c.Endpoint = nil svc, err := c.KubeClient.Services(newService.Namespace).Create(newService) if err != nil { - return fmt.Errorf("could not create service '%s': '%v'", serviceName, err) + return fmt.Errorf("could not create service %q: %v", serviceName, err) } c.Service[role] = svc if role == Master { @@ -282,7 +283,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec) if err != nil { - return fmt.Errorf("could not create endpoint '%s': '%v'", endpointName, err) + return fmt.Errorf("could not create endpoint %q: %v", endpointName, err) } c.Endpoint = ep } @@ -294,25 +295,25 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error _, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( c.Service[role].Name, - api.StrategicMergePatchType, + types.StrategicMergePatchType, []byte(annotationsPatchData), "") if err != nil { - return fmt.Errorf("could not replace annotations for the service '%s': %v", serviceName, err) + return fmt.Errorf("could not replace annotations for the service %q: %v", serviceName, err) } } patchData, err := 
specPatch(newService.Spec) if err != nil { - return fmt.Errorf("could not form patch for the service '%s': %v", serviceName, err) + return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err) } svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( c.Service[role].Name, - api.MergePatchType, + types.MergePatchType, patchData, "") if err != nil { - return fmt.Errorf("could not patch service '%s': %v", serviceName, err) + return fmt.Errorf("could not patch service %q: %v", serviceName, err) } c.Service[role] = svc @@ -329,7 +330,7 @@ func (c *Cluster) deleteService(role PostgresRole) error { if err != nil { return err } - c.logger.Infof("%s service '%s' has been deleted", role, util.NameFromMeta(service.ObjectMeta)) + c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta)) c.Service[role] = nil return nil } @@ -358,7 +359,7 @@ func (c *Cluster) deleteEndpoint() error { if err != nil { return err } - c.logger.Infof("endpoint '%s' has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta)) + c.logger.Infof("endpoint %q has been deleted", util.NameFromMeta(c.Endpoint.ObjectMeta)) c.Endpoint = nil return nil @@ -371,11 +372,11 @@ func (c *Cluster) applySecrets() error { secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec) if k8sutil.ResourceAlreadyExists(err) { var userMap map[string]spec.PgUser - curSecret, err := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name) + curSecret, err := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("could not get current secret: %v", err) } - c.logger.Debugf("secret '%s' already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta)) + c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta)) if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name { secretUsername = 
constants.SuperuserKeyName userMap = c.systemUsers @@ -392,10 +393,10 @@ func (c *Cluster) applySecrets() error { continue } else { if err != nil { - return fmt.Errorf("could not create secret for user '%s': %v", secretUsername, err) + return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err) } c.Secrets[secret.UID] = secret - c.logger.Debugf("Created new secret '%s', uid: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID) + c.logger.Debugf("Created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID) } } @@ -403,12 +404,12 @@ func (c *Cluster) applySecrets() error { } func (c *Cluster) deleteSecret(secret *v1.Secret) error { - c.logger.Debugf("Deleting secret '%s'", util.NameFromMeta(secret.ObjectMeta)) + c.logger.Debugf("Deleting secret %q", util.NameFromMeta(secret.ObjectMeta)) err := c.KubeClient.Secrets(secret.Namespace).Delete(secret.Name, c.deleteOptions) if err != nil { return err } - c.logger.Infof("secret '%s' has been deleted", util.NameFromMeta(secret.ObjectMeta)) + c.logger.Infof("secret %q has been deleted", util.NameFromMeta(secret.ObjectMeta)) delete(c.Secrets, secret.UID) return err diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 625eb34f5..2b796b511 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -95,7 +95,7 @@ func (c *Cluster) syncService(role PostgresRole) error { if err != nil { return fmt.Errorf("could not create missing %s service: %v", role, err) } - c.logger.Infof("Created missing %s service '%s'", role, util.NameFromMeta(svc.ObjectMeta)) + c.logger.Infof("Created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta)) return nil } @@ -110,7 +110,7 @@ func (c *Cluster) syncService(role PostgresRole) error { if err := c.updateService(role, desiredSvc); err != nil { return fmt.Errorf("could not update %s service to match desired state: %v", role, err) } - c.logger.Infof("%s service '%s' is in the desired state now", role, 
util.NameFromMeta(desiredSvc.ObjectMeta)) + c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) return nil } @@ -122,7 +122,7 @@ func (c *Cluster) syncEndpoint() error { if err != nil { return fmt.Errorf("could not create missing endpoint: %v", err) } - c.logger.Infof("Created missing endpoint '%s'", util.NameFromMeta(ep.ObjectMeta)) + c.logger.Infof("Created missing endpoint %q", util.NameFromMeta(ep.ObjectMeta)) return nil } @@ -151,7 +151,7 @@ func (c *Cluster) syncStatefulSet() error { if err != nil { return fmt.Errorf("cluster is not ready: %v", err) } - c.logger.Infof("Created missing statefulset '%s'", util.NameFromMeta(ss.ObjectMeta)) + c.logger.Infof("Created missing statefulset %q", util.NameFromMeta(ss.ObjectMeta)) if !rollUpdate { return nil } diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 31d46cc42..58f92fac1 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -6,9 +6,10 @@ import ( "strings" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/apps/v1beta1" - "k8s.io/client-go/pkg/labels" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" @@ -76,11 +77,11 @@ func metadataAnnotationsPatch(annotations map[string]string) string { func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate bool, reasons []string) { if isUpdate { - c.logger.Infof("statefulset '%s' has been changed", + c.logger.Infof("statefulset %q has been changed", util.NameFromMeta(old.ObjectMeta), ) } else { - c.logger.Infof("statefulset '%s' is not in the desired state and needs to be updated", + c.logger.Infof("statefulset %q is not in the desired state and needs to be updated", util.NameFromMeta(old.ObjectMeta), ) } @@ -88,18 +89,18 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate if 
len(reasons) > 0 { for _, reason := range reasons { - c.logger.Infof("Reason: %s", reason) + c.logger.Infof("Reason: %q", reason) } } } func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isUpdate bool, reason string) { if isUpdate { - c.logger.Infof("%s service '%s' has been changed", + c.logger.Infof("%s service %q has been changed", role, util.NameFromMeta(old.ObjectMeta), ) } else { - c.logger.Infof("%s service '%s is not in the desired state and needs to be updated", + c.logger.Infof("%s service %q is not in the desired state and needs to be updated", role, util.NameFromMeta(old.ObjectMeta), ) } @@ -123,10 +124,10 @@ func (c *Cluster) getOAuthToken() (string, error) { // Temporary getting postgresql-operator secret from the NamespaceDefault credentialsSecret, err := c.KubeClient. Secrets(c.OpConfig.OAuthTokenSecretName.Namespace). - Get(c.OpConfig.OAuthTokenSecretName.Name) + Get(c.OpConfig.OAuthTokenSecretName.Name, metav1.GetOptions{}) if err != nil { - c.logger.Debugf("Oauth token secret name: %s", c.OpConfig.OAuthTokenSecretName) + c.logger.Debugf("Oauth token secret name: %q", c.OpConfig.OAuthTokenSecretName) return "", fmt.Errorf("could not get credentials secret: %v", err) } data := credentialsSecret.Data @@ -152,7 +153,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) { return []string{}, fmt.Errorf("could not get oauth token: %v", err) } - teamInfo, err := c.TeamsAPIClient.TeamInfo(c.Spec.TeamID, token) + teamInfo, err := c.teamsAPIClient.TeamInfo(c.Spec.TeamID, token) if err != nil { return nil, fmt.Errorf("could not get team info: %v", err) } @@ -193,10 +194,10 @@ func (c *Cluster) waitForPodDeletion(podEvents chan spec.PodEvent) error { func (c *Cluster) waitStatefulsetReady() error { return retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { - listOptions := v1.ListOptions{ + listOptions := metav1.ListOptions{ LabelSelector: c.labelsSet().String(), } - ss, err 
:= c.KubeClient.StatefulSets(c.Metadata.Namespace).List(listOptions) + ss, err := c.KubeClient.StatefulSets(c.Namespace).List(listOptions) if err != nil { return false, err } @@ -211,17 +212,17 @@ func (c *Cluster) waitStatefulsetReady() error { func (c *Cluster) waitPodLabelsReady() error { ls := c.labelsSet() - namespace := c.Metadata.Namespace + namespace := c.Namespace - listOptions := v1.ListOptions{ + listOptions := metav1.ListOptions{ LabelSelector: ls.String(), } - masterListOption := v1.ListOptions{ + masterListOption := metav1.ListOptions{ LabelSelector: labels.Merge(ls, labels.Set{ c.OpConfig.PodRoleLabel: constants.PodRoleMaster, }).String(), } - replicaListOption := v1.ListOptions{ + replicaListOption := metav1.ListOptions{ LabelSelector: labels.Merge(ls, labels.Set{ c.OpConfig.PodRoleLabel: constants.PodRoleReplica, }).String(), @@ -277,7 +278,7 @@ func (c *Cluster) labelsSet() labels.Set { for k, v := range c.OpConfig.ClusterLabels { lbls[k] = v } - lbls[c.OpConfig.ClusterNameLabel] = c.Metadata.Name + lbls[c.OpConfig.ClusterNameLabel] = c.Name return labels.Set(lbls) } @@ -307,7 +308,7 @@ func (c *Cluster) credentialSecretName(username string) string { // and must start and end with an alphanumeric character return fmt.Sprintf(constants.UserSecretTemplate, strings.Replace(username, "_", "-", -1), - c.Metadata.Name) + c.Name) } func (c *Cluster) podSpiloRole(pod *v1.Pod) string { diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index 920004472..d2ed26f90 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -5,7 +5,8 @@ import ( "strconv" "strings" - "k8s.io/client-go/pkg/api/resource" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" @@ -16,8 +17,8 @@ import ( ) func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, error) { - ns := c.Metadata.Namespace - listOptions := 
v1.ListOptions{ + ns := c.Namespace + listOptions := metav1.ListOptions{ LabelSelector: c.labelsSet().String(), } @@ -35,7 +36,7 @@ func (c *Cluster) deletePersistenVolumeClaims() error { return err } for _, pvc := range pvcs { - c.logger.Debugf("Deleting PVC '%s'", util.NameFromMeta(pvc.ObjectMeta)) + c.logger.Debugf("Deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta)) if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, c.deleteOptions); err != nil { c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err) } @@ -62,14 +63,14 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { if lastDash > 0 && lastDash < len(pvc.Name)-1 { pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]) if err != nil { - return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %s to a number", pvc.Name) + return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %q to a number", pvc.Name) } if int32(pvcNumber) > lastPodIndex { - c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pods", pvc.Name) + c.logger.Debugf("Skipping persistent volume %q corresponding to a non-running pods", pvc.Name) continue } } - pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName) + pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("could not get PersistentVolume: %v", err) } @@ -118,22 +119,22 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume if err != nil { return err } - c.logger.Debugf("updating persistent volume %s to %d", pv.Name, newSize) + c.logger.Debugf("updating persistent volume %q to %d", pv.Name, newSize) if err := resizer.ResizeVolume(awsVolumeId, newSize); err != nil { - return fmt.Errorf("could not resize EBS volume %s: %v", awsVolumeId, err) + return fmt.Errorf("could not resize EBS volume %q: %v", awsVolumeId, err) } - 
c.logger.Debugf("resizing the filesystem on the volume %s", pv.Name) + c.logger.Debugf("resizing the filesystem on the volume %q", pv.Name) podName := getPodNameFromPersistentVolume(pv) if err := c.resizePostgresFilesystem(podName, []filesystems.FilesystemResizer{&filesystems.Ext234Resize{}}); err != nil { - return fmt.Errorf("could not resize the filesystem on pod '%s': %v", podName, err) + return fmt.Errorf("could not resize the filesystem on pod %q: %v", podName, err) } - c.logger.Debugf("filesystem resize successful on volume %s", pv.Name) + c.logger.Debugf("filesystem resize successful on volume %q", pv.Name) pv.Spec.Capacity[v1.ResourceStorage] = newQuantity - c.logger.Debugf("updating persistent volume definition for volume %s", pv.Name) + c.logger.Debugf("updating persistent volume definition for volume %q", pv.Name) if _, err := c.KubeClient.PersistentVolumes().Update(pv); err != nil { - return fmt.Errorf("could not update persistent volume: %s", err) + return fmt.Errorf("could not update persistent volume: %q", err) } - c.logger.Debugf("successfully updated persistent volume %s", pv.Name) + c.logger.Debugf("successfully updated persistent volume %q", pv.Name) } } if len(pvs) > 0 && totalCompatible == 0 { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index dc62504d3..398ee086e 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/Sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -14,22 +15,25 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" - "github.com/zalando-incubator/postgres-operator/pkg/util/teams" ) type Config struct { RestConfig *rest.Config - KubeClient k8sutil.KubernetesClient - RestClient 
*rest.RESTClient - TeamsAPIClient *teams.API InfrastructureRoles map[string]spec.PgUser + + NoDatabaseAccess bool + NoTeamsAPI bool + ConfigMapName spec.NamespacedName + Namespace string } type Controller struct { - Config - + config Config opConfig *config.Config - logger *logrus.Entry + + logger *logrus.Entry + KubeClient k8sutil.KubernetesClient + RestClient rest.Interface // kubernetes API group REST client clustersMu sync.RWMutex clusters map[spec.NamespacedName]*cluster.Cluster @@ -39,23 +43,16 @@ type Controller struct { podInformer cache.SharedIndexInformer podCh chan spec.PodEvent - clusterEventQueues []*cache.FIFO - + clusterEventQueues []*cache.FIFO lastClusterSyncTime int64 } -func NewController(controllerConfig *Config, operatorConfig *config.Config) *Controller { +func NewController(controllerConfig *Config) *Controller { logger := logrus.New() - if operatorConfig.DebugLogging { - logger.Level = logrus.DebugLevel - } - - controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger) - return &Controller{ - Config: *controllerConfig, - opConfig: operatorConfig, + config: *controllerConfig, + opConfig: &config.Config{}, logger: logger.WithField("pkg", "controller"), clusters: make(map[spec.NamespacedName]*cluster.Cluster), stopChs: make(map[spec.NamespacedName]chan struct{}), @@ -63,6 +60,114 @@ func NewController(controllerConfig *Config, operatorConfig *config.Config) *Con } } +func (c *Controller) initClients() { + client, err := k8sutil.ClientSet(c.config.RestConfig) + if err != nil { + c.logger.Fatalf("couldn't create client: %v", err) + } + c.KubeClient = k8sutil.NewFromKubernetesInterface(client) + + c.RestClient, err = k8sutil.KubernetesRestClient(*c.config.RestConfig) + if err != nil { + c.logger.Fatalf("couldn't create rest client: %v", err) + } +} + +func (c *Controller) initOperatorConfig() { + configMapData := make(map[string]string) + + if c.config.ConfigMapName != (spec.NamespacedName{}) { + configMap, err := 
c.KubeClient.ConfigMaps(c.config.ConfigMapName.Namespace). + Get(c.config.ConfigMapName.Name, metav1.GetOptions{}) + if err != nil { + panic(err) + } + + configMapData = configMap.Data + } else { + c.logger.Infoln("No ConfigMap specified. Loading default values") + } + + if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var + configMapData["namespace"] = c.config.Namespace + } + if c.config.NoDatabaseAccess { + configMapData["enable_database_access"] = "false" + } + if c.config.NoTeamsAPI { + configMapData["enable_teams_api"] = "false" + } + + c.opConfig = config.NewFromMap(configMapData) +} + +func (c *Controller) initController() { + c.initClients() + c.initOperatorConfig() + + c.logger.Infof("Config: %s", c.opConfig.MustMarshal()) + + if c.opConfig.DebugLogging { + c.logger.Level = logrus.DebugLevel + } + + if err := c.createTPR(); err != nil { + c.logger.Fatalf("could not register ThirdPartyResource: %v", err) + } + + if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil { + c.logger.Warningf("could not get infrastructure roles: %v", err) + } else { + c.config.InfrastructureRoles = infraRoles + } + + // Postgresqls + c.postgresqlInformer = cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: c.clusterListFunc, + WatchFunc: c.clusterWatchFunc, + }, + &spec.Postgresql{}, + constants.QueueResyncPeriodTPR, + cache.Indexers{}) + + c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.postgresqlAdd, + UpdateFunc: c.postgresqlUpdate, + DeleteFunc: c.postgresqlDelete, + }) + + // Pods + podLw := &cache.ListWatch{ + ListFunc: c.podListFunc, + WatchFunc: c.podWatchFunc, + } + + c.podInformer = cache.NewSharedIndexInformer( + podLw, + &v1.Pod{}, + constants.QueueResyncPeriodPod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + + c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.podAdd, + UpdateFunc: 
c.podUpdate, + DeleteFunc: c.podDelete, + }) + + c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers) + for i := range c.clusterEventQueues { + c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) { + e, ok := obj.(spec.ClusterEvent) + if !ok { + return "", fmt.Errorf("could not cast to ClusterEvent") + } + + return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil + }) + } +} + func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() wg.Add(1) @@ -78,69 +183,6 @@ func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { c.logger.Info("Started working in background") } -func (c *Controller) initController() { - if err := c.createTPR(); err != nil { - c.logger.Fatalf("could not register ThirdPartyResource: %v", err) - } - - if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil { - c.logger.Warningf("could not get infrastructure roles: %v", err) - } else { - c.InfrastructureRoles = infraRoles - } - - // Postgresqls - clusterLw := &cache.ListWatch{ - ListFunc: c.clusterListFunc, - WatchFunc: c.clusterWatchFunc, - } - c.postgresqlInformer = cache.NewSharedIndexInformer( - clusterLw, - &spec.Postgresql{}, - constants.QueueResyncPeriodTPR, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - - if err := c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.postgresqlAdd, - UpdateFunc: c.postgresqlUpdate, - DeleteFunc: c.postgresqlDelete, - }); err != nil { - c.logger.Fatalf("could not add event handlers: %v", err) - } - - // Pods - podLw := &cache.ListWatch{ - ListFunc: c.podListFunc, - WatchFunc: c.podWatchFunc, - } - - c.podInformer = cache.NewSharedIndexInformer( - podLw, - &v1.Pod{}, - constants.QueueResyncPeriodPod, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - - if err := c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.podAdd, - UpdateFunc: 
c.podUpdate, - DeleteFunc: c.podDelete, - }); err != nil { - c.logger.Fatalf("could not add event handlers: %v", err) - } - - c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers) - for i := range c.clusterEventQueues { - c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) { - e, ok := obj.(spec.ClusterEvent) - if !ok { - return "", fmt.Errorf("could not cast to ClusterEvent") - } - - return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil - }) - } -} - func (c *Controller) runInformers(stopCh <-chan struct{}) { go c.postgresqlInformer.Run(stopCh) go c.podInformer.Run(stopCh) diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index c3fb2a5e8..6575455db 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -1,27 +1,20 @@ package controller import ( - "k8s.io/client-go/pkg/api" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/pkg/runtime" - "k8s.io/client-go/pkg/watch" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" ) -func (c *Controller) podListFunc(options api.ListOptions) (runtime.Object, error) { +func (c *Controller) podListFunc(options metav1.ListOptions) (runtime.Object, error) { var labelSelector string var fieldSelector string - if options.LabelSelector != nil { - labelSelector = options.LabelSelector.String() - } - - if options.FieldSelector != nil { - fieldSelector = options.FieldSelector.String() - } - opts := v1.ListOptions{ + opts := metav1.ListOptions{ LabelSelector: labelSelector, FieldSelector: fieldSelector, Watch: options.Watch, @@ -32,19 +25,11 @@ func (c *Controller) podListFunc(options api.ListOptions) (runtime.Object, error return c.KubeClient.Pods(c.opConfig.Namespace).List(opts) } -func (c *Controller) podWatchFunc(options api.ListOptions) (watch.Interface, error) { +func (c *Controller) 
podWatchFunc(options metav1.ListOptions) (watch.Interface, error) { var labelSelector string var fieldSelector string - if options.LabelSelector != nil { - labelSelector = options.LabelSelector.String() - } - - if options.FieldSelector != nil { - fieldSelector = options.FieldSelector.String() - } - - opts := v1.ListOptions{ + opts := metav1.ListOptions{ LabelSelector: labelSelector, FieldSelector: fieldSelector, Watch: options.Watch, @@ -122,7 +107,7 @@ func (c *Controller) podEventsDispatcher(stopCh <-chan struct{}) { c.clustersMu.RUnlock() if ok { - c.logger.Debugf("Sending %s event of pod '%s' to the '%s' cluster channel", event.EventType, event.PodName, event.ClusterName) + c.logger.Debugf("Sending %q event of pod %q to the %q cluster channel", event.EventType, event.PodName, event.ClusterName) cluster.ReceivePodEvent(event) } case <-stopCh: diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 14997c37f..5b3d97080 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -1,17 +1,16 @@ package controller import ( + "encoding/json" "fmt" "reflect" "sync/atomic" "time" - "k8s.io/client-go/pkg/api" - "k8s.io/client-go/pkg/api/meta" - "k8s.io/client-go/pkg/fields" - "k8s.io/client-go/pkg/runtime" - "k8s.io/client-go/pkg/types" - "k8s.io/client-go/pkg/watch" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "github.com/zalando-incubator/postgres-operator/pkg/cluster" @@ -26,52 +25,43 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}) { for { select { case <-ticker.C: - c.clusterListFunc(api.ListOptions{ResourceVersion: "0"}) + c.clusterListFunc(metav1.ListOptions{ResourceVersion: "0"}) case <-stopCh: return } } } -func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) { - c.logger.Info("Getting list of currently running clusters") +func (c 
*Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) { + var list spec.PostgresqlList + var activeClustersCnt, failedClustersCnt int - req := c.RestClient.Get(). - RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). - VersionedParams(&options, api.ParameterCodec). - FieldsSelectorParam(fields.Everything()) - - object, err := req.Do().Get() + req := c.RestClient. + Get(). + Namespace(c.opConfig.Namespace). + Resource(constants.ResourceName). + VersionedParams(&options, metav1.ParameterCodec) + b, err := req.DoRaw() if err != nil { - return nil, fmt.Errorf("could not get list of postgresql objects: %v", err) - } - - objList, err := meta.ExtractList(object) - if err != nil { - return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err) + return nil, err } + err = json.Unmarshal(b, &list) if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { c.logger.Debugln("skipping resync of clusters") - return object, err + return &list, err } - var activeClustersCnt, failedClustersCnt int - for _, obj := range objList { - pg, ok := obj.(*spec.Postgresql) - if !ok { - return nil, fmt.Errorf("could not cast object to postgresql") - } - + for _, pg := range list.Items { if pg.Error != nil { failedClustersCnt++ continue } - c.queueClusterEvent(nil, pg, spec.EventSync) + c.queueClusterEvent(nil, &pg, spec.EventSync) activeClustersCnt++ } - if len(objList) > 0 { + if len(list.Items) > 0 { if failedClustersCnt > 0 && activeClustersCnt == 0 { c.logger.Infof("There are no clusters running. 
%d are in the failed state", failedClustersCnt) } else if failedClustersCnt == 0 && activeClustersCnt > 0 { @@ -85,15 +75,48 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) - return object, err + return &list, err } -func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, error) { - req := c.RestClient.Get(). - RequestURI(fmt.Sprintf(constants.WatchClustersURITemplate, c.opConfig.Namespace)). - VersionedParams(&options, api.ParameterCodec). - FieldsSelectorParam(fields.Everything()) - return req.Watch() +type tprDecoder struct { + dec *json.Decoder + close func() error +} + +func (d *tprDecoder) Close() { + d.close() +} + +func (d *tprDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { + var e struct { + Type watch.EventType + Object spec.Postgresql + } + if err := d.dec.Decode(&e); err != nil { + return watch.Error, nil, err + } + + return e.Type, &e.Object, nil +} + +func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interface, error) { + options.Watch = true + r, err := c.RestClient. + Get(). + Namespace(c.opConfig.Namespace). + Resource(constants.ResourceName). + VersionedParams(&options, metav1.ParameterCodec). + FieldsSelectorParam(nil). 
+ Stream() + + if err != nil { + return nil, err + } + + return watch.NewStreamWatcher(&tprDecoder{ + dec: json.NewDecoder(r), + close: r.Close, + }), nil } func (c *Controller) processEvent(obj interface{}) error { @@ -106,9 +129,9 @@ func (c *Controller) processEvent(obj interface{}) error { logger := c.logger.WithField("worker", event.WorkerID) if event.EventType == spec.EventAdd || event.EventType == spec.EventSync { - clusterName = util.NameFromMeta(event.NewSpec.Metadata) + clusterName = util.NameFromMeta(event.NewSpec.ObjectMeta) } else { - clusterName = util.NameFromMeta(event.OldSpec.Metadata) + clusterName = util.NameFromMeta(event.OldSpec.ObjectMeta) } c.clustersMu.RLock() @@ -118,14 +141,14 @@ func (c *Controller) processEvent(obj interface{}) error { switch event.EventType { case spec.EventAdd: if clusterFound { - logger.Debugf("Cluster '%s' already exists", clusterName) + logger.Debugf("Cluster %q already exists", clusterName) return nil } - logger.Infof("Creation of the '%s' cluster started", clusterName) + logger.Infof("Creation of the %q cluster started", clusterName) stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) cl.Run(stopCh) c.clustersMu.Lock() @@ -140,31 +163,31 @@ func (c *Controller) processEvent(obj interface{}) error { return nil } - logger.Infof("Cluster '%s' has been created", clusterName) + logger.Infof("Cluster %q has been created", clusterName) case spec.EventUpdate: - logger.Infof("Update of the '%s' cluster started", clusterName) + logger.Infof("Update of the %q cluster started", clusterName) if !clusterFound { - logger.Warnf("Cluster '%s' does not exist", clusterName) + logger.Warnf("Cluster %q does not exist", clusterName) return nil } if err := cl.Update(event.NewSpec); err != nil { - cl.Error = fmt.Errorf("could not update cluster: %s", err) + cl.Error = fmt.Errorf("could not update cluster: %v", err) 
logger.Errorf("%v", cl.Error) return nil } cl.Error = nil - logger.Infof("Cluster '%s' has been updated", clusterName) + logger.Infof("Cluster %q has been updated", clusterName) case spec.EventDelete: - logger.Infof("Deletion of the '%s' cluster started", clusterName) + logger.Infof("Deletion of the %q cluster started", clusterName) if !clusterFound { - logger.Errorf("Unknown cluster: %s", clusterName) + logger.Errorf("Unknown cluster: %q", clusterName) return nil } if err := cl.Delete(); err != nil { - logger.Errorf("could not delete cluster '%s': %s", clusterName, err) + logger.Errorf("could not delete cluster %q: %v", clusterName, err) return nil } close(c.stopChs[clusterName]) @@ -174,14 +197,14 @@ func (c *Controller) processEvent(obj interface{}) error { delete(c.stopChs, clusterName) c.clustersMu.Unlock() - logger.Infof("Cluster '%s' has been deleted", clusterName) + logger.Infof("Cluster %q has been deleted", clusterName) case spec.EventSync: - logger.Infof("Syncing of the '%s' cluster started", clusterName) + logger.Infof("Syncing of the %q cluster started", clusterName) // no race condition because a cluster is always processed by single worker if !clusterFound { stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) cl.Run(stopCh) c.clustersMu.Lock() @@ -191,13 +214,13 @@ func (c *Controller) processEvent(obj interface{}) error { } if err := cl.Sync(); err != nil { - cl.Error = fmt.Errorf("could not sync cluster '%s': %v", clusterName, err) + cl.Error = fmt.Errorf("could not sync cluster %q: %v", clusterName, err) logger.Errorf("%v", cl.Error) return nil } cl.Error = nil - logger.Infof("Cluster '%s' has been synced", clusterName) + logger.Infof("Cluster %q has been synced", clusterName) } return nil @@ -219,8 +242,8 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec ) if old != nil { //update, delete - 
uid = old.Metadata.GetUID() - clusterName = util.NameFromMeta(old.Metadata) + uid = old.GetUID() + clusterName = util.NameFromMeta(old.ObjectMeta) if eventType == spec.EventUpdate && new.Error == nil && old.Error != nil { eventType = spec.EventSync clusterError = new.Error @@ -228,13 +251,13 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec clusterError = old.Error } } else { //add, sync - uid = new.Metadata.GetUID() - clusterName = util.NameFromMeta(new.Metadata) + uid = new.GetUID() + clusterName = util.NameFromMeta(new.ObjectMeta) clusterError = new.Error } if clusterError != nil && eventType != spec.EventDelete { - c.logger.Debugf("Skipping %s event for invalid cluster %s (reason: %v)", eventType, clusterName, clusterError) + c.logger.Debugf("Skipping %q event for invalid cluster %q (reason: %v)", eventType, clusterName, clusterError) return } @@ -251,7 +274,7 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil { c.logger.WithField("worker", workerID).Errorf("error when queueing cluster event: %v", clusterEvent) } - c.logger.WithField("worker", workerID).Infof("%s of the '%s' cluster has been queued", eventType, clusterName) + c.logger.WithField("worker", workerID).Infof("%q of the %q cluster has been queued", eventType, clusterName) } func (c *Controller) postgresqlAdd(obj interface{}) { @@ -274,7 +297,7 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { if !ok { c.logger.Errorf("could not cast to postgresql spec") } - if pgOld.Metadata.ResourceVersion == pgNew.Metadata.ResourceVersion { + if pgOld.ResourceVersion == pgNew.ResourceVersion { return } if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { diff --git a/pkg/controller/util.go b/pkg/controller/util.go index d7ea55e3a..f02f27955 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -4,6 +4,7 @@ import ( "fmt" "hash/crc32" + metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1" @@ -16,15 +17,12 @@ import ( func (c *Controller) makeClusterConfig() cluster.Config { infrastructureRoles := make(map[string]spec.PgUser) - for k, v := range c.InfrastructureRoles { + for k, v := range c.config.InfrastructureRoles { infrastructureRoles[k] = v } return cluster.Config{ - KubeClient: c.KubeClient, - RestClient: c.RestClient, - RestConfig: c.RestConfig, - TeamsAPIClient: c.TeamsAPIClient, + RestConfig: c.config.RestConfig, OpConfig: config.Copy(c.opConfig), InfrastructureRoles: infrastructureRoles, } @@ -32,7 +30,7 @@ func (c *Controller) makeClusterConfig() cluster.Config { func thirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource { return &extv1beta.ThirdPartyResource{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ //ThirdPartyResources are cluster-wide Name: TPRName, }, @@ -48,17 +46,16 @@ func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 { } func (c *Controller) createTPR() error { - TPRName := fmt.Sprintf("%s.%s", constants.TPRName, constants.TPRVendor) - tpr := thirdPartyResource(TPRName) + tpr := thirdPartyResource(constants.TPRName) _, err := c.KubeClient.ThirdPartyResources().Create(tpr) if err != nil { if !k8sutil.ResourceAlreadyExists(err) { return err } - c.logger.Infof("ThirdPartyResource '%s' is already registered", TPRName) + c.logger.Infof("ThirdPartyResource %q is already registered", constants.TPRName) } else { - c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName) + c.logger.Infof("ThirdPartyResource %q' has been registered", constants.TPRName) } return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) @@ -72,9 +69,9 @@ func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (r infraRolesSecret, err := c.KubeClient. 
Secrets(rolesSecret.Namespace). - Get(rolesSecret.Name) + Get(rolesSecret.Name, metav1.GetOptions{}) if err != nil { - c.logger.Debugf("Infrastructure roles secret name: %s", *rolesSecret) + c.logger.Debugf("Infrastructure roles secret name: %q", *rolesSecret) return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err) } @@ -102,7 +99,7 @@ Users: case "inrole": t.MemberOf = append(t.MemberOf, s) default: - c.logger.Warnf("Unknown key %s", p) + c.logger.Warnf("Unknown key %q", p) } } } diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go index bd7d7d049..88c51258e 100644 --- a/pkg/controller/util_test.go +++ b/pkg/controller/util_test.go @@ -5,11 +5,11 @@ import ( "reflect" "testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/pkg/api/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" - "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" ) @@ -21,7 +21,7 @@ type mockSecret struct { v1core.SecretInterface } -func (c *mockSecret) Get(name string) (*v1.Secret, error) { +func (c *mockSecret) Get(name string, options metav1.GetOptions) (*v1.Secret, error) { if name != testInfrastructureRolesSecretName { return nil, fmt.Errorf("NotFound") } @@ -48,7 +48,7 @@ func newMockKubernetesClient() k8sutil.KubernetesClient { } func newMockController() *Controller { - controller := NewController(&Config{}, &config.Config{}) + controller := NewController(&Config{}) controller.opConfig.ClusterNameLabel = "cluster-name" controller.opConfig.InfrastructureRolesSecretName = spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName} @@ -70,7 +70,7 @@ func TestPodClusterName(t *testing.T) { }, { &v1.Pod{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Namespace: v1.NamespaceDefault, Labels: map[string]string{ mockController.opConfig.ClusterNameLabel: "testcluster", 
diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index 3649eff50..8598c66cd 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -6,9 +6,7 @@ import ( "strings" "time" - "k8s.io/client-go/pkg/api/meta" - "k8s.io/client-go/pkg/api/unversioned" - "k8s.io/client-go/pkg/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // MaintenanceWindow describes the time window when the operator is allowed to do maintenance on a cluster. @@ -71,8 +69,8 @@ const ( // Postgresql defines PostgreSQL Third Party (resource) Object. type Postgresql struct { - unversioned.TypeMeta `json:",inline"` - Metadata v1.ObjectMeta `json:"metadata"` + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` Spec PostgresSpec `json:"spec"` Status PostgresStatus `json:"status,omitempty"` @@ -88,7 +86,7 @@ type PostgresSpec struct { TeamID string `json:"teamId"` AllowedSourceRanges []string `json:"allowedSourceRanges"` - // EnableLoadBalancer is a pointer, since it is importat to know if that parameters is omited from the manifest + // EnableLoadBalancer is a pointer, since it is important to know if that parameter is omitted from the manifest UseLoadBalancer *bool `json:"useLoadBalancer,omitempty"` ReplicaLoadBalancer bool `json:"replicaLoadBalancer,omitempty"` NumberOfInstances int32 `json:"numberOfInstances"` @@ -99,8 +97,8 @@ type PostgresSpec struct { // PostgresqlList defines a list of PostgreSQL clusters. type PostgresqlList struct { - unversioned.TypeMeta `json:",inline"` - Metadata unversioned.ListMeta `json:"metadata"` + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` Items []Postgresql `json:"items"` } @@ -191,21 +189,6 @@ func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error { return nil } -// GetObject implements Object interface for PostgreSQL TPR spec object.
-func (p *Postgresql) GetObjectKind() unversioned.ObjectKind { - return &p.TypeMeta -} - -// GetObjectMeta implements ObjectMetaAccessor interface for PostgreSQL TPR spec object. -func (p *Postgresql) GetObjectMeta() meta.Object { - return &p.Metadata -} - -// GetListMeta implements ListMetaAccessor interface for PostgreSQL TPR List spec object. -func (pl *PostgresqlList) GetListMeta() unversioned.List { - return &pl.Metadata -} - func extractClusterName(clusterName string, teamName string) (string, error) { teamNameLen := len(teamName) if len(clusterName) < teamNameLen+2 { @@ -223,10 +206,6 @@ func extractClusterName(clusterName string, teamName string) (string, error) { return clusterName[teamNameLen+1:], nil } -// The code below is used only to work around a known problem with third-party -// resources and ugorji. If/when these issues are resolved, the code below -// should no longer be required. -// type postgresqlListCopy PostgresqlList type postgresqlCopy Postgresql @@ -236,7 +215,7 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error { err := json.Unmarshal(data, &tmp) if err != nil { - metaErr := json.Unmarshal(data, &tmp.Metadata) + metaErr := json.Unmarshal(data, &tmp.ObjectMeta) if metaErr != nil { return err } @@ -250,7 +229,7 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error { } tmp2 := Postgresql(tmp) - clusterName, err := extractClusterName(tmp2.Metadata.Name, tmp2.Spec.TeamID) + clusterName, err := extractClusterName(tmp2.ObjectMeta.Name, tmp2.Spec.TeamID) if err == nil { tmp2.Spec.ClusterName = clusterName } else { diff --git a/pkg/spec/postgresql_test.go b/pkg/spec/postgresql_test.go index d067b8d55..7c67bf0a9 100644 --- a/pkg/spec/postgresql_test.go +++ b/pkg/spec/postgresql_test.go @@ -8,8 +8,7 @@ import ( "testing" "time" - "k8s.io/client-go/pkg/api/unversioned" - "k8s.io/client-go/pkg/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var parseTimeTests = []struct { @@ -104,11 +103,11 @@ var unmarshalCluster = []struct { 
"kind": "Postgresql","apiVersion": "acid.zalan.do/v1", "metadata": {"name": "acid-testcluster1"}, "spec": {"teamId": 100}}`), Postgresql{ - TypeMeta: unversioned.TypeMeta{ + TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", APIVersion: "acid.zalan.do/v1", }, - Metadata: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "acid-testcluster1", }, Status: ClusterStatusInvalid, @@ -184,11 +183,11 @@ var unmarshalCluster = []struct { } }`), Postgresql{ - TypeMeta: unversioned.TypeMeta{ + TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", APIVersion: "acid.zalan.do/v1", }, - Metadata: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "acid-testcluster1", }, Spec: PostgresSpec{ @@ -250,11 +249,11 @@ var unmarshalCluster = []struct { { []byte(`{"kind": "Postgresql","apiVersion": "acid.zalan.do/v1","metadata": {"name": "teapot-testcluster1"}, "spec": {"teamId": "acid"}}`), Postgresql{ - TypeMeta: unversioned.TypeMeta{ + TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", APIVersion: "acid.zalan.do/v1", }, - Metadata: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "teapot-testcluster1", }, Spec: PostgresSpec{TeamID: "acid"}, @@ -278,16 +277,16 @@ var postgresqlList = []struct { }{ {[]byte(`{"apiVersion":"v1","items":[{"apiVersion":"acid.zalan.do/v1","kind":"Postgresql","metadata":{"labels":{"team":"acid"},"name":"acid-testcluster42","namespace":"default","resourceVersion":"30446957","selfLink":"/apis/acid.zalan.do/v1/namespaces/default/postgresqls/acid-testcluster42","uid":"857cd208-33dc-11e7-b20a-0699041e4b03"},"spec":{"allowedSourceRanges":["185.85.220.0/22"],"numberOfInstances":1,"postgresql":{"version":"9.6"},"teamId":"acid","volume":{"size":"10Gi"}},"status":"Running"}],"kind":"List","metadata":{},"resourceVersion":"","selfLink":""}`), PostgresqlList{ - TypeMeta: unversioned.TypeMeta{ + TypeMeta: metav1.TypeMeta{ Kind: "List", APIVersion: "v1", }, Items: []Postgresql{{ - TypeMeta: unversioned.TypeMeta{ + TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", APIVersion: 
"acid.zalan.do/v1", }, - Metadata: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: "acid-testcluster42", Namespace: "default", Labels: map[string]string{"team": "acid"}, @@ -363,7 +362,7 @@ func TestClusterName(t *testing.T) { continue } if name != tt.clusterName { - t.Errorf("Expected cluserName: %s, got: %s", tt.clusterName, name) + t.Errorf("Expected cluserName: %q, got: %q", tt.clusterName, name) } } } @@ -400,7 +399,7 @@ func TestMarshalMaintenanceWindow(t *testing.T) { } if !bytes.Equal(s, tt.in) { - t.Errorf("Expected Marshal: %s, got: %s", string(tt.in), string(s)) + t.Errorf("Expected Marshal: %q, got: %q", string(tt.in), string(s)) } } } @@ -435,7 +434,7 @@ func TestMarshal(t *testing.T) { continue } if !bytes.Equal(m, tt.marshal) { - t.Errorf("Marshal Postgresql expected: %s, got: %s", string(tt.marshal), string(m)) + t.Errorf("Marshal Postgresql expected: %q, got: %q", string(tt.marshal), string(m)) } } } @@ -446,8 +445,8 @@ func TestPostgresMeta(t *testing.T) { t.Errorf("GetObjectKindMeta expected: %v, got: %v", tt.out.TypeMeta, a) } - if a := tt.out.GetObjectMeta(); reflect.DeepEqual(a, tt.out.Metadata) { - t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.Metadata, a) + if a := tt.out.GetObjectMeta(); reflect.DeepEqual(a, tt.out.ObjectMeta) { + t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.ObjectMeta, a) } } } @@ -476,8 +475,8 @@ func TestPostgresListMeta(t *testing.T) { t.Errorf("GetObjectKindMeta expected: %v, got: %v", tt.out.TypeMeta, a) } - if a := tt.out.GetListMeta(); reflect.DeepEqual(a, tt.out.Metadata) { - t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.Metadata, a) + if a := tt.out.GetListMeta(); reflect.DeepEqual(a, tt.out.ListMeta) { + t.Errorf("GetObjectMeta expected: %v, got: %v", tt.out.ListMeta, a) } return diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 822395ce9..1a43cde05 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -1,12 +1,12 @@ package spec import ( + "database/sql" "fmt" 
"strings" - "database/sql" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/pkg/types" ) // EvenType contains type of the events for the TPRs and Pods received from Kubernetes diff --git a/pkg/spec/types_test.go b/pkg/spec/types_test.go index b690586e9..d0368268a 100644 --- a/pkg/spec/types_test.go +++ b/pkg/spec/types_test.go @@ -49,7 +49,7 @@ func TestNamespacedNameError(t *testing.T) { var actual NamespacedName err := actual.Decode(tt) if err == nil { - t.Errorf("Error expected for '%s', got: %#v", tt, actual) + t.Errorf("Error expected for %q, got: %#v", tt, actual) } } } diff --git a/pkg/util/constants/kubernetes.go b/pkg/util/constants/kubernetes.go index 3a56aa35a..79c60cad2 100644 --- a/pkg/util/constants/kubernetes.go +++ b/pkg/util/constants/kubernetes.go @@ -4,10 +4,7 @@ import "time" // General kubernetes-related constants const ( - ListClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace - WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace - K8sVersion = "v1" - K8sAPIPath = "/api" + K8sAPIPath = "/apis" StatefulsetDeletionInterval = 1 * time.Second StatefulsetDeletionTimeout = 30 * time.Second diff --git a/pkg/util/constants/roles.go b/pkg/util/constants/roles.go index 85fb42b1b..9f584c370 100644 --- a/pkg/util/constants/roles.go +++ b/pkg/util/constants/roles.go @@ -2,7 +2,7 @@ package constants const ( PasswordLength = 64 - UserSecretTemplate = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName + UserSecretTemplate = "%s.%s.credentials." + TPRKind + "." 
+ TPRGroup // Username, ClusterName SuperuserKeyName = "superuser" ReplicationUserKeyName = "replication" RoleFlagSuperuser = "SUPERUSER" diff --git a/pkg/util/constants/thirdpartyresource.go b/pkg/util/constants/thirdpartyresource.go index 7207b4583..a0a00d259 100644 --- a/pkg/util/constants/thirdpartyresource.go +++ b/pkg/util/constants/thirdpartyresource.go @@ -2,9 +2,10 @@ package constants // Different properties of the PostgreSQL Third Party Resources const ( - TPRName = "postgresql" - TPRVendor = "acid.zalan.do" + TPRKind = "postgresql" + TPRGroup = "acid.zalan.do" TPRDescription = "Managed PostgreSQL clusters" TPRApiVersion = "v1" - ResourceName = TPRName + "s" + TPRName = TPRKind + "." + TPRGroup + ResourceName = TPRKind + "s" ) diff --git a/pkg/util/filesystems/ext234.go b/pkg/util/filesystems/ext234.go index ceea73984..fc2943d46 100644 --- a/pkg/util/filesystems/ext234.go +++ b/pkg/util/filesystems/ext234.go @@ -37,5 +37,5 @@ func (c *Ext234Resize) ResizeFilesystem(deviceName string, commandExecutor func( (strings.Contains(out, "on-line resizing required") && ext2fsSuccessRegexp.MatchString(out)) { return nil } - return fmt.Errorf("unrecognized output: %s, assuming error", out) + return fmt.Errorf("unrecognized output: %q, assuming error", out) } diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 981083cb9..09ad38444 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -1,22 +1,19 @@ package k8sutil import ( - "fmt" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/client-go/kubernetes" v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1" v1core "k8s.io/client-go/kubernetes/typed/core/v1" extensions "k8s.io/client-go/kubernetes/typed/extensions/v1beta1" "k8s.io/client-go/pkg/api" - apierrors "k8s.io/client-go/pkg/api/errors" - "k8s.io/client-go/pkg/api/unversioned" - "k8s.io/client-go/pkg/runtime" 
- "k8s.io/client-go/pkg/runtime/serializer" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" ) @@ -31,6 +28,7 @@ type KubernetesClient struct { v1core.ConfigMapsGetter v1beta1.StatefulSetsGetter extensions.ThirdPartyResourcesGetter + RESTClient rest.Interface } func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) { @@ -44,6 +42,7 @@ func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) { c.PersistentVolumesGetter = src.CoreV1() c.StatefulSetsGetter = src.AppsV1beta1() c.ThirdPartyResourcesGetter = src.ExtensionsV1beta1() + c.RESTClient = src.CoreV1().RESTClient() return } @@ -51,6 +50,7 @@ func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { if outOfCluster { return clientcmd.BuildConfigFromFlags("", kubeConfig) } + return rest.InClusterConfig() } @@ -66,35 +66,24 @@ func ResourceNotFound(err error) bool { return apierrors.IsNotFound(err) } -func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) { - c.GroupVersion = &unversioned.GroupVersion{Version: constants.K8sVersion} - c.APIPath = constants.K8sAPIPath - c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} - - schemeBuilder := runtime.NewSchemeBuilder( - func(scheme *runtime.Scheme) error { - scheme.AddKnownTypes( - unversioned.GroupVersion{ - Group: constants.TPRVendor, - Version: constants.TPRApiVersion, - }, - &spec.Postgresql{}, - &spec.PostgresqlList{}, - &api.ListOptions{}, - &api.DeleteOptions{}, - ) - return nil - }) - if err := schemeBuilder.AddToScheme(api.Scheme); err != nil { - return nil, fmt.Errorf("could not apply functions to register PostgreSQL TPR type: %v", err) +func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) { + cfg.GroupVersion = &schema.GroupVersion{ + 
Group: constants.TPRGroup, + Version: constants.TPRApiVersion, } + cfg.APIPath = constants.K8sAPIPath + cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} - return rest.RESTClientFor(c) + return rest.RESTClientFor(&cfg) } func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error { return retryutil.Retry(interval, timeout, func() (bool, error) { - _, err := restclient.Get().RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, ns)).DoRaw() + _, err := restclient. + Get(). + Namespace(ns). + Resource(constants.ResourceName). + DoRaw() if err != nil { if ResourceNotFound(err) { // not set up yet. wait more. return false, nil diff --git a/pkg/util/users/users.go b/pkg/util/users/users.go index 8ff32d305..1b5f5966f 100644 --- a/pkg/util/users/users.go +++ b/pkg/util/users/users.go @@ -66,11 +66,11 @@ func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserReque switch r.Kind { case spec.PGSyncUserAdd: if err := s.createPgUser(r.User, db); err != nil { - return fmt.Errorf("could not create user '%s': %v", r.User.Name, err) + return fmt.Errorf("could not create user %q: %v", r.User.Name, err) } case spec.PGsyncUserAlter: if err := s.alterPgUser(r.User, db); err != nil { - return fmt.Errorf("could not alter user '%s': %v", r.User.Name, err) + return fmt.Errorf("could not alter user %q: %v", r.User.Name, err) } default: return fmt.Errorf("unrecognized operation: %v", r.Kind) @@ -100,7 +100,7 @@ func (s DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.DB) (err _, err = db.Query(query) // TODO: Try several times if err != nil { - err = fmt.Errorf("dB error: %s, query: %v", err, query) + err = fmt.Errorf("dB error: %v, query: %q", err, query) return } @@ -122,7 +122,7 @@ func (s DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB) (err _, err = db.Query(query) // TODO: Try several times if err != nil { - err = fmt.Errorf("dB error: %s query %v", err, query) 
+ err = fmt.Errorf("dB error: %v query %q", err, query) return } diff --git a/pkg/util/util.go b/pkg/util/util.go index abe4a8237..c66622679 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -8,7 +8,7 @@ import ( "time" "github.com/motomux/pretty" - "k8s.io/client-go/pkg/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" ) @@ -34,7 +34,7 @@ func RandomPassword(n int) string { } // NameFromMeta converts a metadata object to the NamespacedName name representation. -func NameFromMeta(meta v1.ObjectMeta) spec.NamespacedName { +func NameFromMeta(meta metav1.ObjectMeta) spec.NamespacedName { return spec.NamespacedName{ Namespace: meta.Namespace, Name: meta.Name, diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 067f64927..cfd37c033 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -4,7 +4,7 @@ import ( "reflect" "testing" - "k8s.io/client-go/pkg/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" ) @@ -53,7 +53,7 @@ func TestRandomPassword(t *testing.T) { } func TestNameFromMeta(t *testing.T) { - meta := v1.ObjectMeta{ + meta := metav1.ObjectMeta{ Name: "testcluster", Namespace: "default", } @@ -73,7 +73,7 @@ func TestPGUserPassword(t *testing.T) { for _, tt := range pgUsers { pwd := PGUserPassword(tt.in) if pwd != tt.out { - t.Errorf("PgUserPassword expected: %s, got: %s", tt.out, pwd) + t.Errorf("PgUserPassword expected: %q, got: %q", tt.out, pwd) } } } @@ -81,7 +81,7 @@ func TestPGUserPassword(t *testing.T) { func TestPrettyDiff(t *testing.T) { for _, tt := range prettyDiffTest { if actual := PrettyDiff(tt.inA, tt.inB); actual != tt.out { - t.Errorf("PrettyDiff expected: %s, got: %s", tt.out, actual) + t.Errorf("PrettyDiff expected: %q, got: %q", tt.out, actual) } } } diff --git a/pkg/util/volumes/ebs.go b/pkg/util/volumes/ebs.go index 8d6ec12b7..c213a1126 100644 --- a/pkg/util/volumes/ebs.go +++ 
b/pkg/util/volumes/ebs.go @@ -42,11 +42,11 @@ func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) { volumeID := pv.Spec.AWSElasticBlockStore.VolumeID if volumeID == "" { - return "", fmt.Errorf("volume id is empty for volume %s", pv.Name) + return "", fmt.Errorf("volume id is empty for volume %q", pv.Name) } idx := strings.LastIndex(volumeID, constants.EBSVolumeIDStart) + 1 if idx == 0 { - return "", fmt.Errorf("malfored EBS volume id %s", volumeID) + return "", fmt.Errorf("malfored EBS volume id %q", volumeID) } return volumeID[idx:], nil } @@ -60,7 +60,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { } vol := volumeOutput.Volumes[0] if *vol.VolumeId != volumeId { - return fmt.Errorf("describe volume %s returned information about a non-matching volume %s", volumeId, *vol.VolumeId) + return fmt.Errorf("describe volume %q returned information about a non-matching volume %q", volumeId, *vol.VolumeId) } if *vol.Size == newSize { // nothing to do @@ -74,7 +74,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { state := *output.VolumeModification.ModificationState if state == constants.EBSVolumeStateFailed { - return fmt.Errorf("could not modify persistent volume %s: modification state failed", volumeId) + return fmt.Errorf("could not modify persistent volume %q: modification state failed", volumeId) } if state == "" { return fmt.Errorf("received empty modification status") @@ -91,10 +91,10 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { return false, fmt.Errorf("could not describe volume modification: %v", err) } if len(out.VolumesModifications) != 1 { - return false, fmt.Errorf("describe volume modification didn't return one record for volume \"%s\"", volumeId) + return false, fmt.Errorf("describe volume modification didn't return one record for 
volume %q", volumeId) } if *out.VolumesModifications[0].VolumeId != volumeId { - return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"", + return false, fmt.Errorf("non-matching volume id when describing modifications: %q is different from %q", *out.VolumesModifications[0].VolumeId, volumeId) } return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil