move from tpr to crd

This commit is contained in:
Murat Kabilov 2017-10-05 18:16:22 +02:00
parent bd9b0b613a
commit a35e9c6119
13 changed files with 141 additions and 115 deletions

23
glide.lock generated
View File

@ -1,8 +1,8 @@
hash: 285cea8ceeee9bfe82c59c750a1020922a77efa7a50d8217f58b1b328c8b256e hash: 42ffa063321a691ec1de30532989e66e81fb7a080d6d4867bbb2c9d7f2a008ce
updated: 2017-09-01T11:31:43.738137376+02:00 updated: 2017-10-06T15:06:00.742579+02:00
imports: imports:
- name: github.com/aws/aws-sdk-go - name: github.com/aws/aws-sdk-go
version: b79a722cb7aba0edd9bd2256361ae2e15e98f8ad version: da415b5fa0ff3f91d4707348a8ea1be53f700c22
subpackages: subpackages:
- aws - aws
- aws/awserr - aws/awserr
@ -51,7 +51,7 @@ imports:
- name: github.com/ghodss/yaml - name: github.com/ghodss/yaml
version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee
- name: github.com/go-ini/ini - name: github.com/go-ini/ini
version: e7fea39b01aea8d5671f6858f0532f56e8bff3a5 version: c787282c39ac1fc618827141a1f762240def08a3
- name: github.com/go-openapi/analysis - name: github.com/go-openapi/analysis
version: b44dc874b601d9e4e2f6e19140e794ba24bead3b version: b44dc874b601d9e4e2f6e19140e794ba24bead3b
- name: github.com/go-openapi/jsonpointer - name: github.com/go-openapi/jsonpointer
@ -88,7 +88,7 @@ imports:
- name: github.com/kr/text - name: github.com/kr/text
version: 7cafcd837844e784b526369c9bce262804aebc60 version: 7cafcd837844e784b526369c9bce262804aebc60
- name: github.com/lib/pq - name: github.com/lib/pq
version: 4a82388ebc5138c8289fe9bc602cb0b3e32cd617 version: b77235e3890a962fe8a6f8c4c7198679ca7814e7
subpackages: subpackages:
- oid - oid
- name: github.com/mailru/easyjson - name: github.com/mailru/easyjson
@ -112,7 +112,7 @@ imports:
subpackages: subpackages:
- codec - codec
- name: golang.org/x/crypto - name: golang.org/x/crypto
version: d172538b2cfce0c13cee31e647d0367aa8cd2486 version: 9419663f5a44be8b34ca85f08abc5fe1be11f8a3
subpackages: subpackages:
- ssh/terminal - ssh/terminal
- name: golang.org/x/net - name: golang.org/x/net
@ -126,6 +126,7 @@ imports:
version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9
subpackages: subpackages:
- unix - unix
- windows
- name: golang.org/x/text - name: golang.org/x/text
version: 2910a502d2bf9e43193af9d68ca516529614eed3 version: 2910a502d2bf9e43193af9d68ca516529614eed3
subpackages: subpackages:
@ -143,8 +144,16 @@ imports:
version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4
- name: gopkg.in/yaml.v2 - name: gopkg.in/yaml.v2
version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 version: 53feefa2559fb8dfa8d81baad31be332c97d6c77
- name: k8s.io/apiextensions-apiserver
version: fcd622fe88a4a6efcb5aea9e94ee87324ac1b036
subpackages:
- pkg/apis/apiextensions
- pkg/apis/apiextensions/v1beta1
- pkg/client/clientset/clientset
- pkg/client/clientset/clientset/scheme
- pkg/client/clientset/clientset/typed/apiextensions/v1beta1
- name: k8s.io/apimachinery - name: k8s.io/apimachinery
version: 1fd2e63a9a370677308a42f24fd40c86438afddf version: 8ab5f3d8a330c2e9baaf84e39042db8d49034ae2
subpackages: subpackages:
- pkg/api/equality - pkg/api/equality
- pkg/api/errors - pkg/api/errors

View File

@ -10,7 +10,11 @@ import:
- service/ec2 - service/ec2
- package: github.com/lib/pq - package: github.com/lib/pq
- package: github.com/motomux/pretty - package: github.com/motomux/pretty
- package: k8s.io/apiextensions-apiserver
subpackages:
- pkg/client/clientset/clientset
- package: k8s.io/apimachinery - package: k8s.io/apimachinery
version: release-1.7
subpackages: subpackages:
- pkg/api/errors - pkg/api/errors
- pkg/api/meta - pkg/api/meta

View File

@ -1,5 +1,5 @@
apiVersion: "acid.zalan.do/v1" apiVersion: "acid.zalan.do/v1"
kind: "Postgresql" kind: postgresql
metadata: metadata:
name: acid-testcluster name: acid-testcluster

View File

@ -1,6 +1,6 @@
package cluster package cluster
// Postgres ThirdPartyResource object i.e. Spilo // Postgres CustomResourceDefinition object i.e. Spilo
import ( import (
"database/sql" "database/sql"
@ -130,8 +130,10 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) {
} }
request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods
_, err = c.KubeClient.RESTClient.Patch(types.MergePatchType). _, err = c.KubeClient.CRDREST.Patch(types.MergePatchType).
RequestURI(c.GetSelfLink()). Namespace(c.Namespace).
Resource(constants.CRDResource).
Name(c.Name).
Body(request). Body(request).
DoRaw() DoRaw()

View File

@ -143,7 +143,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
return nil return nil
} }
// replaceStatefulSet deletes an old StatefulSet and creates the new using spec in the PostgreSQL TPR. // replaceStatefulSet deletes an old StatefulSet and creates the new using spec in the PostgreSQL CRD.
func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error { func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
if c.Statefulset == nil { if c.Statefulset == nil {
return fmt.Errorf("there is no statefulset in the cluster") return fmt.Errorf("there is no statefulset in the cluster")

View File

@ -338,8 +338,8 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st
return c.OpConfig.SecretNameTemplate.Format( return c.OpConfig.SecretNameTemplate.Format(
"username", strings.Replace(username, "_", "-", -1), "username", strings.Replace(username, "_", "-", -1),
"clustername", clusterName, "clustername", clusterName,
"tprkind", constants.TPRKind, "tprkind", constants.CRDKind,
"tprgroup", constants.TPRGroup) "tprgroup", constants.CRDGroup)
} }
func (c *Cluster) podSpiloRole(pod *v1.Pod) string { func (c *Cluster) podSpiloRole(pod *v1.Pod) string {

View File

@ -8,7 +8,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"github.com/zalando-incubator/postgres-operator/pkg/apiserver" "github.com/zalando-incubator/postgres-operator/pkg/apiserver"
@ -27,7 +26,6 @@ type Controller struct {
logger *logrus.Entry logger *logrus.Entry
KubeClient k8sutil.KubernetesClient KubeClient k8sutil.KubernetesClient
RestClient rest.Interface // kubernetes API group REST client
apiserver *apiserver.Server apiserver *apiserver.Server
stopCh chan struct{} stopCh chan struct{}
@ -69,15 +67,11 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller {
} }
func (c *Controller) initClients() { func (c *Controller) initClients() {
client, err := k8sutil.ClientSet(c.config.RestConfig) var err error
if err != nil {
c.logger.Fatalf("couldn't create client: %v", err)
}
c.KubeClient = k8sutil.NewFromKubernetesInterface(client)
c.RestClient, err = k8sutil.KubernetesRestClient(*c.config.RestConfig) c.KubeClient, err = k8sutil.NewFromConfig(c.config.RestConfig)
if err != nil { if err != nil {
c.logger.Fatalf("couldn't create rest client: %v", err) c.logger.Fatalf("could not create kubernetes clients: %v", err)
} }
} }
@ -119,8 +113,8 @@ func (c *Controller) initController() {
c.logger.Logger.Level = logrus.DebugLevel c.logger.Logger.Level = logrus.DebugLevel
} }
if err := c.createTPR(); err != nil { if err := c.createCRD(); err != nil {
c.logger.Fatalf("could not register ThirdPartyResource: %v", err) c.logger.Fatalf("could not register CustomResourceDefinition: %v", err)
} }
if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil { if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil {

View File

@ -44,10 +44,10 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object
var list spec.PostgresqlList var list spec.PostgresqlList
var activeClustersCnt, failedClustersCnt int var activeClustersCnt, failedClustersCnt int
req := c.RestClient. req := c.KubeClient.CRDREST.
Get(). Get().
Namespace(c.opConfig.Namespace). Namespace(c.opConfig.Namespace).
Resource(constants.ResourceName). Resource(constants.CRDResource).
VersionedParams(&options, metav1.ParameterCodec) VersionedParams(&options, metav1.ParameterCodec)
b, err := req.DoRaw() b, err := req.DoRaw()
@ -109,10 +109,10 @@ func (d *tprDecoder) Decode() (action watch.EventType, object runtime.Object, er
func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interface, error) { func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true options.Watch = true
r, err := c.RestClient. r, err := c.KubeClient.CRDREST.
Get(). Get().
Namespace(c.opConfig.Namespace). Namespace(c.opConfig.Namespace).
Resource(constants.ResourceName). Resource(constants.CRDResource).
VersionedParams(&options, metav1.ParameterCodec). VersionedParams(&options, metav1.ParameterCodec).
FieldsSelectorParam(nil). FieldsSelectorParam(nil).
Stream() Stream()

View File

@ -4,9 +4,10 @@ import (
"fmt" "fmt"
"hash/crc32" "hash/crc32"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"github.com/zalando-incubator/postgres-operator/pkg/cluster" "github.com/zalando-incubator/postgres-operator/pkg/cluster"
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
@ -28,36 +29,59 @@ func (c *Controller) makeClusterConfig() cluster.Config {
} }
} }
func thirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource {
return &extv1beta.ThirdPartyResource{
ObjectMeta: metav1.ObjectMeta{
//ThirdPartyResources are cluster-wide
Name: TPRName,
},
Versions: []extv1beta.APIVersion{
{Name: constants.TPRApiVersion},
},
Description: constants.TPRDescription,
}
}
func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 { func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 {
return crc32.ChecksumIEEE([]byte(clusterName.String())) % c.opConfig.Workers return crc32.ChecksumIEEE([]byte(clusterName.String())) % c.opConfig.Workers
} }
func (c *Controller) createTPR() error { func (c *Controller) createCRD() error {
tpr := thirdPartyResource(constants.TPRName) crd := &apiextv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: constants.CRDResource + "." + constants.CRDGroup,
},
Spec: apiextv1beta1.CustomResourceDefinitionSpec{
Group: constants.CRDGroup,
Version: constants.CRDApiVersion,
Names: apiextv1beta1.CustomResourceDefinitionNames{
Plural: constants.CRDResource,
Singular: constants.CRDKind,
ShortNames: []string{constants.CRDShort},
Kind: constants.CRDKind,
ListKind: constants.CRDKind + "List",
},
Scope: apiextv1beta1.NamespaceScoped,
},
}
if _, err := c.KubeClient.ThirdPartyResources().Create(tpr); err != nil { if _, err := c.KubeClient.CustomResourceDefinitions().Create(crd); err != nil {
if !k8sutil.ResourceAlreadyExists(err) { if !k8sutil.ResourceAlreadyExists(err) {
return err return fmt.Errorf("could not create customResourceDefinition: %v", err)
} }
c.logger.Infof("thirdPartyResource %q is already registered", constants.TPRName) c.logger.Infof("customResourceDefinition %q is already registered", crd.Name)
} else { } else {
c.logger.Infof("thirdPartyResource %q' has been registered", constants.TPRName) c.logger.Infof("customResourceDefinition %q has been registered", crd.Name)
} }
return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace) return wait.Poll(c.opConfig.CRD.ReadyWaitInterval, c.opConfig.CRD.ReadyWaitTimeout, func() (bool, error) {
c, err := c.KubeClient.CustomResourceDefinitions().Get(crd.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
for _, cond := range c.Status.Conditions {
switch cond.Type {
case apiextv1beta1.Established:
if cond.Status == apiextv1beta1.ConditionTrue {
return true, err
}
case apiextv1beta1.NamesAccepted:
if cond.Status == apiextv1beta1.ConditionFalse {
return false, fmt.Errorf("name conflict: %v", cond.Reason)
}
}
}
return false, err
})
} }
func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (result map[string]spec.PgUser, err error) { func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (result map[string]spec.PgUser, err error) {

View File

@ -73,7 +73,7 @@ const (
ClusterStatusInvalid PostgresStatus = "Invalid" ClusterStatusInvalid PostgresStatus = "Invalid"
) )
// Postgresql defines PostgreSQL Third Party (resource) Object. // Postgresql defines PostgreSQL Custom Resource Definition Object.
type Postgresql struct { type Postgresql struct {
metav1.TypeMeta `json:",inline"` metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"` metav1.ObjectMeta `json:"metadata"`

View File

@ -8,8 +8,8 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
) )
// TPR describes ThirdPartyResource specific configuration parameters // CRD describes CustomResourceDefinition specific configuration parameters
type TPR struct { type CRD struct {
ReadyWaitInterval time.Duration `name:"ready_wait_interval" default:"4s"` ReadyWaitInterval time.Duration `name:"ready_wait_interval" default:"4s"`
ReadyWaitTimeout time.Duration `name:"ready_wait_timeout" default:"30s"` ReadyWaitTimeout time.Duration `name:"ready_wait_timeout" default:"30s"`
ResyncPeriod time.Duration `name:"resync_period" default:"5m"` ResyncPeriod time.Duration `name:"resync_period" default:"5m"`
@ -44,7 +44,7 @@ type Auth struct {
// Config describes operator config // Config describes operator config
type Config struct { type Config struct {
TPR CRD
Resources Resources
Auth Auth
Namespace string `name:"namespace"` Namespace string `name:"namespace"`

View File

@ -1,11 +1,10 @@
package constants package constants
// Different properties of the PostgreSQL Third Party Resources // Different properties of the PostgreSQL Custom Resource Definition
const ( const (
TPRKind = "postgresql" CRDKind = "postgresql"
TPRGroup = "acid.zalan.do" CRDResource = "postgresqls"
TPRDescription = "Managed PostgreSQL clusters" CRDShort = "pg"
TPRApiVersion = "v1" CRDGroup = "acid.zalan.do"
TPRName = TPRKind + "." + TPRGroup CRDApiVersion = "v1"
ResourceName = TPRKind + "s"
) )

View File

@ -1,21 +1,21 @@
package k8sutil package k8sutil
import ( import (
"time" "fmt"
apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1" v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
extensions "k8s.io/client-go/kubernetes/typed/extensions/v1beta1"
"k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
) )
// KubernetesClient describes getters for Kubernetes objects // KubernetesClient describes getters for Kubernetes objects
@ -28,24 +28,10 @@ type KubernetesClient struct {
v1core.PersistentVolumeClaimsGetter v1core.PersistentVolumeClaimsGetter
v1core.ConfigMapsGetter v1core.ConfigMapsGetter
v1beta1.StatefulSetsGetter v1beta1.StatefulSetsGetter
extensions.ThirdPartyResourcesGetter apiextbeta1.CustomResourceDefinitionsGetter
RESTClient rest.Interface
}
// NewFromKubernetesInterface creates KubernetesClient from kubernetes Interface RESTClient rest.Interface
func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) { CRDREST rest.Interface
c = KubernetesClient{}
c.PodsGetter = src.CoreV1()
c.ServicesGetter = src.CoreV1()
c.EndpointsGetter = src.CoreV1()
c.SecretsGetter = src.CoreV1()
c.ConfigMapsGetter = src.CoreV1()
c.PersistentVolumeClaimsGetter = src.CoreV1()
c.PersistentVolumesGetter = src.CoreV1()
c.StatefulSetsGetter = src.AppsV1beta1()
c.ThirdPartyResourcesGetter = src.ExtensionsV1beta1()
c.RESTClient = src.CoreV1().RESTClient()
return
} }
// RestConfig creates REST config // RestConfig creates REST config
@ -57,11 +43,6 @@ func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) {
return rest.InClusterConfig() return rest.InClusterConfig()
} }
// ClientSet creates clientset using REST config
func ClientSet(config *rest.Config) (client *kubernetes.Clientset, err error) {
return kubernetes.NewForConfig(config)
}
// ResourceAlreadyExists checks if error corresponds to Already exists error // ResourceAlreadyExists checks if error corresponds to Already exists error
func ResourceAlreadyExists(err error) bool { func ResourceAlreadyExists(err error) bool {
return apierrors.IsAlreadyExists(err) return apierrors.IsAlreadyExists(err)
@ -72,32 +53,45 @@ func ResourceNotFound(err error) bool {
return apierrors.IsNotFound(err) return apierrors.IsNotFound(err)
} }
// KubernetesRestClient create kubernets Interface using REST config func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) { kubeClient := KubernetesClient{}
cfg.GroupVersion = &schema.GroupVersion{
Group: constants.TPRGroup,
Version: constants.TPRApiVersion,
}
cfg.APIPath = constants.K8sAPIPath
cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
return rest.RESTClientFor(&cfg) client, err := kubernetes.NewForConfig(cfg)
}
// WaitTPRReady waits until ThirdPartyResource is ready
func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error {
return retryutil.Retry(interval, timeout, func() (bool, error) {
_, err := restclient.
Get().
Namespace(ns).
Resource(constants.ResourceName).
DoRaw()
if err != nil { if err != nil {
if ResourceNotFound(err) { // not set up yet. wait more. return kubeClient, fmt.Errorf("could not get clientset: %v", err)
return false, nil
} }
return false, err
kubeClient.PodsGetter = client.CoreV1()
kubeClient.ServicesGetter = client.CoreV1()
kubeClient.EndpointsGetter = client.CoreV1()
kubeClient.SecretsGetter = client.CoreV1()
kubeClient.ConfigMapsGetter = client.CoreV1()
kubeClient.PersistentVolumeClaimsGetter = client.CoreV1()
kubeClient.PersistentVolumesGetter = client.CoreV1()
kubeClient.StatefulSetsGetter = client.AppsV1beta1()
kubeClient.RESTClient = client.CoreV1().RESTClient()
cfg2 := *cfg
cfg2.GroupVersion = &schema.GroupVersion{
Group: constants.CRDGroup,
Version: constants.CRDApiVersion,
} }
return true, nil cfg2.APIPath = constants.K8sAPIPath
}) cfg2.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
crd, err := rest.RESTClientFor(&cfg2)
if err != nil {
return kubeClient, fmt.Errorf("could not get rest client: %v", err)
}
kubeClient.CRDREST = crd
apiextClient, err := apiextclient.NewForConfig(cfg)
if err != nil {
return kubeClient, fmt.Errorf("could not create api client:%v", err)
}
kubeClient.CustomResourceDefinitionsGetter = apiextClient.ApiextensionsV1beta1()
return kubeClient, nil
} }