Fix golint warnings

This commit is contained in:
Murat Kabilov 2017-08-01 16:08:56 +02:00 committed by GitHub
parent c02a740e10
commit cf663cb841
26 changed files with 122 additions and 89 deletions

View File

@ -41,7 +41,7 @@ type Config struct {
} }
type kubeResources struct { type kubeResources struct {
Service map[PostgresRole]*v1.Service Service map[postgresRole]*v1.Service
Endpoint *v1.Endpoints Endpoint *v1.Endpoints
Secrets map[types.UID]*v1.Secret Secrets map[types.UID]*v1.Secret
Statefulset *v1beta1.StatefulSet Statefulset *v1beta1.StatefulSet
@ -49,6 +49,7 @@ type kubeResources struct {
//PVCs are treated separately //PVCs are treated separately
} }
// Cluster describes postgresql cluster
type Cluster struct { type Cluster struct {
kubeResources kubeResources
spec.Postgresql spec.Postgresql
@ -79,7 +80,7 @@ type compareStatefulsetResult struct {
// New creates a new cluster. This function should be called from a controller. // New creates a new cluster. This function should be called from a controller.
func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster {
lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Name) lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Name)
kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} k8sResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[postgresRole]*v1.Service)}
orphanDependents := true orphanDependents := true
podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) {
@ -98,7 +99,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
pgUsers: make(map[string]spec.PgUser), pgUsers: make(map[string]spec.PgUser),
systemUsers: make(map[string]spec.PgUser), systemUsers: make(map[string]spec.PgUser),
podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent), podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent),
kubeResources: kubeResources, kubeResources: k8sResources,
masterLess: false, masterLess: false,
userSyncStrategy: users.DefaultUserSyncStrategy{}, userSyncStrategy: users.DefaultUserSyncStrategy{},
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents}, deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
@ -164,7 +165,13 @@ func (c *Cluster) initUsers() error {
func (c *Cluster) Create() error { func (c *Cluster) Create() error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
var err error var (
err error
service *v1.Service
ep *v1.Endpoints
ss *v1beta1.StatefulSet
)
defer func() { defer func() {
if err == nil { if err == nil {
@ -177,17 +184,17 @@ func (c *Cluster) Create() error {
c.setStatus(spec.ClusterStatusCreating) c.setStatus(spec.ClusterStatusCreating)
//TODO: service will create endpoint implicitly //TODO: service will create endpoint implicitly
ep, err := c.createEndpoint() ep, err = c.createEndpoint()
if err != nil { if err != nil {
return fmt.Errorf("could not create endpoint: %v", err) return fmt.Errorf("could not create endpoint: %v", err)
} }
c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta)) c.logger.Infof("endpoint %q has been successfully created", util.NameFromMeta(ep.ObjectMeta))
for _, role := range []PostgresRole{Master, Replica} { for _, role := range []postgresRole{master, replica} {
if role == Replica && !c.Spec.ReplicaLoadBalancer { if role == replica && !c.Spec.ReplicaLoadBalancer {
continue continue
} }
service, err := c.createService(role) service, err = c.createService(role)
if err != nil { if err != nil {
return fmt.Errorf("could not create %s service: %v", role, err) return fmt.Errorf("could not create %s service: %v", role, err)
} }
@ -204,7 +211,7 @@ func (c *Cluster) Create() error {
} }
c.logger.Infof("secrets have been successfully created") c.logger.Infof("secrets have been successfully created")
ss, err := c.createStatefulSet() ss, err = c.createStatefulSet()
if err != nil { if err != nil {
return fmt.Errorf("could not create statefulset: %v", err) return fmt.Errorf("could not create statefulset: %v", err)
} }
@ -219,10 +226,12 @@ func (c *Cluster) Create() error {
c.logger.Infof("pods are ready") c.logger.Infof("pods are ready")
if !(c.masterLess || c.databaseAccessDisabled()) { if !(c.masterLess || c.databaseAccessDisabled()) {
if err := c.initDbConn(); err != nil { err = c.initDbConn()
if err != nil {
return fmt.Errorf("could not init db connection: %v", err) return fmt.Errorf("could not init db connection: %v", err)
} }
if err = c.createUsers(); err != nil { err = c.createUsers()
if err != nil {
return fmt.Errorf("could not create users: %v", err) return fmt.Errorf("could not create users: %v", err)
} }
c.logger.Infof("Users have been successfully created") c.logger.Infof("Users have been successfully created")
@ -240,9 +249,8 @@ func (c *Cluster) Create() error {
return nil return nil
} }
func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match bool, reason string) { func (c *Cluster) sameServiceWith(role postgresRole, service *v1.Service) (match bool, reason string) {
//TODO: improve comparison //TODO: improve comparison
match = true
if c.Service[role].Spec.Type != service.Spec.Type { if c.Service[role].Spec.Type != service.Spec.Type {
return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q", return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q",
role, service.Spec.Type, c.Service[role].Spec.Type) role, service.Spec.Type, c.Service[role].Spec.Type)
@ -414,8 +422,8 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
c.Postgresql = *newSpec c.Postgresql = *newSpec
}() }()
for _, role := range []PostgresRole{Master, Replica} { for _, role := range []postgresRole{master, replica} {
if role == Replica { if role == replica {
if !newSpec.Spec.ReplicaLoadBalancer { if !newSpec.Spec.ReplicaLoadBalancer {
// old spec had a load balancer, but the new one doesn't // old spec had a load balancer, but the new one doesn't
if c.Spec.ReplicaLoadBalancer { if c.Spec.ReplicaLoadBalancer {
@ -513,8 +521,8 @@ func (c *Cluster) Delete() error {
return fmt.Errorf("could not delete endpoint: %v", err) return fmt.Errorf("could not delete endpoint: %v", err)
} }
for _, role := range []PostgresRole{Master, Replica} { for _, role := range []postgresRole{master, replica} {
if role == Replica && !c.Spec.ReplicaLoadBalancer { if role == replica && !c.Spec.ReplicaLoadBalancer {
continue continue
} }
if err := c.deleteService(role); err != nil { if err := c.deleteService(role); err != nil {

View File

@ -13,6 +13,7 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
) )
//ExecCommand executes arbitrary command inside the pod
func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (string, error) { func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (string, error) {
var ( var (
execOut bytes.Buffer execOut bytes.Buffer

View File

@ -427,12 +427,12 @@ func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser)
return &secret return &secret
} }
func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec) *v1.Service { func (c *Cluster) generateService(role postgresRole, newSpec *spec.PostgresSpec) *v1.Service {
dnsNameFunction := c.masterDnsName dnsNameFunction := c.masterDNSName
name := c.Name name := c.Name
if role == Replica { if role == replica {
dnsNameFunction = c.replicaDnsName dnsNameFunction = c.replicaDNSName
name = name + "-repl" name = name + "-repl"
} }
@ -441,8 +441,8 @@ func (c *Cluster) generateService(role PostgresRole, newSpec *spec.PostgresSpec)
Type: v1.ServiceTypeClusterIP, Type: v1.ServiceTypeClusterIP,
} }
if role == Replica { if role == replica {
serviceSpec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)} serviceSpec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(replica)}
} }
var annotations map[string]string var annotations map[string]string
@ -486,7 +486,7 @@ func (c *Cluster) generateMasterEndpoints(subsets []v1.EndpointSubset) *v1.Endpo
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: c.Name, Name: c.Name,
Namespace: c.Namespace, Namespace: c.Namespace,
Labels: c.roleLabelsSet(Master), Labels: c.roleLabelsSet(master),
}, },
} }
if len(subsets) > 0 { if len(subsets) > 0 {

View File

@ -49,7 +49,7 @@ func (c *Cluster) initDbConn() (err error) {
err = conn.Ping() err = conn.Ping()
if err != nil { if err != nil {
if err2 := conn.Close(); err2 != nil { if err2 := conn.Close(); err2 != nil {
c.logger.Error("error when closing PostgreSQL connection after another error: %v", err2) c.logger.Errorf("error when closing PostgreSQL connection after another error: %v", err2)
} }
return err return err
} }

View File

@ -29,11 +29,11 @@ func (c *Cluster) loadResources() error {
return fmt.Errorf("too many(%d) services for a cluster", len(services.Items)) return fmt.Errorf("too many(%d) services for a cluster", len(services.Items))
} }
for i, svc := range services.Items { for i, svc := range services.Items {
switch PostgresRole(svc.Labels[c.OpConfig.PodRoleLabel]) { switch postgresRole(svc.Labels[c.OpConfig.PodRoleLabel]) {
case Replica: case replica:
c.Service[Replica] = &services.Items[i] c.Service[replica] = &services.Items[i]
default: default:
c.Service[Master] = &services.Items[i] c.Service[master] = &services.Items[i]
} }
} }
@ -46,7 +46,7 @@ func (c *Cluster) loadResources() error {
} }
for i, ep := range endpoints.Items { for i, ep := range endpoints.Items {
if ep.Labels[c.OpConfig.PodRoleLabel] != string(Replica) { if ep.Labels[c.OpConfig.PodRoleLabel] != string(replica) {
c.Endpoint = &endpoints.Items[i] c.Endpoint = &endpoints.Items[i]
break break
} }
@ -230,7 +230,7 @@ func (c *Cluster) deleteStatefulSet() error {
return nil return nil
} }
func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { func (c *Cluster) createService(role postgresRole) (*v1.Service, error) {
if c.Service[role] != nil { if c.Service[role] != nil {
return nil, fmt.Errorf("service already exists in the cluster") return nil, fmt.Errorf("service already exists in the cluster")
} }
@ -245,7 +245,7 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) {
return service, nil return service, nil
} }
func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error { func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error {
if c.Service[role] == nil { if c.Service[role] == nil {
return fmt.Errorf("there is no service in the cluster") return fmt.Errorf("there is no service in the cluster")
} }
@ -260,7 +260,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
err error err error
) )
if role == Master { if role == master {
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of // for the master service we need to re-create the endpoint as well. Get the up-to-date version of
// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint) // the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name, metav1.GetOptions{}) currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name, metav1.GetOptions{})
@ -278,7 +278,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
return fmt.Errorf("could not create service %q: %v", serviceName, err) return fmt.Errorf("could not create service %q: %v", serviceName, err)
} }
c.Service[role] = svc c.Service[role] = svc
if role == Master { if role == master {
// create the new endpoint using the addresses obtained from the previous one // create the new endpoint using the addresses obtained from the previous one
endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets) endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets)
ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec) ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec)
@ -320,7 +320,7 @@ func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error
return nil return nil
} }
func (c *Cluster) deleteService(role PostgresRole) error { func (c *Cluster) deleteService(role postgresRole) error {
c.logger.Debugf("Deleting service %s", role) c.logger.Debugf("Deleting service %s", role)
if c.Service[role] == nil { if c.Service[role] == nil {
return fmt.Errorf("There is no %s service in the cluster", role) return fmt.Errorf("There is no %s service in the cluster", role)

View File

@ -34,8 +34,8 @@ func (c *Cluster) Sync() error {
} }
c.logger.Debugf("Syncing services") c.logger.Debugf("Syncing services")
for _, role := range []PostgresRole{Master, Replica} { for _, role := range []postgresRole{master, replica} {
if role == Replica && !c.Spec.ReplicaLoadBalancer { if role == replica && !c.Spec.ReplicaLoadBalancer {
if c.Service[role] != nil { if c.Service[role] != nil {
// delete the left over replica service // delete the left over replica service
if err := c.deleteService(role); err != nil { if err := c.deleteService(role); err != nil {
@ -87,7 +87,7 @@ func (c *Cluster) syncSecrets() error {
return err return err
} }
func (c *Cluster) syncService(role PostgresRole) error { func (c *Cluster) syncService(role postgresRole) error {
cSpec := c.Spec cSpec := c.Spec
if c.Service[role] == nil { if c.Service[role] == nil {
c.logger.Infof("could not find the cluster's %s service", role) c.logger.Infof("could not find the cluster's %s service", role)

View File

@ -1,8 +1,8 @@
package cluster package cluster
type PostgresRole string type postgresRole string
const ( const (
Master PostgresRole = "master" master postgresRole = "master"
Replica PostgresRole = "replica" replica postgresRole = "replica"
) )

View File

@ -94,7 +94,7 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate
} }
} }
func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isUpdate bool, reason string) { func (c *Cluster) logServiceChanges(role postgresRole, old, new *v1.Service, isUpdate bool, reason string) {
if isUpdate { if isUpdate {
c.logger.Infof("%s service %q has been changed", c.logger.Infof("%s service %q has been changed",
role, util.NameFromMeta(old.ObjectMeta), role, util.NameFromMeta(old.ObjectMeta),
@ -283,20 +283,20 @@ func (c *Cluster) labelsSet() labels.Set {
return labels.Set(lbls) return labels.Set(lbls)
} }
func (c *Cluster) roleLabelsSet(role PostgresRole) labels.Set { func (c *Cluster) roleLabelsSet(role postgresRole) labels.Set {
lbls := c.labelsSet() lbls := c.labelsSet()
lbls[c.OpConfig.PodRoleLabel] = string(role) lbls[c.OpConfig.PodRoleLabel] = string(role)
return lbls return lbls
} }
func (c *Cluster) masterDnsName() string { func (c *Cluster) masterDNSName() string {
return strings.ToLower(c.OpConfig.MasterDNSNameFormat.Format( return strings.ToLower(c.OpConfig.MasterDNSNameFormat.Format(
"cluster", c.Spec.ClusterName, "cluster", c.Spec.ClusterName,
"team", c.teamName(), "team", c.teamName(),
"hostedzone", c.OpConfig.DbHostedZone)) "hostedzone", c.OpConfig.DbHostedZone))
} }
func (c *Cluster) replicaDnsName() string { func (c *Cluster) replicaDNSName() string {
return strings.ToLower(c.OpConfig.ReplicaDNSNameFormat.Format( return strings.ToLower(c.OpConfig.ReplicaDNSNameFormat.Format(
"cluster", c.Spec.ClusterName, "cluster", c.Spec.ClusterName,
"team", c.teamName(), "team", c.teamName(),

View File

@ -103,7 +103,7 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume
if !resizer.VolumeBelongsToProvider(pv) { if !resizer.VolumeBelongsToProvider(pv) {
continue continue
} }
totalCompatible += 1 totalCompatible++
if !resizer.IsConnectedToProvider() { if !resizer.IsConnectedToProvider() {
err := resizer.ConnectToProvider() err := resizer.ConnectToProvider()
if err != nil { if err != nil {
@ -115,13 +115,13 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume
} }
}() }()
} }
awsVolumeId, err := resizer.GetProviderVolumeID(pv) awsVolumeID, err := resizer.GetProviderVolumeID(pv)
if err != nil { if err != nil {
return err return err
} }
c.logger.Debugf("updating persistent volume %q to %d", pv.Name, newSize) c.logger.Debugf("updating persistent volume %q to %d", pv.Name, newSize)
if err := resizer.ResizeVolume(awsVolumeId, newSize); err != nil { if err := resizer.ResizeVolume(awsVolumeID, newSize); err != nil {
return fmt.Errorf("could not resize EBS volume %q: %v", awsVolumeId, err) return fmt.Errorf("could not resize EBS volume %q: %v", awsVolumeID, err)
} }
c.logger.Debugf("resizing the filesystem on the volume %q", pv.Name) c.logger.Debugf("resizing the filesystem on the volume %q", pv.Name)
podName := getPodNameFromPersistentVolume(pv) podName := getPodNameFromPersistentVolume(pv)
@ -174,7 +174,7 @@ func (c *Cluster) listVolumesWithManifestSize(newVolume spec.Volume) ([]*v1.Pers
func getPodNameFromPersistentVolume(pv *v1.PersistentVolume) *spec.NamespacedName { func getPodNameFromPersistentVolume(pv *v1.PersistentVolume) *spec.NamespacedName {
namespace := pv.Spec.ClaimRef.Namespace namespace := pv.Spec.ClaimRef.Namespace
name := pv.Spec.ClaimRef.Name[len(constants.DataVolumeName)+1:] name := pv.Spec.ClaimRef.Name[len(constants.DataVolumeName)+1:]
return &spec.NamespacedName{namespace, name} return &spec.NamespacedName{Namespace: namespace, Name: name}
} }
func quantityToGigabyte(q resource.Quantity) int64 { func quantityToGigabyte(q resource.Quantity) int64 {

View File

@ -17,6 +17,7 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil" "github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
) )
// Config describes configuration of the controller
type Config struct { type Config struct {
RestConfig *rest.Config RestConfig *rest.Config
InfrastructureRoles map[string]spec.PgUser InfrastructureRoles map[string]spec.PgUser
@ -27,6 +28,7 @@ type Config struct {
Namespace string Namespace string
} }
// Controller represents operator controller
type Controller struct { type Controller struct {
config Config config Config
opConfig *config.Config opConfig *config.Config
@ -47,6 +49,7 @@ type Controller struct {
lastClusterSyncTime int64 lastClusterSyncTime int64
} }
// NewController creates a new controller
func NewController(controllerConfig *Config) *Controller { func NewController(controllerConfig *Config) *Controller {
logger := logrus.New() logger := logrus.New()
@ -168,6 +171,7 @@ func (c *Controller) initController() {
} }
} }
// Run starts background controller processes
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
c.initController() c.initController()

View File

@ -27,7 +27,10 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
c.clusterListFunc(metav1.ListOptions{ResourceVersion: "0"}) _, err := c.clusterListFunc(metav1.ListOptions{ResourceVersion: "0"})
if err != nil {
c.logger.Errorf("could not list clusters: %v", err)
}
case <-stopCh: case <-stopCh:
return return
} }

View File

@ -51,7 +51,7 @@ func newMockController() *Controller {
controller := NewController(&Config{}) controller := NewController(&Config{})
controller.opConfig.ClusterNameLabel = "cluster-name" controller.opConfig.ClusterNameLabel = "cluster-name"
controller.opConfig.InfrastructureRolesSecretName = controller.opConfig.InfrastructureRolesSecretName =
spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName} spec.NamespacedName{Namespace: v1.NamespaceDefault, Name: testInfrastructureRolesSecretName}
controller.opConfig.Workers = 4 controller.opConfig.Workers = 4
controller.KubeClient = newMockKubernetesClient() controller.KubeClient = newMockKubernetesClient()
return controller return controller
@ -77,7 +77,7 @@ func TestPodClusterName(t *testing.T) {
}, },
}, },
}, },
spec.NamespacedName{v1.NamespaceDefault, "testcluster"}, spec.NamespacedName{Namespace: v1.NamespaceDefault, Name: "testcluster"},
}, },
} }
for _, test := range testTable { for _, test := range testTable {
@ -94,11 +94,11 @@ func TestClusterWorkerID(t *testing.T) {
expected uint32 expected uint32
}{ }{
{ {
in: spec.NamespacedName{"foo", "bar"}, in: spec.NamespacedName{Namespace: "foo", Name: "bar"},
expected: 2, expected: 2,
}, },
{ {
in: spec.NamespacedName{"default", "testcluster"}, in: spec.NamespacedName{Namespace: "default", Name: "testcluster"},
expected: 3, expected: 3,
}, },
} }
@ -122,18 +122,17 @@ func TestGetInfrastructureRoles(t *testing.T) {
nil, nil,
}, },
{ {
spec.NamespacedName{v1.NamespaceDefault, "null"}, spec.NamespacedName{Namespace: v1.NamespaceDefault, Name: "null"},
nil, nil,
fmt.Errorf(`could not get infrastructure roles secret: NotFound`), fmt.Errorf(`could not get infrastructure roles secret: NotFound`),
}, },
{ {
spec.NamespacedName{v1.NamespaceDefault, testInfrastructureRolesSecretName}, spec.NamespacedName{Namespace: v1.NamespaceDefault, Name: testInfrastructureRolesSecretName},
map[string]spec.PgUser{ map[string]spec.PgUser{
"testrole": { "testrole": {
"testrole", Name: "testrole",
"testpassword", Password: "testpassword",
nil, MemberOf: []string{"testinrole"},
[]string{"testinrole"},
}, },
}, },
nil, nil,

View File

@ -9,7 +9,7 @@ import (
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
) )
// EvenType contains type of the events for the TPRs and Pods received from Kubernetes // EventType contains type of the events for the TPRs and Pods received from Kubernetes
type EventType string type EventType string
// NamespacedName describes the namespace/name pairs used in Kubernetes names. // NamespacedName describes the namespace/name pairs used in Kubernetes names.

View File

@ -8,12 +8,14 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/spec"
) )
// TPR describes ThirdPartyResource specific configuration parameters
type TPR struct { type TPR struct {
ReadyWaitInterval time.Duration `name:"ready_wait_interval" default:"4s"` ReadyWaitInterval time.Duration `name:"ready_wait_interval" default:"4s"`
ReadyWaitTimeout time.Duration `name:"ready_wait_timeout" default:"30s"` ReadyWaitTimeout time.Duration `name:"ready_wait_timeout" default:"30s"`
ResyncPeriod time.Duration `name:"resync_period" default:"5m"` ResyncPeriod time.Duration `name:"resync_period" default:"5m"`
} }
// Resources describes kubernetes resource specific configuration parameters
type Resources struct { type Resources struct {
ResourceCheckInterval time.Duration `name:"resource_check_interval" default:"3s"` ResourceCheckInterval time.Duration `name:"resource_check_interval" default:"3s"`
ResourceCheckTimeout time.Duration `name:"resource_check_timeout" default:"10m"` ResourceCheckTimeout time.Duration `name:"resource_check_timeout" default:"10m"`
@ -28,6 +30,7 @@ type Resources struct {
DefaultMemoryLimit string `name:"default_memory_limit" default:"1Gi"` DefaultMemoryLimit string `name:"default_memory_limit" default:"1Gi"`
} }
// Auth describes authentication specific configuration parameters
type Auth struct { type Auth struct {
PamRoleName string `name:"pam_rol_name" default:"zalandos"` PamRoleName string `name:"pam_rol_name" default:"zalandos"`
PamConfiguration string `name:"pam_configuration" default:"https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"` PamConfiguration string `name:"pam_configuration" default:"https://info.example.com/oauth2/tokeninfo?access_token= uid realm=/employees"`
@ -38,6 +41,7 @@ type Auth struct {
ReplicationUsername string `name:"replication_username" default:"replication"` ReplicationUsername string `name:"replication_username" default:"replication"`
} }
// Config describes operator config
type Config struct { type Config struct {
TPR TPR
Resources Resources
@ -59,6 +63,7 @@ type Config struct {
Workers uint32 `name:"workers" default:"4"` Workers uint32 `name:"workers" default:"4"`
} }
// MustMarshal marshals the config or panics
func (c Config) MustMarshal() string { func (c Config) MustMarshal() string {
b, err := json.MarshalIndent(c, "", "\t") b, err := json.MarshalIndent(c, "", "\t")
if err != nil { if err != nil {
@ -68,6 +73,7 @@ func (c Config) MustMarshal() string {
return string(b) return string(b)
} }
// NewFromMap creates Config from the map
func NewFromMap(m map[string]string) *Config { func NewFromMap(m map[string]string) *Config {
cfg := Config{} cfg := Config{}
fields, _ := structFields(&cfg) fields, _ := structFields(&cfg)
@ -91,6 +97,7 @@ func NewFromMap(m map[string]string) *Config {
return &cfg return &cfg
} }
// Copy creates a copy of the config
func Copy(c *Config) Config { func Copy(c *Config) Config {
cfg := *c cfg := *c

View File

@ -5,7 +5,7 @@ import "time"
// AWS specific constants used by other modules // AWS specific constants used by other modules
const ( const (
// default region for AWS. TODO: move it to the operator configuration // default region for AWS. TODO: move it to the operator configuration
AWS_REGION = "eu-central-1" AWSRegion = "eu-central-1"
// EBS related constants // EBS related constants
EBSVolumeIDStart = "/vol-" EBSVolumeIDStart = "/vol-"
EBSProvisioner = "kubernetes.io/aws-ebs" EBSProvisioner = "kubernetes.io/aws-ebs"

View File

@ -1,5 +1,6 @@
package constants package constants
// PostgreSQL specific constants
const ( const (
DataVolumeName = "pgdata" DataVolumeName = "pgdata"
PodRoleMaster = "master" PodRoleMaster = "master"

View File

@ -1,5 +1,6 @@
package constants package constants
// Roles specific constants
const ( const (
PasswordLength = 64 PasswordLength = 64
UserSecretTemplate = "%s.%s.credentials." + TPRKind + "." + TPRGroup // Username, ClusterName UserSecretTemplate = "%s.%s.credentials." + TPRKind + "." + TPRGroup // Username, ClusterName

View File

@ -1,5 +1,6 @@
package filesystems package filesystems
// FilesystemResizer has methods to work with resizing of a filesystem
type FilesystemResizer interface { type FilesystemResizer interface {
CanResizeFilesystem(fstype string) bool CanResizeFilesystem(fstype string) bool
ResizeFilesystem(deviceName string, commandExecutor func(string) (out string, err error)) error ResizeFilesystem(deviceName string, commandExecutor func(string) (out string, err error)) error

View File

@ -18,6 +18,7 @@ import (
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
) )
// KubernetesClient describes getters for Kubernetes objects
type KubernetesClient struct { type KubernetesClient struct {
v1core.SecretsGetter v1core.SecretsGetter
v1core.ServicesGetter v1core.ServicesGetter
@ -31,6 +32,7 @@ type KubernetesClient struct {
RESTClient rest.Interface RESTClient rest.Interface
} }
// NewFromKubernetesInterface creates KubernetesClient from kubernetes Interface
func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) { func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) {
c = KubernetesClient{} c = KubernetesClient{}
c.PodsGetter = src.CoreV1() c.PodsGetter = src.CoreV1()
@ -46,6 +48,7 @@ func NewFromKubernetesInterface(src kubernetes.Interface) (c KubernetesClient) {
return return
} }
// RestConfig creates REST config
func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) { func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) {
if outOfCluster { if outOfCluster {
return clientcmd.BuildConfigFromFlags("", kubeConfig) return clientcmd.BuildConfigFromFlags("", kubeConfig)
@ -54,18 +57,22 @@ func RestConfig(kubeConfig string, outOfCluster bool) (*rest.Config, error) {
return rest.InClusterConfig() return rest.InClusterConfig()
} }
// ClientSet creates clientset using REST config
func ClientSet(config *rest.Config) (client *kubernetes.Clientset, err error) { func ClientSet(config *rest.Config) (client *kubernetes.Clientset, err error) {
return kubernetes.NewForConfig(config) return kubernetes.NewForConfig(config)
} }
// ResourceAlreadyExists checks if error corresponds to Already exists error
func ResourceAlreadyExists(err error) bool { func ResourceAlreadyExists(err error) bool {
return apierrors.IsAlreadyExists(err) return apierrors.IsAlreadyExists(err)
} }
// ResourceNotFound checks if error corresponds to Not found error
func ResourceNotFound(err error) bool { func ResourceNotFound(err error) bool {
return apierrors.IsNotFound(err) return apierrors.IsNotFound(err)
} }
// KubernetesRestClient create kubernets Interface using REST config
func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) { func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) {
cfg.GroupVersion = &schema.GroupVersion{ cfg.GroupVersion = &schema.GroupVersion{
Group: constants.TPRGroup, Group: constants.TPRGroup,
@ -77,6 +84,7 @@ func KubernetesRestClient(cfg rest.Config) (rest.Interface, error) {
return rest.RESTClientFor(&cfg) return rest.RESTClientFor(&cfg)
} }
// WaitTPRReady waits until ThirdPartyResource is ready
func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error { func WaitTPRReady(restclient rest.Interface, interval, timeout time.Duration, ns string) error {
return retryutil.Retry(interval, timeout, func() (bool, error) { return retryutil.Retry(interval, timeout, func() (bool, error) {
_, err := restclient. _, err := restclient.

View File

@ -5,10 +5,8 @@ import (
"time" "time"
) )
type ConditionFunc func() (bool, error)
// Retry calls ConditionFunc until it returns boolean true, a timeout expires or an error occurs. // Retry calls ConditionFunc until it returns boolean true, a timeout expires or an error occurs.
func Retry(interval time.Duration, timeout time.Duration, f ConditionFunc) error { func Retry(interval time.Duration, timeout time.Duration, f func() (bool, error)) error {
//TODO: make the retry exponential //TODO: make the retry exponential
if timeout < interval { if timeout < interval {
return fmt.Errorf("timout(%s) should be greater than interval(%v)", timeout, interval) return fmt.Errorf("timout(%s) should be greater than interval(%v)", timeout, interval)

View File

@ -22,7 +22,7 @@ type infrastructureAccount struct {
} }
// Team defines informaiton for a single team, including the list of members and infrastructure accounts. // Team defines informaiton for a single team, including the list of members and infrastructure accounts.
type team struct { type Team struct {
Dn string `json:"dn"` Dn string `json:"dn"`
ID string `json:"id"` ID string `json:"id"`
TeamName string `json:"id_name"` TeamName string `json:"id_name"`
@ -43,6 +43,7 @@ type httpClient interface {
Do(req *http.Request) (*http.Response, error) Do(req *http.Request) (*http.Response, error)
} }
// API describes teams API
type API struct { type API struct {
httpClient httpClient
url string url string
@ -61,7 +62,7 @@ func NewTeamsAPI(url string, log *logrus.Logger) *API {
} }
// TeamInfo returns information about a given team using its ID and a token to authenticate to the API service. // TeamInfo returns information about a given team using its ID and a token to authenticate to the API service.
func (t *API) TeamInfo(teamID, token string) (tm *team, err error) { func (t *API) TeamInfo(teamID, token string) (tm *Team, err error) {
var ( var (
req *http.Request req *http.Request
resp *http.Response resp *http.Response
@ -103,7 +104,7 @@ func (t *API) TeamInfo(teamID, token string) (tm *team, err error) {
return return
} }
tm = &team{} tm = &Team{}
d := json.NewDecoder(resp.Body) d := json.NewDecoder(resp.Body)
err = d.Decode(tm) err = d.Decode(tm)
if err != nil { if err != nil {

View File

@ -18,7 +18,7 @@ var (
var teamsAPItc = []struct { var teamsAPItc = []struct {
in string in string
inCode int inCode int
out *team out *Team
err error err error
}{ }{
{`{ {`{
@ -67,7 +67,7 @@ var teamsAPItc = []struct {
"parent_team_id": "111221" "parent_team_id": "111221"
}`, }`,
200, 200,
&team{ &Team{
Dn: "cn=100100,ou=official,ou=foobar,dc=zalando,dc=net", Dn: "cn=100100,ou=official,ou=foobar,dc=zalando,dc=net",
ID: "acid", ID: "acid",
TeamName: "ACID", TeamName: "ACID",
@ -169,7 +169,7 @@ func TestInfo(t *testing.T) {
} }
} }
type mockHttpClient struct { type mockHTTPClient struct {
} }
type mockBody struct { type mockBody struct {
@ -183,7 +183,7 @@ func (b *mockBody) Close() error {
return fmt.Errorf("close error") return fmt.Errorf("close error")
} }
func (c *mockHttpClient) Do(req *http.Request) (*http.Response, error) { func (c *mockHTTPClient) Do(req *http.Request) (*http.Response, error) {
resp := http.Response{ resp := http.Response{
Status: "200 OK", Status: "200 OK",
StatusCode: 200, StatusCode: 200,
@ -200,7 +200,7 @@ func TestHttpClientClose(t *testing.T) {
ts := httptest.NewServer(nil) ts := httptest.NewServer(nil)
api := NewTeamsAPI(ts.URL, logger) api := NewTeamsAPI(ts.URL, logger)
api.httpClient = &mockHttpClient{} api.httpClient = &mockHTTPClient{}
_, err := api.TeamInfo("acid", token) _, err := api.TeamInfo("acid", token)
expError := fmt.Errorf("error when closing response: close error") expError := fmt.Errorf("error when closing response: close error")

View File

@ -20,7 +20,7 @@ const (
var passwordChars = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") var passwordChars = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
func init() { func init() {
rand.Seed(int64(time.Now().Unix())) rand.Seed(time.Now().Unix())
} }
// RandomPassword generates random alphanumeric password of a given length. // RandomPassword generates random alphanumeric password of a given length.

View File

@ -7,10 +7,10 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2"
"k8s.io/client-go/pkg/api/v1"
"github.com/zalando-incubator/postgres-operator/pkg/util/constants" "github.com/zalando-incubator/postgres-operator/pkg/util/constants"
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil" "github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
"k8s.io/client-go/pkg/api/v1"
) )
// EBSVolumeResizer implements volume resizing interface for AWS EBS volumes. // EBSVolumeResizer implements volume resizing interface for AWS EBS volumes.
@ -20,7 +20,7 @@ type EBSVolumeResizer struct {
// ConnectToProvider connects to AWS. // ConnectToProvider connects to AWS.
func (c *EBSVolumeResizer) ConnectToProvider() error { func (c *EBSVolumeResizer) ConnectToProvider() error {
sess, err := session.NewSession(&aws.Config{Region: aws.String(constants.AWS_REGION)}) sess, err := session.NewSession(&aws.Config{Region: aws.String(constants.AWSRegion)})
if err != nil { if err != nil {
return fmt.Errorf("could not establish AWS session: %v", err) return fmt.Errorf("could not establish AWS session: %v", err)
} }
@ -52,21 +52,21 @@ func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string,
} }
// ResizeVolume actually calls AWS API to resize the EBS volume if necessary. // ResizeVolume actually calls AWS API to resize the EBS volume if necessary.
func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { func (c *EBSVolumeResizer) ResizeVolume(volumeID string, newSize int64) error {
/* first check if the volume is already of a requested size */ /* first check if the volume is already of a requested size */
volumeOutput, err := c.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeId}}) volumeOutput, err := c.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeID}})
if err != nil { if err != nil {
return fmt.Errorf("could not get information about the volume: %v", err) return fmt.Errorf("could not get information about the volume: %v", err)
} }
vol := volumeOutput.Volumes[0] vol := volumeOutput.Volumes[0]
if *vol.VolumeId != volumeId { if *vol.VolumeId != volumeID {
return fmt.Errorf("describe volume %q returned information about a non-matching volume %q", volumeId, *vol.VolumeId) return fmt.Errorf("describe volume %q returned information about a non-matching volume %q", volumeID, *vol.VolumeId)
} }
if *vol.Size == newSize { if *vol.Size == newSize {
// nothing to do // nothing to do
return nil return nil
} }
input := ec2.ModifyVolumeInput{Size: &newSize, VolumeId: &volumeId} input := ec2.ModifyVolumeInput{Size: &newSize, VolumeId: &volumeID}
output, err := c.connection.ModifyVolume(&input) output, err := c.connection.ModifyVolume(&input)
if err != nil { if err != nil {
return fmt.Errorf("could not modify persistent volume: %v", err) return fmt.Errorf("could not modify persistent volume: %v", err)
@ -74,7 +74,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error {
state := *output.VolumeModification.ModificationState state := *output.VolumeModification.ModificationState
if state == constants.EBSVolumeStateFailed { if state == constants.EBSVolumeStateFailed {
return fmt.Errorf("could not modify persistent volume %q: modification state failed", volumeId) return fmt.Errorf("could not modify persistent volume %q: modification state failed", volumeID)
} }
if state == "" { if state == "" {
return fmt.Errorf("received empty modification status") return fmt.Errorf("received empty modification status")
@ -83,7 +83,7 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error {
return nil return nil
} }
// wait until the volume reaches the "optimizing" or "completed" state // wait until the volume reaches the "optimizing" or "completed" state
in := ec2.DescribeVolumesModificationsInput{VolumeIds: []*string{&volumeId}} in := ec2.DescribeVolumesModificationsInput{VolumeIds: []*string{&volumeID}}
return retryutil.Retry(constants.EBSVolumeResizeWaitInterval, constants.EBSVolumeResizeWaitTimeout, return retryutil.Retry(constants.EBSVolumeResizeWaitInterval, constants.EBSVolumeResizeWaitTimeout,
func() (bool, error) { func() (bool, error) {
out, err := c.connection.DescribeVolumesModifications(&in) out, err := c.connection.DescribeVolumesModifications(&in)
@ -91,16 +91,17 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error {
return false, fmt.Errorf("could not describe volume modification: %v", err) return false, fmt.Errorf("could not describe volume modification: %v", err)
} }
if len(out.VolumesModifications) != 1 { if len(out.VolumesModifications) != 1 {
return false, fmt.Errorf("describe volume modification didn't return one record for volume %q", volumeId) return false, fmt.Errorf("describe volume modification didn't return one record for volume %q", volumeID)
} }
if *out.VolumesModifications[0].VolumeId != volumeId { if *out.VolumesModifications[0].VolumeId != volumeID {
return false, fmt.Errorf("non-matching volume id when describing modifications: %q is different from %q", return false, fmt.Errorf("non-matching volume id when describing modifications: %q is different from %q",
*out.VolumesModifications[0].VolumeId, volumeId) *out.VolumesModifications[0].VolumeId, volumeID)
} }
return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil
}) })
} }
// DisconnectFromProvider closes connection to the EC2 instance
func (c *EBSVolumeResizer) DisconnectFromProvider() error { func (c *EBSVolumeResizer) DisconnectFromProvider() error {
c.connection = nil c.connection = nil
return nil return nil

View File

@ -10,6 +10,6 @@ type VolumeResizer interface {
IsConnectedToProvider() bool IsConnectedToProvider() bool
VolumeBelongsToProvider(pv *v1.PersistentVolume) bool VolumeBelongsToProvider(pv *v1.PersistentVolume) bool
GetProviderVolumeID(pv *v1.PersistentVolume) (string, error) GetProviderVolumeID(pv *v1.PersistentVolume) (string, error)
ResizeVolume(providerVolumeId string, newSize int64) error ResizeVolume(providerVolumeID string, newSize int64) error
DisconnectFromProvider() error DisconnectFromProvider() error
} }