Merge pull request #117 from zalando-incubator/crd

Replace TPR with CRD
This commit is contained in:
Jan Mussler 2017-10-09 12:23:07 +02:00 committed by GitHub
commit d6393d46cb
15 changed files with 151 additions and 125 deletions

View File

@ -5,8 +5,8 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/zalando-incubator/postgres-operator)](https://goreportcard.com/report/github.com/zalando-incubator/postgres-operator)
The Postgres operator manages Postgres clusters in Kubernetes using the [operator pattern](https://coreos.com/blog/introducing-operators.html).
During the initial run it registers the [Third Party Resource (TPR)](https://kubernetes.io/docs/user-guide/thirdpartyresources/) for Postgres.
The Postgresql TPR is essentially the schema that describes the contents of the manifests for deploying individual Postgres clusters using Statefulsets and Patroni.
During the initial run it registers the [Custom Resource Definition (CRD)](https://kubernetes.io/docs/concepts/api-extension/custom-resources/#customresourcedefinitions) for Postgres.
The Postgresql CRD is essentially the schema that describes the contents of the manifests for deploying individual Postgres clusters using Statefulsets and Patroni.
Once the operator is running, it performs the following actions:
@ -127,12 +127,12 @@ The last line changes the docker image tag in the manifest to the one the operat
the serviceAccountName definition, as the ServiceAccount is not defined in minikube (neither it should, as one has admin
permissions there).
### Check if ThirdPartyResource has been registered
### Check if CustomResourceDefinition has been registered
$ kubectl --context minikube get thirdpartyresources
$ kubectl --context minikube get crd
NAME DESCRIPTION VERSION(S)
postgresql.acid.zalan.do Managed PostgreSQL clusters v1
NAME KIND
postgresqls.acid.zalan.do CustomResourceDefinition.v1beta1.apiextensions.k8s.io
### Create a new spilo cluster

23
glide.lock generated
View File

@ -1,8 +1,8 @@
hash: 285cea8ceeee9bfe82c59c750a1020922a77efa7a50d8217f58b1b328c8b256e
updated: 2017-09-01T11:31:43.738137376+02:00
hash: 42ffa063321a691ec1de30532989e66e81fb7a080d6d4867bbb2c9d7f2a008ce
updated: 2017-10-06T15:06:00.742579+02:00
imports:
- name: github.com/aws/aws-sdk-go
version: b79a722cb7aba0edd9bd2256361ae2e15e98f8ad
version: da415b5fa0ff3f91d4707348a8ea1be53f700c22
subpackages:
- aws
- aws/awserr
@ -51,7 +51,7 @@ imports:
- name: github.com/ghodss/yaml
version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee
- name: github.com/go-ini/ini
version: e7fea39b01aea8d5671f6858f0532f56e8bff3a5
version: c787282c39ac1fc618827141a1f762240def08a3
- name: github.com/go-openapi/analysis
version: b44dc874b601d9e4e2f6e19140e794ba24bead3b
- name: github.com/go-openapi/jsonpointer
@ -88,7 +88,7 @@ imports:
- name: github.com/kr/text
version: 7cafcd837844e784b526369c9bce262804aebc60
- name: github.com/lib/pq
version: 4a82388ebc5138c8289fe9bc602cb0b3e32cd617
version: b77235e3890a962fe8a6f8c4c7198679ca7814e7
subpackages:
- oid
- name: github.com/mailru/easyjson
@ -112,7 +112,7 @@ imports:
subpackages:
- codec
- name: golang.org/x/crypto
version: d172538b2cfce0c13cee31e647d0367aa8cd2486
version: 9419663f5a44be8b34ca85f08abc5fe1be11f8a3
subpackages:
- ssh/terminal
- name: golang.org/x/net
@ -126,6 +126,7 @@ imports:
version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9
subpackages:
- unix
- windows
- name: golang.org/x/text
version: 2910a502d2bf9e43193af9d68ca516529614eed3
subpackages:
@ -143,8 +144,16 @@ imports:
version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4
- name: gopkg.in/yaml.v2
version: 53feefa2559fb8dfa8d81baad31be332c97d6c77
- name: k8s.io/apiextensions-apiserver
version: fcd622fe88a4a6efcb5aea9e94ee87324ac1b036
subpackages:
- pkg/apis/apiextensions
- pkg/apis/apiextensions/v1beta1
- pkg/client/clientset/clientset
- pkg/client/clientset/clientset/scheme
- pkg/client/clientset/clientset/typed/apiextensions/v1beta1
- name: k8s.io/apimachinery
version: 1fd2e63a9a370677308a42f24fd40c86438afddf
version: 8ab5f3d8a330c2e9baaf84e39042db8d49034ae2
subpackages:
- pkg/api/equality
- pkg/api/errors

View File

@ -10,7 +10,11 @@ import:
- service/ec2
- package: github.com/lib/pq
- package: github.com/motomux/pretty
- package: k8s.io/apiextensions-apiserver
subpackages:
- pkg/client/clientset/clientset
- package: k8s.io/apimachinery
version: release-1.7
subpackages:
- pkg/api/errors
- pkg/api/meta

View File

@ -1,5 +1,5 @@
apiVersion: "acid.zalan.do/v1"
kind: "Postgresql"
kind: postgresql
metadata:
name: acid-testcluster

View File

@ -1,6 +1,6 @@
package cluster
// Postgres ThirdPartyResource object i.e. Spilo
// Postgres CustomResourceDefinition object i.e. Spilo
import (
"database/sql"
@ -130,8 +130,10 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) {
}
request := []byte(fmt.Sprintf(`{"status": %s}`, string(b))) //TODO: Look into/wait for k8s go client methods
_, err = c.KubeClient.RESTClient.Patch(types.MergePatchType).
RequestURI(c.GetSelfLink()).
_, err = c.KubeClient.CRDREST.Patch(types.MergePatchType).
Namespace(c.Namespace).
Resource(constants.CRDResource).
Name(c.Name).
Body(request).
DoRaw()

View File

@ -143,7 +143,7 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
return nil
}
// replaceStatefulSet deletes an old StatefulSet and creates the new using spec in the PostgreSQL TPR.
// replaceStatefulSet deletes an old StatefulSet and creates the new using spec in the PostgreSQL CRD.
func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
if c.Statefulset == nil {
return fmt.Errorf("there is no statefulset in the cluster")

View File

@ -338,8 +338,8 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st
return c.OpConfig.SecretNameTemplate.Format(
"username", strings.Replace(username, "_", "-", -1),
"clustername", clusterName,
"tprkind", constants.TPRKind,
"tprgroup", constants.TPRGroup)
"tprkind", constants.CRDKind,
"tprgroup", constants.CRDGroup)
}
func (c *Cluster) podSpiloRole(pod *v1.Pod) string {

View File

@ -8,7 +8,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"github.com/zalando-incubator/postgres-operator/pkg/apiserver"
@ -27,7 +26,6 @@ type Controller struct {
logger *logrus.Entry
KubeClient k8sutil.KubernetesClient
RestClient rest.Interface // kubernetes API group REST client
apiserver *apiserver.Server
stopCh chan struct{}
@ -69,15 +67,11 @@ func NewController(controllerConfig *spec.ControllerConfig) *Controller {
}
func (c *Controller) initClients() {
client, err := k8sutil.ClientSet(c.config.RestConfig)
if err != nil {
c.logger.Fatalf("couldn't create client: %v", err)
}
c.KubeClient = k8sutil.NewFromKubernetesInterface(client)
var err error
c.RestClient, err = k8sutil.KubernetesRestClient(*c.config.RestConfig)
c.KubeClient, err = k8sutil.NewFromConfig(c.config.RestConfig)
if err != nil {
c.logger.Fatalf("couldn't create rest client: %v", err)
c.logger.Fatalf("could not create kubernetes clients: %v", err)
}
}
@ -119,8 +113,8 @@ func (c *Controller) initController() {
c.logger.Logger.Level = logrus.DebugLevel
}
if err := c.createTPR(); err != nil {
c.logger.Fatalf("could not register ThirdPartyResource: %v", err)
if err := c.createCRD(); err != nil {
c.logger.Fatalf("could not register CustomResourceDefinition: %v", err)
}
if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil {

View File

@ -44,10 +44,10 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object
var list spec.PostgresqlList
var activeClustersCnt, failedClustersCnt int
req := c.RestClient.
req := c.KubeClient.CRDREST.
Get().
Namespace(c.opConfig.Namespace).
Resource(constants.ResourceName).
Resource(constants.CRDResource).
VersionedParams(&options, metav1.ParameterCodec)
b, err := req.DoRaw()
@ -109,10 +109,10 @@ func (d *tprDecoder) Decode() (action watch.EventType, object runtime.Object, er
func (c *Controller) clusterWatchFunc(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
r, err := c.RestClient.
r, err := c.KubeClient.CRDREST.
Get().
Namespace(c.opConfig.Namespace).
Resource(constants.ResourceName).
Resource(constants.CRDResource).
VersionedParams(&options, metav1.ParameterCodec).
FieldsSelectorParam(nil).
Stream()

View File

@ -4,9 +4,10 @@ import (
"fmt"
"hash/crc32"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/pkg/api/v1"
extv1beta "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"github.com/zalando-incubator/postgres-operator/pkg/cluster"
"github.com/zalando-incubator/postgres-operator/pkg/spec"
@ -28,36 +29,59 @@ func (c *Controller) makeClusterConfig() cluster.Config {
}
}
func thirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource {
return &extv1beta.ThirdPartyResource{
ObjectMeta: metav1.ObjectMeta{
//ThirdPartyResources are cluster-wide
Name: TPRName,
},
Versions: []extv1beta.APIVersion{
{Name: constants.TPRApiVersion},
},
Description: constants.TPRDescription,
}
}
func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 {
return crc32.ChecksumIEEE([]byte(clusterName.String())) % c.opConfig.Workers
}
func (c *Controller) createTPR() error {
tpr := thirdPartyResource(constants.TPRName)
func (c *Controller) createCRD() error {
crd := &apiextv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: constants.CRDResource + "." + constants.CRDGroup,
},
Spec: apiextv1beta1.CustomResourceDefinitionSpec{
Group: constants.CRDGroup,
Version: constants.CRDApiVersion,
Names: apiextv1beta1.CustomResourceDefinitionNames{
Plural: constants.CRDResource,
Singular: constants.CRDKind,
ShortNames: []string{constants.CRDShort},
Kind: constants.CRDKind,
ListKind: constants.CRDKind + "List",
},
Scope: apiextv1beta1.NamespaceScoped,
},
}
if _, err := c.KubeClient.ThirdPartyResources().Create(tpr); err != nil {
if _, err := c.KubeClient.CustomResourceDefinitions().Create(crd); err != nil {
if !k8sutil.ResourceAlreadyExists(err) {
return err
return fmt.Errorf("could not create customResourceDefinition: %v", err)
}
c.logger.Infof("thirdPartyResource %q is already registered", constants.TPRName)
c.logger.Infof("customResourceDefinition %q is already registered", crd.Name)
} else {
c.logger.Infof("thirdPartyResource %q' has been registered", constants.TPRName)
c.logger.Infof("customResourceDefinition %q has been registered", crd.Name)
}
return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace)
return wait.Poll(c.opConfig.CRD.ReadyWaitInterval, c.opConfig.CRD.ReadyWaitTimeout, func() (bool, error) {
c, err := c.KubeClient.CustomResourceDefinitions().Get(crd.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
for _, cond := range c.Status.Conditions {
switch cond.Type {
case apiextv1beta1.Established:
if cond.Status == apiextv1beta1.ConditionTrue {
return true, err
}
case apiextv1beta1.NamesAccepted:
if cond.Status == apiextv1beta1.ConditionFalse {
return false, fmt.Errorf("name conflict: %v", cond.Reason)
}
}
}
return false, err
})
}
func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (result map[string]spec.PgUser, err error) {

View File

@ -73,7 +73,7 @@ const (
ClusterStatusInvalid PostgresStatus = "Invalid"
)
// Postgresql defines PostgreSQL Third Party (resource) Object.
// Postgresql defines PostgreSQL Custom Resource Definition Object.
type Postgresql struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"`

View File

@ -8,8 +8,8 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/spec"
)
// TPR describes ThirdPartyResource specific configuration parameters
type TPR struct {
// CRD describes CustomResourceDefinition specific configuration parameters
type CRD struct {
ReadyWaitInterval time.Duration `name:"ready_wait_interval" default:"4s"`
ReadyWaitTimeout time.Duration `name:"ready_wait_timeout" default:"30s"`
ResyncPeriod time.Duration `name:"resync_period" default:"5m"`
@ -44,7 +44,7 @@ type Auth struct {
// Config describes operator config
type Config struct {
TPR
CRD
Resources
Auth
Namespace string `name:"namespace"`

10
pkg/util/constants/crd.go Normal file
View File

@ -0,0 +1,10 @@
package constants
// Different properties of the PostgreSQL Custom Resource Definition
const (
CRDKind = "postgresql"
CRDResource = "postgresqls"
CRDShort = "pg"
CRDGroup = "acid.zalan.do"
CRDApiVersion = "v1"
)

View File

@ -1,11 +0,0 @@
package constants
// Different properties of the PostgreSQL Third Party Resources
const (
TPRKind = "postgresql"
TPRGroup = "acid.zalan.do"
TPRDescription = "Managed PostgreSQL clusters"
TPRApiVersion = "v1"
TPRName = TPRKind + "." + TPRGroup
ResourceName = TPRKind + "s"
)

View File

@ -1,21 +1,21 @@
package k8sutil
import (
"time"
"fmt"
apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes"
v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
extensions "k8s.io/client-go/kubernetes/typed/extensions/v1beta1"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
)
// KubernetesClient describes getters for Kubernetes objects
@ -28,24 +28,10 @@ type KubernetesClient struct {
v1core.PersistentVolumeClaimsGetter
v1core.ConfigMapsGetter
v1beta1.StatefulSetsGetter
extensions.ThirdPartyResourcesGetter
RESTClient rest.Interface
}
apiextbeta1.CustomResourceDefinitionsGetter
// NewFromKubernetesInterface creates KubernetesClient from kubernetes Interface
func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) {
c = KubernetesClient{}
c.PodsGetter = src.CoreV1()
c.ServicesGetter = src.CoreV1()
c.EndpointsGetter = src.CoreV1()
c.SecretsGetter = src.CoreV1()
c.ConfigMapsGetter = src.CoreV1()
c.PersistentVolumeClaimsGetter = src.CoreV1()
c.PersistentVolumesGetter = src.CoreV1()
c.StatefulSetsGetter = src.AppsV1beta1()
c.ThirdPartyResourcesGetter = src.ExtensionsV1beta1()
c.RESTClient = src.CoreV1().RESTClient()
return
RESTClient rest.Interface
CRDREST rest.Interface
}
// RestConfig creates REST config
@ -57,11 +43,6 @@ func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) {
return rest.InClusterConfig()
}
// ClientSet creates clientset using REST config
func ClientSet(config *rest.Config) (client *kubernetes.Clientset, err error) {
return kubernetes.NewForConfig(config)
}
// ResourceAlreadyExists checks if error corresponds to Already exists error
func ResourceAlreadyExists(err error) bool {
return apierrors.IsAlreadyExists(err)
@ -72,32 +53,45 @@ func ResourceNotFound(err error) bool {
return apierrors.IsNotFound(err)
}
// KubernetesRestClient create kubernets Interface using REST config
func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) {
cfg.GroupVersion = &schema.GroupVersion{
Group: constants.TPRGroup,
Version: constants.TPRApiVersion,
}
cfg.APIPath = constants.K8sAPIPath
cfg.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
kubeClient := KubernetesClient{}
return rest.RESTClientFor(&cfg)
}
// WaitTPRReady waits until ThirdPartyResource is ready
func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error {
return retryutil.Retry(interval, timeout, func() (bool, error) {
_, err := restclient.
Get().
Namespace(ns).
Resource(constants.ResourceName).
DoRaw()
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
if ResourceNotFound(err) { // not set up yet. wait more.
return false, nil
return kubeClient, fmt.Errorf("could not get clientset: %v", err)
}
return false, err
kubeClient.PodsGetter = client.CoreV1()
kubeClient.ServicesGetter = client.CoreV1()
kubeClient.EndpointsGetter = client.CoreV1()
kubeClient.SecretsGetter = client.CoreV1()
kubeClient.ConfigMapsGetter = client.CoreV1()
kubeClient.PersistentVolumeClaimsGetter = client.CoreV1()
kubeClient.PersistentVolumesGetter = client.CoreV1()
kubeClient.StatefulSetsGetter = client.AppsV1beta1()
kubeClient.RESTClient = client.CoreV1().RESTClient()
cfg2 := *cfg
cfg2.GroupVersion = &schema.GroupVersion{
Group: constants.CRDGroup,
Version: constants.CRDApiVersion,
}
return true, nil
})
cfg2.APIPath = constants.K8sAPIPath
cfg2.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
crd, err := rest.RESTClientFor(&cfg2)
if err != nil {
return kubeClient, fmt.Errorf("could not get rest client: %v", err)
}
kubeClient.CRDREST = crd
apiextClient, err := apiextclient.NewForConfig(cfg)
if err != nil {
return kubeClient, fmt.Errorf("could not create api client:%v", err)
}
kubeClient.CustomResourceDefinitionsGetter = apiextClient.ApiextensionsV1beta1()
return kubeClient, nil
}