This commit is contained in:
Murat Kabilov 2017-07-24 19:38:08 +02:00
parent d7e9142fc7
commit 143eb968ea
10 changed files with 149 additions and 170 deletions

View File

@ -8,11 +8,8 @@ import (
"sync" "sync"
"syscall" "syscall"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/zalando-incubator/postgres-operator/pkg/controller" "github.com/zalando-incubator/postgres-operator/pkg/controller"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
) )
@ -47,28 +44,19 @@ func init() {
func ControllerConfig() *controller.Config { func ControllerConfig() *controller.Config {
restConfig, err := k8sutil.RestConfig(KubeConfigFile, OutOfCluster) restConfig, err := k8sutil.RestConfig(KubeConfigFile, OutOfCluster)
if err != nil { if err != nil {
log.Fatalf("Can't get REST config: %v", err) log.Fatalf("couldn't get REST config: %v", err)
}
client, err := k8sutil.KubernetesClient(restConfig)
if err != nil {
log.Fatalf("Can't create client: %v", err)
}
restClient, err := k8sutil.KubernetesRestClient(restConfig)
if err != nil {
log.Fatalf("Can't create rest client: %v", err)
} }
return &controller.Config{ return &controller.Config{
RestConfig: restConfig, RestConfig: restConfig,
KubeClient: client, NoDatabaseAccess: noDatabaseAccess,
RestClient: restClient, NoTeamsAPI: noTeamsAPI,
ConfigMapName: configMapName,
Namespace: podNamespace,
} }
} }
func main() { func main() {
configMapData := make(map[string]string)
log.SetOutput(os.Stdout) log.SetOutput(os.Stdout)
log.Printf("Spilo operator %s\n", version) log.Printf("Spilo operator %s\n", version)
@ -78,33 +66,7 @@ func main() {
wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on
controllerConfig := ControllerConfig() c := controller.New(ControllerConfig())
if configMapName != (spec.NamespacedName{}) {
configMap, err := controllerConfig.KubeClient.ConfigMaps(configMapName.Namespace).Get(configMapName.Name, meta_v1.GetOptions{})
if err != nil {
panic(err)
}
configMapData = configMap.Data
} else {
log.Printf("No ConfigMap specified. Loading default values")
}
if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var
configMapData["namespace"] = podNamespace
}
if noDatabaseAccess {
configMapData["enable_database_access"] = "false"
}
if noTeamsAPI {
configMapData["enable_teams_api"] = "false"
}
cfg := config.NewFromMap(configMapData)
log.Printf("Config: %s", cfg.MustMarshal())
c := controller.New(controllerConfig, cfg)
c.Run(stop, wg) c.Run(stop, wg)
sig := <-sigs sig := <-sigs

13
glide.lock generated
View File

@ -1,8 +1,8 @@
hash: 140e0c8a606d18ca405e9c50359cc673e4aa0cc88bcae5d7f83791e7002bd6a1 hash: 140e0c8a606d18ca405e9c50359cc673e4aa0cc88bcae5d7f83791e7002bd6a1
updated: 2017-07-12T12:52:55.896264+02:00 updated: 2017-07-24T19:24:17.604824235+02:00
imports: imports:
- name: github.com/aws/aws-sdk-go - name: github.com/aws/aws-sdk-go
version: b1a7b51924b90a6ecdbaeb17e96418740ff07a1e version: afd601335e2a72d43caa3af6bd2abe512fcc3bfd
subpackages: subpackages:
- aws - aws
- aws/awserr - aws/awserr
@ -30,7 +30,7 @@ imports:
- service/ec2 - service/ec2
- service/sts - service/sts
- name: github.com/davecgh/go-spew - name: github.com/davecgh/go-spew
version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d version: 782f4967f2dc4564575ca782fe2d04090b5faca8
subpackages: subpackages:
- spew - spew
- name: github.com/docker/distribution - name: github.com/docker/distribution
@ -104,7 +104,7 @@ imports:
- name: github.com/PuerkitoBio/urlesc - name: github.com/PuerkitoBio/urlesc
version: 5bd2802263f21d8788851d5305584c82a5c75d7e version: 5bd2802263f21d8788851d5305584c82a5c75d7e
- name: github.com/Sirupsen/logrus - name: github.com/Sirupsen/logrus
version: 7f976d3a76720c4c27af2ba716b85d2e0a7e38b1 version: a3f95b5c423586578a4e099b11a46c2479628cac
- name: github.com/spf13/pflag - name: github.com/spf13/pflag
version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7 version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7
- name: github.com/ugorji/go - name: github.com/ugorji/go
@ -118,12 +118,15 @@ imports:
- name: golang.org/x/net - name: golang.org/x/net
version: f2499483f923065a842d38eb4c7f1927e6fc6e6d version: f2499483f923065a842d38eb4c7f1927e6fc6e6d
subpackages: subpackages:
- html
- html/atom
- http2 - http2
- http2/hpack - http2/hpack
- idna - idna
- lex/httplex - lex/httplex
- websocket
- name: golang.org/x/sys - name: golang.org/x/sys
version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 version: c4489faa6e5ab84c0ef40d6ee878f7a030281f0f
subpackages: subpackages:
- unix - unix
- name: golang.org/x/text - name: golang.org/x/text

View File

@ -36,11 +36,8 @@ var (
// Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication. // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication.
type Config struct { type Config struct {
KubeClient *kubernetes.Clientset //TODO: move clients to the better place?
RestClient rest.Interface
RestConfig *rest.Config
TeamsAPIClient *teams.API
OpConfig config.Config OpConfig config.Config
RestConfig *rest.Config
InfrastructureRoles map[string]spec.PgUser // inherited from the controller InfrastructureRoles map[string]spec.PgUser // inherited from the controller
} }
@ -68,6 +65,9 @@ type Cluster struct {
userSyncStrategy spec.UserSyncer userSyncStrategy spec.UserSyncer
deleteOptions *meta_v1.DeleteOptions deleteOptions *meta_v1.DeleteOptions
podEventsQueue *cache.FIFO podEventsQueue *cache.FIFO
teamsAPIClient *teams.API
KubeClient *kubernetes.Clientset //TODO: move clients to the better place?
} }
type compareStatefulsetResult struct { type compareStatefulsetResult struct {
@ -78,7 +78,7 @@ type compareStatefulsetResult struct {
} }
// New creates a new cluster. This function should be called from a controller. // New creates a new cluster. This function should be called from a controller.
func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { func New(cfg Config, kubeClient *kubernetes.Clientset, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name)
kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)}
orphanDependents := true orphanDependents := true
@ -104,6 +104,7 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
userSyncStrategy: users.DefaultUserSyncStrategy{}, userSyncStrategy: users.DefaultUserSyncStrategy{},
deleteOptions: &meta_v1.DeleteOptions{OrphanDependents: &orphanDependents}, deleteOptions: &meta_v1.DeleteOptions{OrphanDependents: &orphanDependents},
podEventsQueue: podEventsQueue, podEventsQueue: podEventsQueue,
KubeClient: kubeClient,
} }
return cluster return cluster
@ -126,7 +127,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) {
} }
request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods
_, err = c.RestClient.Patch(types.MergePatchType). _, err = c.KubeClient.CoreV1().RESTClient().Patch(types.MergePatchType).
RequestURI(c.Metadata.GetSelfLink()). RequestURI(c.Metadata.GetSelfLink()).
Body(request). Body(request).
DoRaw() DoRaw()

View File

@ -153,7 +153,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) {
return []string{}, fmt.Errorf("could not get oauth token: %v", err) return []string{}, fmt.Errorf("could not get oauth token: %v", err)
} }
teamInfo, err := c.TeamsAPIClient.TeamInfo(c.Spec.TeamID, token) teamInfo, err := c.teamsAPIClient.TeamInfo(c.Spec.TeamID, token)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get team info: %v", err) return nil, fmt.Errorf("could not get team info: %v", err)
} }

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
@ -14,15 +15,18 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/config"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
"github.com/zalando-incubator/postgres-operator/pkg/util/teams" "github.com/zalando-incubator/postgres-operator/pkg/util/teams"
) )
type Config struct { type Config struct {
RestConfig *rest.Config RestConfig *rest.Config
KubeClient *kubernetes.Clientset
RestClient rest.Interface
TeamsAPIClient *teams.API
InfrastructureRoles map[string]spec.PgUser InfrastructureRoles map[string]spec.PgUser
NoDatabaseAccess bool
NoTeamsAPI bool
ConfigMapName spec.NamespacedName
Namespace string
} }
type Controller struct { type Controller struct {
@ -30,6 +34,10 @@ type Controller struct {
opConfig *config.Config opConfig *config.Config
logger *logrus.Entry logger *logrus.Entry
KubeClient *kubernetes.Clientset
RestClient rest.Interface
TeamsAPIClient *teams.API
clustersMu sync.RWMutex clustersMu sync.RWMutex
clusters map[spec.NamespacedName]*cluster.Cluster clusters map[spec.NamespacedName]*cluster.Cluster
stopChs map[spec.NamespacedName]chan struct{} stopChs map[spec.NamespacedName]chan struct{}
@ -43,15 +51,48 @@ type Controller struct {
lastClusterSyncTime int64 lastClusterSyncTime int64
} }
func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { func New(controllerConfig *Config) *Controller {
configMapData := make(map[string]string)
logger := logrus.New() logger := logrus.New()
client, err := k8sutil.KubernetesClient(controllerConfig.RestConfig)
if err != nil {
logger.Fatalf("couldn't create client: %v", err)
}
restClient, err := k8sutil.KubernetesRestClient(*controllerConfig.RestConfig)
if err != nil {
logger.Fatalf("couldn't create rest client: %v", err)
}
if controllerConfig.ConfigMapName != (spec.NamespacedName{}) {
configMap, err := client.ConfigMaps(controllerConfig.ConfigMapName.Namespace).Get(controllerConfig.ConfigMapName.Name, meta_v1.GetOptions{})
if err != nil {
panic(err)
}
configMapData = configMap.Data
} else {
logger.Infoln("No ConfigMap specified. Loading default values")
}
if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var
configMapData["namespace"] = controllerConfig.Namespace
}
if controllerConfig.NoDatabaseAccess {
configMapData["enable_database_access"] = "false"
}
if controllerConfig.NoTeamsAPI {
configMapData["enable_teams_api"] = "false"
}
operatorConfig := config.NewFromMap(configMapData)
logger.Infof("Config: %s", operatorConfig.MustMarshal())
if operatorConfig.DebugLogging { if operatorConfig.DebugLogging {
logger.Level = logrus.DebugLevel logger.Level = logrus.DebugLevel
} }
controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger)
return &Controller{ return &Controller{
Config: *controllerConfig, Config: *controllerConfig,
opConfig: operatorConfig, opConfig: operatorConfig,
@ -59,6 +100,9 @@ func New(controllerConfig *Config, operatorConfig *config.Config) *Controller {
clusters: make(map[spec.NamespacedName]*cluster.Cluster), clusters: make(map[spec.NamespacedName]*cluster.Cluster),
stopChs: make(map[spec.NamespacedName]chan struct{}), stopChs: make(map[spec.NamespacedName]chan struct{}),
podCh: make(chan spec.PodEvent), podCh: make(chan spec.PodEvent),
TeamsAPIClient: teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger),
KubeClient: client,
RestClient: restClient,
} }
} }
@ -89,15 +133,14 @@ func (c *Controller) initController() {
} }
// Postgresqls // Postgresqls
clusterLw := &cache.ListWatch{ c.postgresqlInformer = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: c.clusterListFunc, ListFunc: c.clusterListFunc,
WatchFunc: c.clusterWatchFunc, WatchFunc: c.clusterWatchFunc,
} },
c.postgresqlInformer = cache.NewSharedIndexInformer(
clusterLw,
&spec.Postgresql{}, &spec.Postgresql{},
constants.QueueResyncPeriodTPR, constants.QueueResyncPeriodTPR,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) cache.Indexers{})
c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.postgresqlAdd, AddFunc: c.postgresqlAdd,

View File

@ -3,18 +3,15 @@ package controller
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"sync/atomic"
"time" "time"
"k8s.io/apimachinery/pkg/api/meta"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"encoding/json"
"github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/cluster"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util" "github.com/zalando-incubator/postgres-operator/pkg/util"
@ -35,67 +32,60 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}) {
} }
func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Object, error) { func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Object, error) {
c.logger.Info("Getting list of currently running clusters") req := c.RestClient.
Get().
req := c.RestClient.Get(). Namespace(c.opConfig.Namespace).
RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). Resource(constants.ResourceName).
VersionedParams(&options, scheme.ParameterCodec). VersionedParams(&options, meta_v1.ParameterCodec)
FieldsSelectorParam(fields.Everything())
object, err := req.Do().Get()
b, err := req.DoRaw()
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get list of postgresql objects: %v", err) return nil, err
}
var list spec.PostgresqlList
return &list, json.Unmarshal(b, &list)
} }
objList, err := meta.ExtractList(object) type tprDecoder struct {
if err != nil { dec *json.Decoder
return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err) close func() error
} }
if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { func (d *tprDecoder) Close() {
c.logger.Debugln("skipping resync of clusters") d.close()
return object, err
} }
var activeClustersCnt, failedClustersCnt int func (d *tprDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
for _, obj := range objList { var e struct {
pg, ok := obj.(*spec.Postgresql) Type watch.EventType
if !ok { Object spec.Postgresql
return nil, fmt.Errorf("could not cast object to postgresql") }
if err := d.dec.Decode(&e); err != nil {
return watch.Error, nil, err
} }
if pg.Error != nil { return e.Type, &e.Object, nil
failedClustersCnt++
continue
}
c.queueClusterEvent(nil, pg, spec.EventSync)
activeClustersCnt++
}
if len(objList) > 0 {
if failedClustersCnt > 0 && activeClustersCnt == 0 {
c.logger.Infof("There are no clusters running. %d are in the failed state", failedClustersCnt)
} else if failedClustersCnt == 0 && activeClustersCnt > 0 {
c.logger.Infof("There are %d clusters running", activeClustersCnt)
} else {
c.logger.Infof("There are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt)
}
} else {
c.logger.Infof("No clusters running")
}
atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix())
return object, err
} }
func (c *Controller) clusterWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) { func (c *Controller) clusterWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) {
options.Watch = true options.Watch = true
req := c.RestClient.Get(). r, err := c.RestClient.
RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). Get().
VersionedParams(&options, scheme.ParameterCodec). Namespace(c.opConfig.Namespace).
FieldsSelectorParam(fields.Everything()) Resource(constants.ResourceName).
return req.Watch() VersionedParams(&options, meta_v1.ParameterCodec).
FieldsSelectorParam(nil).
Stream()
if err != nil {
return nil, err
}
return watch.NewStreamWatcher(&tprDecoder{
dec: json.NewDecoder(r),
close: r.Close,
}), nil
} }
func (c *Controller) processEvent(obj interface{}) error { func (c *Controller) processEvent(obj interface{}) error {
@ -127,7 +117,7 @@ func (c *Controller) processEvent(obj interface{}) error {
logger.Infof("Creation of the %q cluster started", clusterName) logger.Infof("Creation of the %q cluster started", clusterName)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger)
cl.Run(stopCh) cl.Run(stopCh)
c.clustersMu.Lock() c.clustersMu.Lock()
@ -183,7 +173,7 @@ func (c *Controller) processEvent(obj interface{}) error {
// no race condition because a cluster is always processed by single worker // no race condition because a cluster is always processed by single worker
if !clusterFound { if !clusterFound {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger)
cl.Run(stopCh) cl.Run(stopCh)
c.clustersMu.Lock() c.clustersMu.Lock()

View File

@ -22,10 +22,7 @@ func (c *Controller) makeClusterConfig() cluster.Config {
} }
return cluster.Config{ return cluster.Config{
KubeClient: c.KubeClient,
RestClient: c.RestClient,
RestConfig: c.RestConfig, RestConfig: c.RestConfig,
TeamsAPIClient: c.TeamsAPIClient,
OpConfig: config.Copy(c.opConfig), OpConfig: config.Copy(c.opConfig),
InfrastructureRoles: infrastructureRoles, InfrastructureRoles: infrastructureRoles,
} }

View File

@ -4,10 +4,7 @@ import "time"
// General kubernetes-related constants // General kubernetes-related constants
const ( const (
ListClustersURITemplate = "/apis/" + TPRGroup + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace K8sAPIPath = "/apis"
WatchClustersURITemplate = "/apis/" + TPRGroup + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace
K8sVersion = "v1"
K8sAPIPath = "/api"
StatefulsetDeletionInterval = 1 * time.Second StatefulsetDeletionInterval = 1 * time.Second
StatefulsetDeletionTimeout = 30 * time.Second StatefulsetDeletionTimeout = 30 * time.Second

View File

@ -6,6 +6,6 @@ const (
TPRGroup = "acid.zalan.do" TPRGroup = "acid.zalan.do"
TPRDescription = "Managed PostgreSQL clusters" TPRDescription = "Managed PostgreSQL clusters"
TPRApiVersion = "v1" TPRApiVersion = "v1"
TPRName = TPRKind + "." + TPRKind TPRName = TPRKind + "." + TPRGroup
ResourceName = TPRKind + "s" ResourceName = TPRKind + "s"
) )

View File

@ -1,20 +1,16 @@
package k8sutil package k8sutil
import ( import (
"fmt"
"time" "time"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/pkg/api"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
) )
@ -23,6 +19,7 @@ func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) {
if outOfCluster { if outOfCluster {
return clientcmd.BuildConfigFromFlags("", kubeConfig) return clientcmd.BuildConfigFromFlags("", kubeConfig)
} }
return rest.InClusterConfig() return rest.InClusterConfig()
} }
@ -38,35 +35,24 @@ func ResourceNotFound(err error) bool {
return apierrors.IsNotFound(err) return apierrors.IsNotFound(err)
} }
func KubernetesRestClient(c *rest.Config) (rest.Interface, error) { func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) {
c.GroupVersion = &schema.GroupVersion{Version: constants.K8sVersion} cfg.GroupVersion = &schema.GroupVersion{
c.APIPath = constants.K8sAPIPath
c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}
schemeBuilder := runtime.NewSchemeBuilder(
func(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(
schema.GroupVersion{
Group: constants.TPRGroup, Group: constants.TPRGroup,
Version: constants.TPRApiVersion, Version: constants.TPRApiVersion,
},
&spec.Postgresql{},
&spec.PostgresqlList{},
&meta_v1.ListOptions{},
&meta_v1.DeleteOptions{},
)
return nil
})
if err := schemeBuilder.AddToScheme(scheme.Scheme); err != nil {
return nil, fmt.Errorf("could not apply functions to register PostgreSQL TPR type: %v", err)
} }
cfg.APIPath = constants.K8sAPIPath
cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
return rest.RESTClientFor(c) return rest.RESTClientFor(&cfg)
} }
func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error { func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error {
return retryutil.Retry(interval, timeout, func() (bool, error) { return retryutil.Retry(interval, timeout, func() (bool, error) {
_, err := restclient.Get().RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, ns)).DoRaw() _, err := restclient.
Get().
Namespace(ns).
Resource(constants.ResourceName).
DoRaw()
if err != nil { if err != nil {
if ResourceNotFound(err) { // not set up yet. wait more. if ResourceNotFound(err) { // not set up yet. wait more.
return false, nil return false, nil