diff --git a/cmd/main.go b/cmd/main.go index f9d02d55a..24263c73b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -50,6 +50,9 @@ func ControllerConfig() *controller.Config { } restClient, err := k8sutil.KubernetesRestClient(restConfig) + if err != nil { + log.Fatalf("Can't create rest client: %s", err) + } return &controller.Config{ KubeClient: client, diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index e9be4c2be..b9515ea94 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -20,13 +20,13 @@ import ( func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) { c.logger.Info("Getting list of currently running clusters") - object, err := c.RestClient.Get(). - Namespace(c.opConfig.Namespace). - Resource(constants.ResourceName). + + req := c.RestClient.Get(). + RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). VersionedParams(&options, api.ParameterCodec). - FieldsSelectorParam(fields.Everything()). - Do(). - Get() + FieldsSelectorParam(fields.Everything()) + + object, err := req.Do().Get() if err != nil { return nil, fmt.Errorf("Can't get list of postgresql objects: %s", err) @@ -55,6 +55,15 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e return object, err } +func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, error) { + req := c.RestClient.Get(). + RequestURI(fmt.Sprintf(constants.WatchClustersURITemplate, c.opConfig.Namespace)). + VersionedParams(&options, api.ParameterCodec). + FieldsSelectorParam(fields.Everything()) + + return req.Watch() +} + func (c *Controller) processEvent(obj interface{}) error { var clusterName spec.NamespacedName @@ -184,16 +193,6 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec c.logger.WithField("worker", workerId).Infof("%s of the '%s' cluster has been queued for", eventType, clusterName) } -func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, error) { - return c.RestClient.Get(). - Prefix("watch"). - Namespace(c.opConfig.Namespace). - Resource(constants.ResourceName). - VersionedParams(&options, api.ParameterCodec). - FieldsSelectorParam(fields.Everything()). - Watch() -} - func (c *Controller) postgresqlAdd(obj interface{}) { pg, ok := obj.(*spec.Postgresql) if !ok { diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 719d85e59..92500dd78 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -81,9 +81,7 @@ func (c *Controller) createTPR() error { c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName) } - restClient := c.RestClient - - return k8sutil.WaitTPRReady(restClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) + return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) } func (c *Controller) getInfrastructureRoles() (result map[string]spec.PgUser, err error) { diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go index c4bc30ac3..a110f2080 100644 --- a/pkg/util/constants/constants.go +++ b/pkg/util/constants/constants.go @@ -6,6 +6,10 @@ const ( TPRVendor = "acid.zalan.do" TPRDescription = "Managed PostgreSQL clusters" TPRApiVersion = "v1" + ListClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace + WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace + K8sVersion = "v1" + K8sApiPath = "/api" DataVolumeName = "pgdata" PasswordLength = 64 UserSecretTemplate = "%s.%s.credentials.%s.%s" // Username, ClusterName, TPRName, TPRVendor diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 924bf569e..92052fb20 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -1,6 +1,9 @@ package k8sutil import ( + "fmt" + "time" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api" apierrors "k8s.io/client-go/pkg/api/errors" @@ -12,6 +15,7 @@ import ( "github.bus.zalan.do/acid/postgres-operator/pkg/spec" "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants" + "github.bus.zalan.do/acid/postgres-operator/pkg/util/retryutil" ) func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { @@ -34,17 +38,17 @@ func ResourceNotFound(err error) bool { } func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) { - c.APIPath = "/apis" - c.GroupVersion = &unversioned.GroupVersion{ - Group: constants.TPRVendor, - Version: constants.TPRApiVersion, - } + c.GroupVersion = &unversioned.GroupVersion{Version: constants.K8sVersion} + c.APIPath = constants.K8sApiPath c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} schemeBuilder := runtime.NewSchemeBuilder( func(scheme *runtime.Scheme) error { scheme.AddKnownTypes( - *c.GroupVersion, + unversioned.GroupVersion{ + Group: constants.TPRVendor, + Version: constants.TPRApiVersion, + }, &spec.Postgresql{}, &spec.PostgresqlList{}, &api.ListOptions{}, @@ -56,3 +60,16 @@ func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) { return rest.RESTClientFor(c) } + +func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error { + return retryutil.Retry(interval, timeout, func() (bool, error) { + _, err := restclient.Get().RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, ns)).DoRaw() + if err != nil { + if ResourceNotFound(err) { // not set up yet. wait more. + return false, nil + } + return false, err + } + return true, nil + }) +} diff --git a/pkg/util/k8sutil/tpr_util.go b/pkg/util/k8sutil/tpr_util.go deleted file mode 100644 index 48521e753..000000000 --- a/pkg/util/k8sutil/tpr_util.go +++ /dev/null @@ -1,28 +0,0 @@ -package k8sutil - -import ( - "fmt" - "time" - - "k8s.io/client-go/rest" - - "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants" - "github.bus.zalan.do/acid/postgres-operator/pkg/util/retryutil" -) - -func listClustersURI(ns string) string { - return fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s", constants.TPRVendor, constants.TPRApiVersion, ns, constants.ResourceName) -} - -func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error { - return retryutil.Retry(interval, timeout, func() (bool, error) { - _, err := restclient.Get().RequestURI(listClustersURI(ns)).DoRaw() - if err != nil { - if ResourceNotFound(err) { // not set up yet. wait more. - return false, nil - } - return false, err - } - return true, nil - }) -}