diff --git a/cmd/main.go b/cmd/main.go index d8e189b23..9785c051f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -8,11 +8,8 @@ import ( "sync" "syscall" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/zalando-incubator/postgres-operator/pkg/controller" "github.com/zalando-incubator/postgres-operator/pkg/spec" - "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" ) @@ -47,28 +44,19 @@ func init() { func ControllerConfig() *controller.Config { restConfig, err := k8sutil.RestConfig(KubeConfigFile, OutOfCluster) if err != nil { - log.Fatalf("Can't get REST config: %v", err) - } - - client, err := k8sutil.KubernetesClient(restConfig) - if err != nil { - log.Fatalf("Can't create client: %v", err) - } - - restClient, err := k8sutil.KubernetesRestClient(restConfig) - if err != nil { - log.Fatalf("Can't create rest client: %v", err) + log.Fatalf("couldn't get REST config: %v", err) } return &controller.Config{ - RestConfig: restConfig, - KubeClient: client, - RestClient: restClient, + RestConfig: restConfig, + NoDatabaseAccess: noDatabaseAccess, + NoTeamsAPI: noTeamsAPI, + ConfigMapName: configMapName, + Namespace: podNamespace, } } func main() { - configMapData := make(map[string]string) log.SetOutput(os.Stdout) log.Printf("Spilo operator %s\n", version) @@ -78,33 +66,7 @@ func main() { wg := &sync.WaitGroup{} // Goroutines can add themselves to this to be waited on - controllerConfig := ControllerConfig() - - if configMapName != (spec.NamespacedName{}) { - configMap, err := controllerConfig.KubeClient.ConfigMaps(configMapName.Namespace).Get(configMapName.Name, meta_v1.GetOptions{}) - if err != nil { - panic(err) - } - - configMapData = configMap.Data - } else { - log.Printf("No ConfigMap specified. Loading default values") - } - - if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var - configMapData["namespace"] = podNamespace - } - if noDatabaseAccess { - configMapData["enable_database_access"] = "false" - } - if noTeamsAPI { - configMapData["enable_teams_api"] = "false" - } - cfg := config.NewFromMap(configMapData) - - log.Printf("Config: %s", cfg.MustMarshal()) - - c := controller.New(controllerConfig, cfg) + c := controller.New(ControllerConfig()) c.Run(stop, wg) sig := <-sigs diff --git a/glide.lock b/glide.lock index eda1edf36..f9a3aa4b1 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,8 @@ hash: 140e0c8a606d18ca405e9c50359cc673e4aa0cc88bcae5d7f83791e7002bd6a1 -updated: 2017-07-12T12:52:55.896264+02:00 +updated: 2017-07-24T19:24:17.604824235+02:00 imports: - name: github.com/aws/aws-sdk-go - version: b1a7b51924b90a6ecdbaeb17e96418740ff07a1e + version: afd601335e2a72d43caa3af6bd2abe512fcc3bfd subpackages: - aws - aws/awserr @@ -30,7 +30,7 @@ imports: - service/ec2 - service/sts - name: github.com/davecgh/go-spew - version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d + version: 782f4967f2dc4564575ca782fe2d04090b5faca8 subpackages: - spew - name: github.com/docker/distribution @@ -104,7 +104,7 @@ imports: - name: github.com/PuerkitoBio/urlesc version: 5bd2802263f21d8788851d5305584c82a5c75d7e - name: github.com/Sirupsen/logrus - version: 7f976d3a76720c4c27af2ba716b85d2e0a7e38b1 + version: a3f95b5c423586578a4e099b11a46c2479628cac - name: github.com/spf13/pflag version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7 - name: github.com/ugorji/go @@ -118,12 +118,15 @@ imports: - name: golang.org/x/net version: f2499483f923065a842d38eb4c7f1927e6fc6e6d subpackages: + - html + - html/atom - http2 - http2/hpack - idna - lex/httplex + - websocket - name: golang.org/x/sys - version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 + version: c4489faa6e5ab84c0ef40d6ee878f7a030281f0f subpackages: - unix - name: golang.org/x/text diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index bff10235f..068e0b64e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -36,11 +36,8 @@ var ( // Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication. type Config struct { - KubeClient *kubernetes.Clientset //TODO: move clients to the better place? - RestClient rest.Interface - RestConfig *rest.Config - TeamsAPIClient *teams.API OpConfig config.Config + RestConfig *rest.Config InfrastructureRoles map[string]spec.PgUser // inherited from the controller } @@ -68,6 +65,9 @@ type Cluster struct { userSyncStrategy spec.UserSyncer deleteOptions *meta_v1.DeleteOptions podEventsQueue *cache.FIFO + + teamsAPIClient *teams.API + KubeClient *kubernetes.Clientset //TODO: move clients to the better place? } type compareStatefulsetResult struct { @@ -78,7 +78,7 @@ type compareStatefulsetResult struct { } // New creates a new cluster. This function should be called from a controller. -func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { +func New(cfg Config, kubeClient *kubernetes.Clientset, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} orphanDependents := true @@ -104,6 +104,7 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { userSyncStrategy: users.DefaultUserSyncStrategy{}, deleteOptions: &meta_v1.DeleteOptions{OrphanDependents: &orphanDependents}, podEventsQueue: podEventsQueue, + KubeClient: kubeClient, } return cluster @@ -126,7 +127,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { } request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods - _, err = c.RestClient.Patch(types.MergePatchType). + _, err = c.KubeClient.CoreV1().RESTClient().Patch(types.MergePatchType). RequestURI(c.Metadata.GetSelfLink()). Body(request). DoRaw() diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 1f34fbfc2..f7a7a16b1 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -153,7 +153,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) { return []string{}, fmt.Errorf("could not get oauth token: %v", err) } - teamInfo, err := c.TeamsAPIClient.TeamInfo(c.Spec.TeamID, token) + teamInfo, err := c.teamsAPIClient.TeamInfo(c.Spec.TeamID, token) if err != nil { return nil, fmt.Errorf("could not get team info: %v", err) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 6112ec6b9..a4199d771 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/Sirupsen/logrus" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/rest" @@ -14,15 +15,18 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/config" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" + "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/teams" ) type Config struct { RestConfig *rest.Config - KubeClient *kubernetes.Clientset - RestClient rest.Interface - TeamsAPIClient *teams.API InfrastructureRoles map[string]spec.PgUser + + NoDatabaseAccess bool + NoTeamsAPI bool + ConfigMapName spec.NamespacedName + Namespace string } type Controller struct { @@ -30,6 +34,10 @@ type Controller struct { opConfig *config.Config logger *logrus.Entry + KubeClient *kubernetes.Clientset + RestClient rest.Interface + TeamsAPIClient *teams.API + clustersMu sync.RWMutex clusters map[spec.NamespacedName]*cluster.Cluster stopChs map[spec.NamespacedName]chan struct{} @@ -43,22 +51,58 @@ type Controller struct { lastClusterSyncTime int64 } -func New(controllerConfig *Config, operatorConfig *config.Config) *Controller { +func New(controllerConfig *Config) *Controller { + configMapData := make(map[string]string) logger := logrus.New() + client, err := k8sutil.KubernetesClient(controllerConfig.RestConfig) + if err != nil { + logger.Fatalf("couldn't create client: %v", err) + } + + restClient, err := k8sutil.KubernetesRestClient(*controllerConfig.RestConfig) + if err != nil { + logger.Fatalf("couldn't create rest client: %v", err) + } + + if controllerConfig.ConfigMapName != (spec.NamespacedName{}) { + configMap, err := client.ConfigMaps(controllerConfig.ConfigMapName.Namespace).Get(controllerConfig.ConfigMapName.Name, meta_v1.GetOptions{}) + if err != nil { + panic(err) + } + + configMapData = configMap.Data + } else { + logger.Infoln("No ConfigMap specified. Loading default values") + } + + if configMapData["namespace"] == "" { // Namespace in ConfigMap has priority over env var + configMapData["namespace"] = controllerConfig.Namespace + } + if controllerConfig.NoDatabaseAccess { + configMapData["enable_database_access"] = "false" + } + if controllerConfig.NoTeamsAPI { + configMapData["enable_teams_api"] = "false" + } + operatorConfig := config.NewFromMap(configMapData) + + logger.Infof("Config: %s", operatorConfig.MustMarshal()) + if operatorConfig.DebugLogging { logger.Level = logrus.DebugLevel } - controllerConfig.TeamsAPIClient = teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger) - return &Controller{ - Config: *controllerConfig, - opConfig: operatorConfig, - logger: logger.WithField("pkg", "controller"), - clusters: make(map[spec.NamespacedName]*cluster.Cluster), - stopChs: make(map[spec.NamespacedName]chan struct{}), - podCh: make(chan spec.PodEvent), + Config: *controllerConfig, + opConfig: operatorConfig, + logger: logger.WithField("pkg", "controller"), + clusters: make(map[spec.NamespacedName]*cluster.Cluster), + stopChs: make(map[spec.NamespacedName]chan struct{}), + podCh: make(chan spec.PodEvent), + TeamsAPIClient: teams.NewTeamsAPI(operatorConfig.TeamsAPIUrl, logger), + KubeClient: client, + RestClient: restClient, } } @@ -89,15 +133,14 @@ func (c *Controller) initController() { } // Postgresqls - clusterLw := &cache.ListWatch{ - ListFunc: c.clusterListFunc, - WatchFunc: c.clusterWatchFunc, - } c.postgresqlInformer = cache.NewSharedIndexInformer( - clusterLw, + &cache.ListWatch{ + ListFunc: c.clusterListFunc, + WatchFunc: c.clusterWatchFunc, + }, &spec.Postgresql{}, constants.QueueResyncPeriodTPR, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + cache.Indexers{}) c.postgresqlInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.postgresqlAdd, diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index c458ba78e..d9fc8ff1c 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -3,18 +3,15 @@ package controller import ( "fmt" "reflect" - "sync/atomic" "time" - "k8s.io/apimachinery/pkg/api/meta" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + "encoding/json" "github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" @@ -35,67 +32,60 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}) { } func (c *Controller) clusterListFunc(options meta_v1.ListOptions) (runtime.Object, error) { - c.logger.Info("Getting list of currently running clusters") - - req := c.RestClient.Get(). - RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). - VersionedParams(&options, scheme.ParameterCodec). - FieldsSelectorParam(fields.Everything()) - - object, err := req.Do().Get() + req := c.RestClient. + Get(). + Namespace(c.opConfig.Namespace). + Resource(constants.ResourceName). + VersionedParams(&options, meta_v1.ParameterCodec) + b, err := req.DoRaw() if err != nil { - return nil, fmt.Errorf("could not get list of postgresql objects: %v", err) + return nil, err + } + var list spec.PostgresqlList + + return &list, json.Unmarshal(b, &list) +} + +type tprDecoder struct { + dec *json.Decoder + close func() error +} + +func (d *tprDecoder) Close() { + d.close() +} + +func (d *tprDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { + var e struct { + Type watch.EventType + Object spec.Postgresql + } + if err := d.dec.Decode(&e); err != nil { + return watch.Error, nil, err } - objList, err := meta.ExtractList(object) - if err != nil { - return nil, fmt.Errorf("could not extract list of postgresql objects: %v", err) - } - - if time.Now().Unix()-atomic.LoadInt64(&c.lastClusterSyncTime) <= int64(c.opConfig.ResyncPeriod.Seconds()) { - c.logger.Debugln("skipping resync of clusters") - return object, err - } - - var activeClustersCnt, failedClustersCnt int - for _, obj := range objList { - pg, ok := obj.(*spec.Postgresql) - if !ok { - return nil, fmt.Errorf("could not cast object to postgresql") - } - - if pg.Error != nil { - failedClustersCnt++ - continue - } - c.queueClusterEvent(nil, pg, spec.EventSync) - activeClustersCnt++ - } - if len(objList) > 0 { - if failedClustersCnt > 0 && activeClustersCnt == 0 { - c.logger.Infof("There are no clusters running. %d are in the failed state", failedClustersCnt) - } else if failedClustersCnt == 0 && activeClustersCnt > 0 { - c.logger.Infof("There are %d clusters running", activeClustersCnt) - } else { - c.logger.Infof("There are %d clusters running and %d are in the failed state", activeClustersCnt, failedClustersCnt) - } - } else { - c.logger.Infof("No clusters running") - } - - atomic.StoreInt64(&c.lastClusterSyncTime, time.Now().Unix()) - - return object, err + return e.Type, &e.Object, nil } func (c *Controller) clusterWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) { options.Watch = true - req := c.RestClient.Get(). - RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, c.opConfig.Namespace)). - VersionedParams(&options, scheme.ParameterCodec). - FieldsSelectorParam(fields.Everything()) - return req.Watch() + r, err := c.RestClient. + Get(). + Namespace(c.opConfig.Namespace). + Resource(constants.ResourceName). + VersionedParams(&options, meta_v1.ParameterCodec). + FieldsSelectorParam(nil). + Stream() + + if err != nil { + return nil, err + } + + return watch.NewStreamWatcher(&tprDecoder{ + dec: json.NewDecoder(r), + close: r.Close, + }), nil } func (c *Controller) processEvent(obj interface{}) error { @@ -127,7 +117,7 @@ func (c *Controller) processEvent(obj interface{}) error { logger.Infof("Creation of the %q cluster started", clusterName) stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) cl.Run(stopCh) c.clustersMu.Lock() @@ -183,7 +173,7 @@ func (c *Controller) processEvent(obj interface{}) error { // no race condition because a cluster is always processed by single worker if !clusterFound { stopCh := make(chan struct{}) - cl = cluster.New(c.makeClusterConfig(), *event.NewSpec, logger) + cl = cluster.New(c.makeClusterConfig(), c.KubeClient, *event.NewSpec, logger) cl.Run(stopCh) c.clustersMu.Lock() diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 1ad23f7c6..b3f10c4bd 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -22,10 +22,7 @@ func (c *Controller) makeClusterConfig() cluster.Config { } return cluster.Config{ - KubeClient: c.KubeClient, - RestClient: c.RestClient, RestConfig: c.RestConfig, - TeamsAPIClient: c.TeamsAPIClient, OpConfig: config.Copy(c.opConfig), InfrastructureRoles: infrastructureRoles, } diff --git a/pkg/util/constants/kubernetes.go b/pkg/util/constants/kubernetes.go index 3fa134349..79c60cad2 100644 --- a/pkg/util/constants/kubernetes.go +++ b/pkg/util/constants/kubernetes.go @@ -4,10 +4,7 @@ import "time" // General kubernetes-related constants const ( - ListClustersURITemplate = "/apis/" + TPRGroup + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace - WatchClustersURITemplate = "/apis/" + TPRGroup + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace - K8sVersion = "v1" - K8sAPIPath = "/api" + K8sAPIPath = "/apis" StatefulsetDeletionInterval = 1 * time.Second StatefulsetDeletionTimeout = 30 * time.Second diff --git a/pkg/util/constants/thirdpartyresource.go b/pkg/util/constants/thirdpartyresource.go index a815adc55..a0a00d259 100644 --- a/pkg/util/constants/thirdpartyresource.go +++ b/pkg/util/constants/thirdpartyresource.go @@ -6,6 +6,6 @@ const ( TPRGroup = "acid.zalan.do" TPRDescription = "Managed PostgreSQL clusters" TPRApiVersion = "v1" - TPRName = TPRKind + "." + TPRKind + TPRName = TPRKind + "." + TPRGroup ResourceName = TPRKind + "s" ) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index c3f665138..c5780743b 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -1,20 +1,16 @@ package k8sutil import ( - "fmt" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/pkg/api" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" ) @@ -23,6 +19,7 @@ func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { if outOfCluster { return clientcmd.BuildConfigFromFlags("", kubeConfig) } + return rest.InClusterConfig() } @@ -38,35 +35,24 @@ func ResourceNotFound(err error) bool { return apierrors.IsNotFound(err) } -func KubernetesRestClient(c *rest.Config) (rest.Interface, error) { - c.GroupVersion = &schema.GroupVersion{Version: constants.K8sVersion} - c.APIPath = constants.K8sAPIPath - c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} - - schemeBuilder := runtime.NewSchemeBuilder( - func(scheme *runtime.Scheme) error { - scheme.AddKnownTypes( - schema.GroupVersion{ - Group: constants.TPRGroup, - Version: constants.TPRApiVersion, - }, - &spec.Postgresql{}, - &spec.PostgresqlList{}, - &meta_v1.ListOptions{}, - &meta_v1.DeleteOptions{}, - ) - return nil - }) - if err := schemeBuilder.AddToScheme(scheme.Scheme); err != nil { - return nil, fmt.Errorf("could not apply functions to register PostgreSQL TPR type: %v", err) +func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) { + cfg.GroupVersion = &schema.GroupVersion{ + Group: constants.TPRGroup, + Version: constants.TPRApiVersion, } + cfg.APIPath = constants.K8sAPIPath + cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs} - return rest.RESTClientFor(c) + return rest.RESTClientFor(&cfg) } func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error { return retryutil.Retry(interval, timeout, func() (bool, error) { - _, err := restclient.Get().RequestURI(fmt.Sprintf(constants.ListClustersURITemplate, ns)).DoRaw() + _, err := restclient. + Get(). + Namespace(ns). + Resource(constants.ResourceName). + DoRaw() if err != nil { if ResourceNotFound(err) { // not set up yet. wait more. return false, nil