Use Kubernetes API instead of API group

This commit is contained in:
Murat Kabilov 2017-05-10 16:05:25 +02:00
parent ec3f24c3ee
commit fd449342e5
6 changed files with 46 additions and 53 deletions

View File

@ -50,6 +50,9 @@ func ControllerConfig() *controller.Config {
} }
restClient, err := k8sutil.KubernetesRestClient(restConfig) restClient, err := k8sutil.KubernetesRestClient(restConfig)
if err != nil {
log.Fatalf("Can't create rest client: %s", err)
}
return &controller.Config{ return &controller.Config{
KubeClient: client, KubeClient: client,

View File

@ -20,13 +20,13 @@ import (
func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) { func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, error) {
c.logger.Info("Getting list of currently running clusters") c.logger.Info("Getting list of currently running clusters")
object, err := c.RestClient.Get().
Namespace(c.opConfig.Namespace). req := c.RestClient.Get().
Resource(constants.ResourceName). RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)).
VersionedParams(&options, api.ParameterCodec). VersionedParams(&options, api.ParameterCodec).
FieldsSelectorParam(fields.Everything()). FieldsSelectorParam(fields.Everything())
Do().
Get() object, err := req.Do().Get()
if err != nil { if err != nil {
return nil, fmt.Errorf("Can't get list of postgresql objects: %s", err) return nil, fmt.Errorf("Can't get list of postgresql objects: %s", err)
@ -55,6 +55,15 @@ func (c *Controller) clusterListFunc(options api.ListOptions) (runtime.Object, e
return object, err return object, err
} }
func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, error) {
req := c.RestClient.Get().
RequestURI(fmt.Sprintf(constants.WatchClustersURITemplate, c.opConfig.Namespace)).
VersionedParams(&options, api.ParameterCodec).
FieldsSelectorParam(fields.Everything())
return req.Watch()
}
func (c *Controller) processEvent(obj interface{}) error { func (c *Controller) processEvent(obj interface{}) error {
var clusterName spec.NamespacedName var clusterName spec.NamespacedName
@ -184,16 +193,6 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec
c.logger.WithField("worker", workerId).Infof("%s of the '%s' cluster has been queued for", eventType, clusterName) c.logger.WithField("worker", workerId).Infof("%s of the '%s' cluster has been queued for", eventType, clusterName)
} }
func (c *Controller) clusterWatchFunc(options api.ListOptions) (watch.Interface, error) {
return c.RestClient.Get().
Prefix("watch").
Namespace(c.opConfig.Namespace).
Resource(constants.ResourceName).
VersionedParams(&options, api.ParameterCodec).
FieldsSelectorParam(fields.Everything()).
Watch()
}
func (c *Controller) postgresqlAdd(obj interface{}) { func (c *Controller) postgresqlAdd(obj interface{}) {
pg, ok := obj.(*spec.Postgresql) pg, ok := obj.(*spec.Postgresql)
if !ok { if !ok {

View File

@ -81,9 +81,7 @@ func (c *Controller) createTPR() error {
c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName) c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName)
} }
restClient := c.RestClient return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace)
return k8sutil.WaitTPRReady(restClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace)
} }
func (c *Controller) getInfrastructureRoles() (result map[string]spec.PgUser, err error) { func (c *Controller) getInfrastructureRoles() (result map[string]spec.PgUser, err error) {

View File

@ -6,6 +6,10 @@ const (
TPRVendor = "acid.zalan.do" TPRVendor = "acid.zalan.do"
TPRDescription = "Managed PostgreSQL clusters" TPRDescription = "Managed PostgreSQL clusters"
TPRApiVersion = "v1" TPRApiVersion = "v1"
ListClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace
WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace
K8sVersion = "v1"
K8sApiPath = "/api"
DataVolumeName = "pgdata" DataVolumeName = "pgdata"
PasswordLength = 64 PasswordLength = 64
UserSecretTemplate = "%s.%s.credentials.%s.%s" // Username, ClusterName, TPRName, TPRVendor UserSecretTemplate = "%s.%s.credentials.%s.%s" // Username, ClusterName, TPRName, TPRVendor

View File

@ -1,6 +1,9 @@
package k8sutil package k8sutil
import ( import (
"fmt"
"time"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api"
apierrors "k8s.io/client-go/pkg/api/errors" apierrors "k8s.io/client-go/pkg/api/errors"
@ -12,6 +15,7 @@ import (
"github.bus.zalan.do/acid/postgres-operator/pkg/spec" "github.bus.zalan.do/acid/postgres-operator/pkg/spec"
"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants" "github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
"github.bus.zalan.do/acid/postgres-operator/pkg/util/retryutil"
) )
func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) {
@ -34,17 +38,17 @@ func ResourceNotFound(err error) bool {
} }
func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) { func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) {
c.APIPath = "/apis" c.GroupVersion = &unversioned.GroupVersion{Version: constants.K8sVersion}
c.GroupVersion = &unversioned.GroupVersion{ c.APIPath = constants.K8sApiPath
Group: constants.TPRVendor,
Version: constants.TPRApiVersion,
}
c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
schemeBuilder := runtime.NewSchemeBuilder( schemeBuilder := runtime.NewSchemeBuilder(
func(scheme *runtime.Scheme) error { func(scheme *runtime.Scheme) error {
scheme.AddKnownTypes( scheme.AddKnownTypes(
*c.GroupVersion, unversioned.GroupVersion{
Group: constants.TPRVendor,
Version: constants.TPRApiVersion,
},
&spec.Postgresql{}, &spec.Postgresql{},
&spec.PostgresqlList{}, &spec.PostgresqlList{},
&api.ListOptions{}, &api.ListOptions{},
@ -56,3 +60,16 @@ func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) {
return rest.RESTClientFor(c) return rest.RESTClientFor(c)
} }
func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error {
return retryutil.Retry(interval, timeout, func() (bool, error) {
_, err := restclient.Get().RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, ns)).DoRaw()
if err != nil {
if ResourceNotFound(err) { // not set up yet. wait more.
return false, nil
}
return false, err
}
return true, nil
})
}

View File

@ -1,28 +0,0 @@
package k8sutil
import (
"fmt"
"time"
"k8s.io/client-go/rest"
"github.bus.zalan.do/acid/postgres-operator/pkg/util/constants"
"github.bus.zalan.do/acid/postgres-operator/pkg/util/retryutil"
)
func listClustersURI(ns string) string {
return fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s", constants.TPRVendor, constants.TPRApiVersion, ns, constants.ResourceName)
}
func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error {
return retryutil.Retry(interval, timeout, func() (bool, error) {
_, err := restclient.Get().RequestURI(listClustersURI(ns)).DoRaw()
if err != nil {
if ResourceNotFound(err) { // not set up yet. wait more.
return false, nil
}
return false, err
}
return true, nil
})
}