Merge branch 'master' into feature/refactor-tpr
# Conflicts: # cmd/main.go # pkg/cluster/cluster.go # pkg/controller/controller.go # pkg/controller/util.go
This commit is contained in:
commit
035069137a
|
|
@ -13,7 +13,7 @@ import (
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/pkg/api"
|
||||||
"k8s.io/client-go/pkg/api/v1"
|
"k8s.io/client-go/pkg/api/v1"
|
||||||
"k8s.io/client-go/pkg/apis/apps/v1beta1"
|
"k8s.io/client-go/pkg/apis/apps/v1beta1"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
|
|
@ -67,7 +67,7 @@ type Cluster struct {
|
||||||
podEventsQueue *cache.FIFO
|
podEventsQueue *cache.FIFO
|
||||||
|
|
||||||
teamsAPIClient *teams.API
|
teamsAPIClient *teams.API
|
||||||
KubeClient *kubernetes.Clientset //TODO: move clients to the better place?
|
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
|
||||||
}
|
}
|
||||||
|
|
||||||
type compareStatefulsetResult struct {
|
type compareStatefulsetResult struct {
|
||||||
|
|
@ -78,7 +78,7 @@ type compareStatefulsetResult struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new cluster. This function should be called from a controller.
|
// New creates a new cluster. This function should be called from a controller.
|
||||||
func New(cfg Config, kubeClient *kubernetes.Clientset, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
|
func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
|
||||||
lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name)
|
lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name)
|
||||||
kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)}
|
kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)}
|
||||||
orphanDependents := true
|
orphanDependents := true
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,7 @@ type Config struct {
|
||||||
|
|
||||||
type Controller struct {
|
type Controller struct {
|
||||||
Config
|
Config
|
||||||
|
|
||||||
opConfig *config.Config
|
opConfig *config.Config
|
||||||
logger *logrus.Entry
|
logger *logrus.Entry
|
||||||
|
|
||||||
|
|
@ -51,7 +52,7 @@ type Controller struct {
|
||||||
lastClusterSyncTime int64
|
lastClusterSyncTime int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(controllerConfig *Config) *Controller {
|
func NewController(controllerConfig *Config) *Controller {
|
||||||
configMapData := make(map[string]string)
|
configMapData := make(map[string]string)
|
||||||
logger := logrus.New()
|
logger := logrus.New()
|
||||||
|
|
||||||
|
|
@ -126,7 +127,7 @@ func (c *Controller) initController() {
|
||||||
c.logger.Fatalf("could not register ThirdPartyResource: %v", err)
|
c.logger.Fatalf("could not register ThirdPartyResource: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if infraRoles, err := c.getInfrastructureRoles(); err != nil {
|
if infraRoles, err := c.getInfrastructureRoles(&c.opConfig.InfrastructureRolesSecretName); err != nil {
|
||||||
c.logger.Warningf("could not get infrastructure roles: %v", err)
|
c.logger.Warningf("could not get infrastructure roles: %v", err)
|
||||||
} else {
|
} else {
|
||||||
c.InfrastructureRoles = infraRoles
|
c.InfrastructureRoles = infraRoles
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ func (c *Controller) podListFunc(options meta_v1.ListOptions) (runtime.Object, e
|
||||||
TimeoutSeconds: options.TimeoutSeconds,
|
TimeoutSeconds: options.TimeoutSeconds,
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.KubeClient.CoreV1().Pods(c.opConfig.Namespace).List(opts)
|
return c.KubeClient.Pods(c.opConfig.Namespace).List(opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) podWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) {
|
func (c *Controller) podWatchFunc(options meta_v1.ListOptions) (watch.Interface, error) {
|
||||||
|
|
@ -37,7 +37,7 @@ func (c *Controller) podWatchFunc(options meta_v1.ListOptions) (watch.Interface,
|
||||||
TimeoutSeconds: options.TimeoutSeconds,
|
TimeoutSeconds: options.TimeoutSeconds,
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.KubeClient.CoreV1Client.Pods(c.opConfig.Namespace).Watch(opts)
|
return c.KubeClient.Pods(c.opConfig.Namespace).Watch(opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) podAdd(obj interface{}) {
|
func (c *Controller) podAdd(obj interface{}) {
|
||||||
|
|
|
||||||
|
|
@ -61,17 +61,17 @@ func (c *Controller) createTPR() error {
|
||||||
return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace)
|
return k8sutil.WaitTPRReady(c.RestClient, c.opConfig.TPR.ReadyWaitInterval, c.opConfig.TPR.ReadyWaitTimeout, c.opConfig.Namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) getInfrastructureRoles() (result map[string]spec.PgUser, err error) {
|
func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (result map[string]spec.PgUser, err error) {
|
||||||
if c.opConfig.InfrastructureRolesSecretName == (spec.NamespacedName{}) {
|
if *rolesSecret == (spec.NamespacedName{}) {
|
||||||
// we don't have infrastructure roles defined, bail out
|
// we don't have infrastructure roles defined, bail out
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
infraRolesSecret, err := c.KubeClient.
|
infraRolesSecret, err := c.KubeClient.
|
||||||
Secrets(c.opConfig.InfrastructureRolesSecretName.Namespace).
|
Secrets(rolesSecret.Namespace).
|
||||||
Get(c.opConfig.InfrastructureRolesSecretName.Name, meta_v1.GetOptions{})
|
Get(rolesSecret.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Debugf("Infrastructure roles secret name: %q", c.opConfig.InfrastructureRolesSecretName)
|
c.logger.Debugf("Infrastructure roles secret name: %q", *rolesSecret)
|
||||||
return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err)
|
return nil, fmt.Errorf("could not get infrastructure roles secret: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,154 @@
|
||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
|
"k8s.io/client-go/pkg/api/v1"
|
||||||
|
|
||||||
|
"github.com/zalando-incubator/postgres-operator/pkg/spec"
|
||||||
|
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
|
||||||
|
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
testInfrastructureRolesSecretName = "infrastructureroles-test"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockSecret struct {
|
||||||
|
v1core.SecretInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockSecret) Get(name string) (*v1.Secret, error) {
|
||||||
|
if name != testInfrastructureRolesSecretName {
|
||||||
|
return nil, fmt.Errorf("NotFound")
|
||||||
|
}
|
||||||
|
secret := &v1.Secret{}
|
||||||
|
secret.Name = mockController.opConfig.ClusterNameLabel
|
||||||
|
secret.Data = map[string][]byte{
|
||||||
|
"user1": []byte("testrole"),
|
||||||
|
"password1": []byte("testpassword"),
|
||||||
|
"inrole1": []byte("testinrole"),
|
||||||
|
}
|
||||||
|
return secret, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
type MockSecretGetter struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *MockSecretGetter) Secrets(namespace string) v1core.SecretInterface {
|
||||||
|
return &mockSecret{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMockKubernetesClient() k8sutil.KubernetesClient {
|
||||||
|
return k8sutil.KubernetesClient{SecretsGetter: &MockSecretGetter{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMockController() *Controller {
|
||||||
|
controller := NewController(&Config{}, &config.Config{})
|
||||||
|
controller.opConfig.ClusterNameLabel = "cluster-name"
|
||||||
|
controller.opConfig.InfrastructureRolesSecretName =
|
||||||
|
spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName}
|
||||||
|
controller.opConfig.Workers = 4
|
||||||
|
controller.KubeClient = newMockKubernetesClient()
|
||||||
|
return controller
|
||||||
|
}
|
||||||
|
|
||||||
|
var mockController = newMockController()
|
||||||
|
|
||||||
|
func TestPodClusterName(t *testing.T) {
|
||||||
|
var testTable = []struct {
|
||||||
|
in *v1.Pod
|
||||||
|
expected spec.NamespacedName
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
&v1.Pod{},
|
||||||
|
spec.NamespacedName{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
&v1.Pod{
|
||||||
|
ObjectMeta: v1.ObjectMeta{
|
||||||
|
Namespace: v1.NamespaceDefault,
|
||||||
|
Labels: map[string]string{
|
||||||
|
mockController.opConfig.ClusterNameLabel: "testcluster",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
spec.NamespacedName{v1.NamespaceDefault, "testcluster"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, test := range testTable {
|
||||||
|
resp := mockController.podClusterName(test.in)
|
||||||
|
if resp != test.expected {
|
||||||
|
t.Errorf("expected response %v does not match the actual %v", test.expected, resp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClusterWorkerID(t *testing.T) {
|
||||||
|
var testTable = []struct {
|
||||||
|
in spec.NamespacedName
|
||||||
|
expected uint32
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
in: spec.NamespacedName{"foo", "bar"},
|
||||||
|
expected: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
in: spec.NamespacedName{"default", "testcluster"},
|
||||||
|
expected: 3,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, test := range testTable {
|
||||||
|
resp := mockController.clusterWorkerID(test.in)
|
||||||
|
if resp != test.expected {
|
||||||
|
t.Errorf("expected response %v does not match the actual %v", test.expected, resp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetInfrastructureRoles(t *testing.T) {
|
||||||
|
var testTable = []struct {
|
||||||
|
secretName spec.NamespacedName
|
||||||
|
expectedRoles map[string]spec.PgUser
|
||||||
|
expectedError error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
spec.NamespacedName{},
|
||||||
|
nil,
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
spec.NamespacedName{v1.NamespaceDefault, "null"},
|
||||||
|
nil,
|
||||||
|
fmt.Errorf(`could not get infrastructure roles secret: NotFound`),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName},
|
||||||
|
map[string]spec.PgUser{
|
||||||
|
"testrole": {
|
||||||
|
"testrole",
|
||||||
|
"testpassword",
|
||||||
|
nil,
|
||||||
|
[]string{"testinrole"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, test := range testTable {
|
||||||
|
roles, err := mockController.getInfrastructureRoles(&test.secretName)
|
||||||
|
if err != test.expectedError {
|
||||||
|
if err != nil && test.expectedError != nil && err.Error() == test.expectedError.Error() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
t.Errorf("expected error '%v' does not match the actual error '%v'", test.expectedError, err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(roles, test.expectedRoles) {
|
||||||
|
t.Errorf("expected roles output %v does not match the actual %v", test.expectedRoles, roles)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -3,7 +3,6 @@ package spec
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,9 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
|
v1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1"
|
||||||
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
|
extensions "k8s.io/client-go/kubernetes/typed/extensions/v1beta1"
|
||||||
"k8s.io/client-go/pkg/api"
|
"k8s.io/client-go/pkg/api"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/tools/clientcmd"
|
"k8s.io/client-go/tools/clientcmd"
|
||||||
|
|
@ -15,6 +18,32 @@ import (
|
||||||
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
|
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type KubernetesClient struct {
|
||||||
|
v1core.SecretsGetter
|
||||||
|
v1core.ServicesGetter
|
||||||
|
v1core.EndpointsGetter
|
||||||
|
v1core.PodsGetter
|
||||||
|
v1core.PersistentVolumesGetter
|
||||||
|
v1core.PersistentVolumeClaimsGetter
|
||||||
|
v1core.ConfigMapsGetter
|
||||||
|
v1beta1.StatefulSetsGetter
|
||||||
|
extensions.ThirdPartyResourcesGetter
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) {
|
||||||
|
c = KubernetesClient{}
|
||||||
|
c.PodsGetter = src.CoreV1()
|
||||||
|
c.ServicesGetter = src.CoreV1()
|
||||||
|
c.EndpointsGetter = src.CoreV1()
|
||||||
|
c.SecretsGetter = src.CoreV1()
|
||||||
|
c.ConfigMapsGetter = src.CoreV1()
|
||||||
|
c.PersistentVolumeClaimsGetter = src.CoreV1()
|
||||||
|
c.PersistentVolumesGetter = src.CoreV1()
|
||||||
|
c.StatefulSetsGetter = src.AppsV1beta1()
|
||||||
|
c.ThirdPartyResourcesGetter = src.ExtensionsV1beta1()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) {
|
func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) {
|
||||||
if outOfCluster {
|
if outOfCluster {
|
||||||
return clientcmd.BuildConfigFromFlags("", kubeConfig)
|
return clientcmd.BuildConfigFromFlags("", kubeConfig)
|
||||||
|
|
@ -23,7 +52,7 @@ func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) {
|
||||||
return rest.InClusterConfig()
|
return rest.InClusterConfig()
|
||||||
}
|
}
|
||||||
|
|
||||||
func KubernetesClient(config *rest.Config) (client *kubernetes.Clientset, err error) {
|
func ClientSet(config *rest.Config) (client *kubernetes.Clientset, err error) {
|
||||||
return kubernetes.NewForConfig(config)
|
return kubernetes.NewForConfig(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue