Fix the golint, gosimple warnings
This commit is contained in:
parent
233e8529c1
commit
d34273543e
|
|
@ -39,7 +39,7 @@ type Config struct {
|
||||||
KubeClient *kubernetes.Clientset //TODO: move clients to the better place?
|
KubeClient *kubernetes.Clientset //TODO: move clients to the better place?
|
||||||
RestClient *rest.RESTClient
|
RestClient *rest.RESTClient
|
||||||
EtcdClient etcdclient.KeysAPI
|
EtcdClient etcdclient.KeysAPI
|
||||||
TeamsAPIClient *teams.TeamsAPI
|
TeamsAPIClient *teams.API
|
||||||
OpConfig config.Config
|
OpConfig config.Config
|
||||||
InfrastructureRoles map[string]spec.PgUser // inherited from the controller
|
InfrastructureRoles map[string]spec.PgUser // inherited from the controller
|
||||||
}
|
}
|
||||||
|
|
@ -100,7 +100,7 @@ func (c *Cluster) ClusterName() spec.NamespacedName {
|
||||||
|
|
||||||
func (c *Cluster) teamName() string {
|
func (c *Cluster) teamName() string {
|
||||||
// TODO: check Teams API for the actual name (in case the user passes an integer Id).
|
// TODO: check Teams API for the actual name (in case the user passes an integer Id).
|
||||||
return c.Spec.TeamId
|
return c.Spec.TeamID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) setStatus(status spec.PostgresStatus) {
|
func (c *Cluster) setStatus(status spec.PostgresStatus) {
|
||||||
|
|
@ -199,9 +199,8 @@ func (c *Cluster) Create(stopCh <-chan struct{}) error {
|
||||||
service, err := c.createService()
|
service, err := c.createService()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Can't create Service: %s", err)
|
return fmt.Errorf("Can't create Service: %s", err)
|
||||||
} else {
|
|
||||||
c.logger.Infof("Service '%s' has been successfully created", util.NameFromMeta(service.ObjectMeta))
|
|
||||||
}
|
}
|
||||||
|
c.logger.Infof("Service '%s' has been successfully created", util.NameFromMeta(service.ObjectMeta))
|
||||||
|
|
||||||
if err = c.initUsers(); err != nil {
|
if err = c.initUsers(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -230,20 +229,21 @@ func (c *Cluster) Create(stopCh <-chan struct{}) error {
|
||||||
if !(c.masterLess || c.databaseAccessDisabled()) {
|
if !(c.masterLess || c.databaseAccessDisabled()) {
|
||||||
if err := c.initDbConn(); err != nil {
|
if err := c.initDbConn(); err != nil {
|
||||||
return fmt.Errorf("Can't init db connection: %s", err)
|
return fmt.Errorf("Can't init db connection: %s", err)
|
||||||
} else {
|
|
||||||
if err = c.createUsers(); err != nil {
|
|
||||||
return fmt.Errorf("Can't create users: %s", err)
|
|
||||||
} else {
|
|
||||||
c.logger.Infof("Users have been successfully created")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if err = c.createUsers(); err != nil {
|
||||||
|
return fmt.Errorf("Can't create users: %s", err)
|
||||||
|
}
|
||||||
|
c.logger.Infof("Users have been successfully created")
|
||||||
} else {
|
} else {
|
||||||
if c.masterLess {
|
if c.masterLess {
|
||||||
c.logger.Warnln("Cluster is masterless")
|
c.logger.Warnln("Cluster is masterless")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
c.ListResources()
|
err = c.ListResources()
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Errorf("Can't list resources: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -406,9 +406,8 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
|
||||||
if err := c.updateService(newService); err != nil {
|
if err := c.updateService(newService); err != nil {
|
||||||
c.setStatus(spec.ClusterStatusUpdateFailed)
|
c.setStatus(spec.ClusterStatusUpdateFailed)
|
||||||
return fmt.Errorf("Can't update Service: %s", err)
|
return fmt.Errorf("Can't update Service: %s", err)
|
||||||
} else {
|
|
||||||
c.logger.Infof("Service '%s' has been updated", util.NameFromMeta(c.Service.ObjectMeta))
|
|
||||||
}
|
}
|
||||||
|
c.logger.Infof("Service '%s' has been updated", util.NameFromMeta(c.Service.ObjectMeta))
|
||||||
}
|
}
|
||||||
|
|
||||||
if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match {
|
if match, reason := c.sameVolumeWith(newSpec.Spec.Volume); !match {
|
||||||
|
|
@ -530,12 +529,11 @@ func (c *Cluster) initHumanUsers() error {
|
||||||
teamMembers, err := c.getTeamMembers()
|
teamMembers, err := c.getTeamMembers()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Can't get list of team members: %s", err)
|
return fmt.Errorf("Can't get list of team members: %s", err)
|
||||||
} else {
|
}
|
||||||
for _, username := range teamMembers {
|
for _, username := range teamMembers {
|
||||||
flags := []string{constants.RoleFlagLogin, constants.RoleFlagSuperuser}
|
flags := []string{constants.RoleFlagLogin, constants.RoleFlagSuperuser}
|
||||||
memberOf := []string{c.OpConfig.PamRoleName}
|
memberOf := []string{c.OpConfig.PamRoleName}
|
||||||
c.pgUsers[username] = spec.PgUser{Name: username, Flags: flags, MemberOf: memberOf}
|
c.pgUsers[username] = spec.PgUser{Name: username, Flags: flags, MemberOf: memberOf}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -547,11 +545,11 @@ func (c *Cluster) initInfrastructureRoles() error {
|
||||||
if !isValidUsername(username) {
|
if !isValidUsername(username) {
|
||||||
return fmt.Errorf("Invalid username: '%s'", username)
|
return fmt.Errorf("Invalid username: '%s'", username)
|
||||||
}
|
}
|
||||||
if flags, err := normalizeUserFlags(data.Flags); err != nil {
|
flags, err := normalizeUserFlags(data.Flags)
|
||||||
|
if err != nil {
|
||||||
return fmt.Errorf("Invalid flags for user '%s': %s", username, err)
|
return fmt.Errorf("Invalid flags for user '%s': %s", username, err)
|
||||||
} else {
|
|
||||||
data.Flags = flags
|
|
||||||
}
|
}
|
||||||
|
data.Flags = flags
|
||||||
c.pgUsers[username] = data
|
c.pgUsers[username] = data
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -52,8 +52,8 @@ func (c *Cluster) resourceRequirements(resources spec.Resources) (*v1.ResourceRe
|
||||||
|
|
||||||
config := c.OpConfig
|
config := c.OpConfig
|
||||||
|
|
||||||
defaultRequests := spec.ResourceDescription{Cpu: config.DefaultCpuRequest, Memory: config.DefaultMemoryRequest}
|
defaultRequests := spec.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest}
|
||||||
defaultLimits := spec.ResourceDescription{Cpu: config.DefaultCpuLimit, Memory: config.DefaultMemoryLimit}
|
defaultLimits := spec.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit}
|
||||||
|
|
||||||
result := v1.ResourceRequirements{}
|
result := v1.ResourceRequirements{}
|
||||||
|
|
||||||
|
|
@ -74,13 +74,13 @@ func fillResourceList(spec spec.ResourceDescription, defaults spec.ResourceDescr
|
||||||
var err error
|
var err error
|
||||||
requests := v1.ResourceList{}
|
requests := v1.ResourceList{}
|
||||||
|
|
||||||
if spec.Cpu != "" {
|
if spec.CPU != "" {
|
||||||
requests[v1.ResourceCPU], err = resource.ParseQuantity(spec.Cpu)
|
requests[v1.ResourceCPU], err = resource.ParseQuantity(spec.CPU)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Can't parse CPU quantity: %s", err)
|
return nil, fmt.Errorf("Can't parse CPU quantity: %s", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
requests[v1.ResourceCPU], err = resource.ParseQuantity(defaults.Cpu)
|
requests[v1.ResourceCPU], err = resource.ParseQuantity(defaults.CPU)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Can't parse default CPU quantity: %s", err)
|
return nil, fmt.Errorf("Can't parse default CPU quantity: %s", err)
|
||||||
}
|
}
|
||||||
|
|
@ -432,7 +432,7 @@ func (c *Cluster) genService(allowedSourceRanges []string) *v1.Service {
|
||||||
Namespace: c.Metadata.Namespace,
|
Namespace: c.Metadata.Namespace,
|
||||||
Labels: c.labelsSet(),
|
Labels: c.labelsSet(),
|
||||||
Annotations: map[string]string{
|
Annotations: map[string]string{
|
||||||
constants.ZalandoDnsNameAnnotation: c.dnsName(),
|
constants.ZalandoDNSNameAnnotation: c.dnsName(),
|
||||||
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
|
constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -33,11 +33,11 @@ func (c *Cluster) pgConnectionString() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) databaseAccessDisabled() bool {
|
func (c *Cluster) databaseAccessDisabled() bool {
|
||||||
if c.OpConfig.EnableDBAccess == false {
|
if !c.OpConfig.EnableDBAccess {
|
||||||
c.logger.Debugf("Database access is disabled")
|
c.logger.Debugf("Database access is disabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.OpConfig.EnableDBAccess == false
|
return !c.OpConfig.EnableDBAccess
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) initDbConn() (err error) {
|
func (c *Cluster) initDbConn() (err error) {
|
||||||
|
|
|
||||||
|
|
@ -170,12 +170,12 @@ func (c *Cluster) recreatePods() error {
|
||||||
listOptions := v1.ListOptions{
|
listOptions := v1.ListOptions{
|
||||||
LabelSelector: ls.String(),
|
LabelSelector: ls.String(),
|
||||||
}
|
}
|
||||||
|
|
||||||
pods, err := c.KubeClient.Pods(namespace).List(listOptions)
|
pods, err := c.KubeClient.Pods(namespace).List(listOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Can't get the list of Pods: %s", err)
|
return fmt.Errorf("Can't get the list of Pods: %s", err)
|
||||||
} else {
|
|
||||||
c.logger.Infof("There are %d Pods in the cluster to recreate", len(pods.Items))
|
|
||||||
}
|
}
|
||||||
|
c.logger.Infof("There are %d Pods in the cluster to recreate", len(pods.Items))
|
||||||
|
|
||||||
var masterPod v1.Pod
|
var masterPod v1.Pod
|
||||||
for _, pod := range pods.Items {
|
for _, pod := range pods.Items {
|
||||||
|
|
@ -186,8 +186,7 @@ func (c *Cluster) recreatePods() error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.recreatePod(pod)
|
if err := c.recreatePod(pod); err != nil {
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Can't recreate replica Pod '%s': %s", util.NameFromMeta(pod.ObjectMeta), err)
|
return fmt.Errorf("Can't recreate replica Pod '%s': %s", util.NameFromMeta(pod.ObjectMeta), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -185,12 +185,11 @@ func (c *Cluster) replaceStatefulSet(newStatefulSet *v1beta1.StatefulSet) error
|
||||||
createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(newStatefulSet)
|
createdStatefulset, err := c.KubeClient.StatefulSets(newStatefulSet.Namespace).Create(newStatefulSet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Can't create statefulset '%s': %s", statefulSetName, err)
|
return fmt.Errorf("Can't create statefulset '%s': %s", statefulSetName, err)
|
||||||
} else {
|
}
|
||||||
// check that all the previous replicas were picked up.
|
// check that all the previous replicas were picked up.
|
||||||
if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas &&
|
if newStatefulSet.Spec.Replicas == oldStatefulset.Spec.Replicas &&
|
||||||
createdStatefulset.Status.Replicas != oldStatefulset.Status.Replicas {
|
createdStatefulset.Status.Replicas != oldStatefulset.Status.Replicas {
|
||||||
c.logger.Warnf("Number of pods for the old and updated Statefulsets is not identical")
|
c.logger.Warnf("Number of pods for the old and updated Statefulsets is not identical")
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Statefulset = createdStatefulset
|
c.Statefulset = createdStatefulset
|
||||||
|
|
@ -296,12 +295,6 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) {
|
||||||
return endpoints, nil
|
return endpoints, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) updateEndpoint(newEndpoint *v1.Endpoints) error {
|
|
||||||
//TODO: to be implemented
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cluster) deleteEndpoint() error {
|
func (c *Cluster) deleteEndpoint() error {
|
||||||
c.logger.Debugln("Deleting Endpoint")
|
c.logger.Debugln("Deleting Endpoint")
|
||||||
if c.Endpoint == nil {
|
if c.Endpoint == nil {
|
||||||
|
|
@ -371,9 +364,6 @@ func (c *Cluster) createUsers() (err error) {
|
||||||
// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
|
// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
|
||||||
reqs := c.userSyncStrategy.ProduceSyncRequests(nil, c.pgUsers)
|
reqs := c.userSyncStrategy.ProduceSyncRequests(nil, c.pgUsers)
|
||||||
err = c.userSyncStrategy.ExecuteSyncRequests(reqs, c.pgDb)
|
err = c.userSyncStrategy.ExecuteSyncRequests(reqs, c.pgDb)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,10 @@ func (c *Cluster) Sync(stopCh <-chan struct{}) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
c.loadResources()
|
err := c.loadResources()
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Errorf("Can't load resources: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
if !c.podDispatcherRunning {
|
if !c.podDispatcherRunning {
|
||||||
go c.podEventsDispatcher(stopCh)
|
go c.podEventsDispatcher(stopCh)
|
||||||
|
|
@ -59,11 +62,8 @@ func (c *Cluster) syncSecrets() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.applySecrets()
|
err := c.applySecrets()
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) syncService() error {
|
func (c *Cluster) syncService() error {
|
||||||
|
|
@ -80,11 +80,11 @@ func (c *Cluster) syncService() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
desiredSvc := c.genService(cSpec.AllowedSourceRanges)
|
desiredSvc := c.genService(cSpec.AllowedSourceRanges)
|
||||||
if match, reason := c.sameServiceWith(desiredSvc); match {
|
match, reason := c.sameServiceWith(desiredSvc)
|
||||||
|
if match {
|
||||||
return nil
|
return nil
|
||||||
} else {
|
|
||||||
c.logServiceChanges(c.Service, desiredSvc, false, reason)
|
|
||||||
}
|
}
|
||||||
|
c.logServiceChanges(c.Service, desiredSvc, false, reason)
|
||||||
|
|
||||||
if err := c.updateService(desiredSvc); err != nil {
|
if err := c.updateService(desiredSvc); err != nil {
|
||||||
return fmt.Errorf("Can't update Service to match desired state: %s", err)
|
return fmt.Errorf("Can't update Service to match desired state: %s", err)
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,10 @@
|
||||||
package cluster
|
package cluster
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
etcdclient "github.com/coreos/etcd/client"
|
|
||||||
"k8s.io/apimachinery/pkg/util/json"
|
"k8s.io/apimachinery/pkg/util/json"
|
||||||
"k8s.io/client-go/pkg/api/v1"
|
"k8s.io/client-go/pkg/api/v1"
|
||||||
"k8s.io/client-go/pkg/apis/apps/v1beta1"
|
"k8s.io/client-go/pkg/apis/apps/v1beta1"
|
||||||
|
|
@ -30,11 +28,10 @@ func normalizeUserFlags(userFlags []string) (flags []string, err error) {
|
||||||
if !alphaNumericRegexp.MatchString(flag) {
|
if !alphaNumericRegexp.MatchString(flag) {
|
||||||
err = fmt.Errorf("User flag '%s' is not alphanumeric", flag)
|
err = fmt.Errorf("User flag '%s' is not alphanumeric", flag)
|
||||||
return
|
return
|
||||||
} else {
|
}
|
||||||
flag = strings.ToUpper(flag)
|
flag = strings.ToUpper(flag)
|
||||||
if _, ok := uniqueFlags[flag]; !ok {
|
if _, ok := uniqueFlags[flag]; !ok {
|
||||||
uniqueFlags[flag] = true
|
uniqueFlags[flag] = true
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if uniqueFlags[constants.RoleFlagLogin] && uniqueFlags[constants.RoleFlagNoLogin] {
|
if uniqueFlags[constants.RoleFlagLogin] && uniqueFlags[constants.RoleFlagNoLogin] {
|
||||||
|
|
@ -109,10 +106,10 @@ func (c *Cluster) logVolumeChanges(old, new spec.Volume, reason string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) getTeamMembers() ([]string, error) {
|
func (c *Cluster) getTeamMembers() ([]string, error) {
|
||||||
if c.Spec.TeamId == "" {
|
if c.Spec.TeamID == "" {
|
||||||
return nil, fmt.Errorf("No teamId specified")
|
return nil, fmt.Errorf("No teamId specified")
|
||||||
}
|
}
|
||||||
teamInfo, err := c.TeamsAPIClient.TeamInfo(c.Spec.TeamId)
|
teamInfo, err := c.TeamsAPIClient.TeamInfo(c.Spec.TeamID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Can't get team info: %s", err)
|
return nil, fmt.Errorf("Can't get team info: %s", err)
|
||||||
}
|
}
|
||||||
|
|
@ -216,11 +213,7 @@ func (c *Cluster) waitPodLabelsReady() error {
|
||||||
|
|
||||||
//TODO: wait for master for a while and then set masterLess flag
|
//TODO: wait for master for a while and then set masterLess flag
|
||||||
|
|
||||||
if err != nil {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) waitStatefulsetPodsReady() error {
|
func (c *Cluster) waitStatefulsetPodsReady() error {
|
||||||
|
|
@ -259,25 +252,6 @@ func (c *Cluster) credentialSecretName(username string) string {
|
||||||
c.Metadata.Name)
|
c.Metadata.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) deleteEtcdKey() error {
|
|
||||||
etcdKey := fmt.Sprintf("/%s/%s", c.OpConfig.EtcdScope, c.Metadata.Name)
|
|
||||||
|
|
||||||
//TODO: retry multiple times
|
|
||||||
resp, err := c.EtcdClient.Delete(context.Background(),
|
|
||||||
etcdKey,
|
|
||||||
&etcdclient.DeleteOptions{Recursive: true})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Can't delete etcd key: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp == nil {
|
|
||||||
return fmt.Errorf("No response from etcd cluster")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cluster) podSpiloRole(pod *v1.Pod) string {
|
func (c *Cluster) podSpiloRole(pod *v1.Pod) string {
|
||||||
return pod.Labels[c.OpConfig.PodRoleLabel]
|
return pod.Labels[c.OpConfig.PodRoleLabel]
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ type Config struct {
|
||||||
KubeClient *kubernetes.Clientset
|
KubeClient *kubernetes.Clientset
|
||||||
RestClient *rest.RESTClient
|
RestClient *rest.RESTClient
|
||||||
EtcdClient etcdclient.KeysAPI
|
EtcdClient etcdclient.KeysAPI
|
||||||
TeamsAPIClient *teams.TeamsAPI
|
TeamsAPIClient *teams.API
|
||||||
InfrastructureRoles map[string]spec.PgUser
|
InfrastructureRoles map[string]spec.PgUser
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -30,7 +30,6 @@ type Controller struct {
|
||||||
Config
|
Config
|
||||||
opConfig *config.Config
|
opConfig *config.Config
|
||||||
logger *logrus.Entry
|
logger *logrus.Entry
|
||||||
waitCluster sync.WaitGroup
|
|
||||||
|
|
||||||
clustersMu sync.RWMutex
|
clustersMu sync.RWMutex
|
||||||
clusters map[spec.NamespacedName]*cluster.Cluster
|
clusters map[spec.NamespacedName]*cluster.Cluster
|
||||||
|
|
|
||||||
|
|
@ -8,10 +8,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *Controller) initEtcdClient(etcdHost string) error {
|
func (c *Controller) initEtcdClient(etcdHost string) error {
|
||||||
etcdUrl := fmt.Sprintf("http://%s", etcdHost)
|
etcdURL := fmt.Sprintf("http://%s", etcdHost)
|
||||||
|
|
||||||
cfg, err := etcdclient.New(etcdclient.Config{
|
cfg, err := etcdclient.New(etcdclient.Config{
|
||||||
Endpoints: []string{etcdUrl},
|
Endpoints: []string{etcdURL},
|
||||||
Transport: etcdclient.DefaultTransport,
|
Transport: etcdclient.DefaultTransport,
|
||||||
HeaderTimeoutPerRequest: time.Second,
|
HeaderTimeoutPerRequest: time.Second,
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -209,18 +209,18 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
workerId := c.clusterWorkerId(clusterName)
|
workerID := c.clusterWorkerID(clusterName)
|
||||||
clusterEvent := spec.ClusterEvent{
|
clusterEvent := spec.ClusterEvent{
|
||||||
EventType: eventType,
|
EventType: eventType,
|
||||||
UID: uid,
|
UID: uid,
|
||||||
OldSpec: old,
|
OldSpec: old,
|
||||||
NewSpec: new,
|
NewSpec: new,
|
||||||
WorkerID: workerId,
|
WorkerID: workerID,
|
||||||
}
|
}
|
||||||
//TODO: if we delete cluster, discard all the previous events for the cluster
|
//TODO: if we delete cluster, discard all the previous events for the cluster
|
||||||
|
|
||||||
c.clusterEventQueues[workerId].Add(clusterEvent)
|
c.clusterEventQueues[workerID].Add(clusterEvent)
|
||||||
c.logger.WithField("worker", workerId).Infof("%s of the '%s' cluster has been queued for", eventType, clusterName)
|
c.logger.WithField("worker", workerID).Infof("%s of the '%s' cluster has been queued for", eventType, clusterName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) postgresqlAdd(obj interface{}) {
|
func (c *Controller) postgresqlAdd(obj interface{}) {
|
||||||
|
|
|
||||||
|
|
@ -62,7 +62,7 @@ func thirdPartyResource(TPRName string) *extv1beta.ThirdPartyResource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) clusterWorkerId(clusterName spec.NamespacedName) uint32 {
|
func (c *Controller) clusterWorkerID(clusterName spec.NamespacedName) uint32 {
|
||||||
return crc32.ChecksumIEEE([]byte(clusterName.String())) % c.opConfig.Workers
|
return crc32.ChecksumIEEE([]byte(clusterName.String())) % c.opConfig.Workers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -74,9 +74,8 @@ func (c *Controller) createTPR() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !k8sutil.ResourceAlreadyExists(err) {
|
if !k8sutil.ResourceAlreadyExists(err) {
|
||||||
return err
|
return err
|
||||||
} else {
|
|
||||||
c.logger.Infof("ThirdPartyResource '%s' is already registered", TPRName)
|
|
||||||
}
|
}
|
||||||
|
c.logger.Infof("ThirdPartyResource '%s' is already registered", TPRName)
|
||||||
} else {
|
} else {
|
||||||
c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName)
|
c.logger.Infof("ThirdPartyResource '%s' has been registered", TPRName)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ type PostgresqlParam struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type ResourceDescription struct {
|
type ResourceDescription struct {
|
||||||
Cpu string `json:"cpu"`
|
CPU string `json:"cpu"`
|
||||||
Memory string `json:"memory"`
|
Memory string `json:"memory"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -81,7 +81,7 @@ type PostgresSpec struct {
|
||||||
Patroni `json:"patroni,omitempty"`
|
Patroni `json:"patroni,omitempty"`
|
||||||
Resources `json:"resources,omitempty"`
|
Resources `json:"resources,omitempty"`
|
||||||
|
|
||||||
TeamId string `json:"teamId"`
|
TeamID string `json:"teamId"`
|
||||||
AllowedSourceRanges []string `json:"allowedSourceRanges"`
|
AllowedSourceRanges []string `json:"allowedSourceRanges"`
|
||||||
NumberOfInstances int32 `json:"numberOfInstances"`
|
NumberOfInstances int32 `json:"numberOfInstances"`
|
||||||
Users map[string]UserFlags `json:"users"`
|
Users map[string]UserFlags `json:"users"`
|
||||||
|
|
@ -162,7 +162,7 @@ func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if got.EndTime.Before(got.StartTime) {
|
if got.EndTime.Before(got.StartTime) {
|
||||||
return fmt.Errorf("'From' time must be prior to the 'To' time.")
|
return fmt.Errorf("'From' time must be prior to the 'To' time")
|
||||||
}
|
}
|
||||||
|
|
||||||
if !weekdayProvidedFrom || !weekdayProvidedTo {
|
if !weekdayProvidedFrom || !weekdayProvidedTo {
|
||||||
|
|
@ -228,7 +228,7 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error {
|
||||||
}
|
}
|
||||||
tmp2 := Postgresql(tmp)
|
tmp2 := Postgresql(tmp)
|
||||||
|
|
||||||
clusterName, err := clusterName(tmp2.Metadata.Name, tmp2.Spec.TeamId)
|
clusterName, err := clusterName(tmp2.Metadata.Name, tmp2.Spec.TeamID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
tmp2.Spec.ClusterName = clusterName
|
tmp2.Spec.ClusterName = clusterName
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -60,16 +60,16 @@ type UserSyncer interface {
|
||||||
ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error
|
ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p NamespacedName) String() string {
|
func (n NamespacedName) String() string {
|
||||||
if p.Namespace == "" && p.Name == "" {
|
if n.Namespace == "" && n.Name == "" {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
return types.NamespacedName(p).String()
|
return types.NamespacedName(n).String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p NamespacedName) MarshalJSON() ([]byte, error) {
|
func (n NamespacedName) MarshalJSON() ([]byte, error) {
|
||||||
return []byte("\"" + p.String() + "\""), nil
|
return []byte("\"" + n.String() + "\""), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NamespacedName) Decode(value string) error {
|
func (n *NamespacedName) Decode(value string) error {
|
||||||
|
|
|
||||||
|
|
@ -23,9 +23,9 @@ type Resources struct {
|
||||||
ClusterLabels map[string]string `name:"cluster_labels" default:"application:spilo"`
|
ClusterLabels map[string]string `name:"cluster_labels" default:"application:spilo"`
|
||||||
ClusterNameLabel string `name:"cluster_name_label" default:"cluster-name"`
|
ClusterNameLabel string `name:"cluster_name_label" default:"cluster-name"`
|
||||||
PodRoleLabel string `name:"pod_role_label" default:"spilo-role"`
|
PodRoleLabel string `name:"pod_role_label" default:"spilo-role"`
|
||||||
DefaultCpuRequest string `name:"default_cpu_request" default:"100m"`
|
DefaultCPURequest string `name:"default_cpu_request" default:"100m"`
|
||||||
DefaultMemoryRequest string `name:"default_memory_request" default:"100Mi"`
|
DefaultMemoryRequest string `name:"default_memory_request" default:"100Mi"`
|
||||||
DefaultCpuLimit string `name:"default_cpu_limit" default:"3"`
|
DefaultCPULimit string `name:"default_cpu_limit" default:"3"`
|
||||||
DefaultMemoryLimit string `name:"default_memory_limit" default:"1Gi"`
|
DefaultMemoryLimit string `name:"default_memory_limit" default:"1Gi"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,6 @@ package constants
|
||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
//Constants
|
|
||||||
TPRName = "postgresql"
|
TPRName = "postgresql"
|
||||||
TPRVendor = "acid.zalan.do"
|
TPRVendor = "acid.zalan.do"
|
||||||
TPRDescription = "Managed PostgreSQL clusters"
|
TPRDescription = "Managed PostgreSQL clusters"
|
||||||
|
|
@ -11,11 +10,11 @@ const (
|
||||||
ListClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace
|
ListClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace
|
||||||
WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace
|
WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace
|
||||||
K8sVersion = "v1"
|
K8sVersion = "v1"
|
||||||
K8sApiPath = "/api"
|
K8sAPIPath = "/api"
|
||||||
DataVolumeName = "pgdata"
|
DataVolumeName = "pgdata"
|
||||||
PasswordLength = 64
|
PasswordLength = 64
|
||||||
UserSecretTemplate = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName
|
UserSecretTemplate = "%s.%s.credentials." + TPRName + "." + TPRVendor // Username, ClusterName
|
||||||
ZalandoDnsNameAnnotation = "external-dns.alpha.kubernetes.io/hostname"
|
ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname"
|
||||||
ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout"
|
ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout"
|
||||||
ElbTimeoutAnnotationValue = "3600"
|
ElbTimeoutAnnotationValue = "3600"
|
||||||
KubeIAmAnnotation = "iam.amazonaws.com/role"
|
KubeIAmAnnotation = "iam.amazonaws.com/role"
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ func ResourceNotFound(err error) bool {
|
||||||
|
|
||||||
func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) {
|
func KubernetesRestClient(c *rest.Config) (*rest.RESTClient, error) {
|
||||||
c.GroupVersion = &unversioned.GroupVersion{Version: constants.K8sVersion}
|
c.GroupVersion = &unversioned.GroupVersion{Version: constants.K8sVersion}
|
||||||
c.APIPath = constants.K8sApiPath
|
c.APIPath = constants.K8sAPIPath
|
||||||
c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
|
c.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
|
||||||
|
|
||||||
schemeBuilder := runtime.NewSchemeBuilder(
|
schemeBuilder := runtime.NewSchemeBuilder(
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type InfrastructureAccount struct {
|
type InfrastructureAccount struct {
|
||||||
Id string `json:"id"`
|
ID string `json:"id"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Provider string `json:"provider"`
|
Provider string `json:"provider"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
|
|
@ -22,9 +22,9 @@ type InfrastructureAccount struct {
|
||||||
|
|
||||||
type Team struct {
|
type Team struct {
|
||||||
Dn string `json:"dn"`
|
Dn string `json:"dn"`
|
||||||
Id string `json:"id"`
|
ID string `json:"id"`
|
||||||
TeamName string `json:"id_name"`
|
TeamName string `json:"id_name"`
|
||||||
TeamId string `json:"team_id"`
|
TeamID string `json:"team_id"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
FullName string `json:"name"`
|
FullName string `json:"name"`
|
||||||
Aliases []string `json:"alias"`
|
Aliases []string `json:"alias"`
|
||||||
|
|
@ -32,12 +32,12 @@ type Team struct {
|
||||||
Members []string `json:"member"`
|
Members []string `json:"member"`
|
||||||
CostCenter string `json:"cost_center"`
|
CostCenter string `json:"cost_center"`
|
||||||
DeliveryLead string `json:"delivery_lead"`
|
DeliveryLead string `json:"delivery_lead"`
|
||||||
ParentTeamId string `json:"parent_team_id"`
|
ParentTeamID string `json:"parent_team_id"`
|
||||||
|
|
||||||
InfrastructureAccounts []InfrastructureAccount `json:"infrastructure-accounts"`
|
InfrastructureAccounts []InfrastructureAccount `json:"infrastructure-accounts"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TeamsAPI struct {
|
type API struct {
|
||||||
url string
|
url string
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
logger *logrus.Entry
|
logger *logrus.Entry
|
||||||
|
|
@ -45,8 +45,8 @@ type TeamsAPI struct {
|
||||||
enabled bool
|
enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTeamsAPI(url string, log *logrus.Logger, enabled bool) *TeamsAPI {
|
func NewTeamsAPI(url string, log *logrus.Logger, enabled bool) *API {
|
||||||
t := TeamsAPI{
|
t := API{
|
||||||
url: strings.TrimRight(url, "/"),
|
url: strings.TrimRight(url, "/"),
|
||||||
httpClient: &http.Client{},
|
httpClient: &http.Client{},
|
||||||
logger: log.WithField("pkg", "teamsapi"),
|
logger: log.WithField("pkg", "teamsapi"),
|
||||||
|
|
@ -56,7 +56,7 @@ func NewTeamsAPI(url string, log *logrus.Logger, enabled bool) *TeamsAPI {
|
||||||
return &t
|
return &t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TeamsAPI) TeamInfo(teamId string) (*Team, error) {
|
func (t *API) TeamInfo(teamID string) (*Team, error) {
|
||||||
// TODO: avoid getting a new token on every call to the Teams API.
|
// TODO: avoid getting a new token on every call to the Teams API.
|
||||||
if !t.enabled {
|
if !t.enabled {
|
||||||
t.logger.Debug("Team API is disabled, returning empty list of members")
|
t.logger.Debug("Team API is disabled, returning empty list of members")
|
||||||
|
|
@ -66,7 +66,7 @@ func (t *TeamsAPI) TeamInfo(teamId string) (*Team, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
url := fmt.Sprintf("%s/teams/%s", t.url, teamId)
|
url := fmt.Sprintf("%s/teams/%s", t.url, teamID)
|
||||||
t.logger.Debugf("Request url: %s", url)
|
t.logger.Debugf("Request url: %s", url)
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -89,9 +89,9 @@ func (t *TeamsAPI) TeamInfo(teamId string) (*Team, error) {
|
||||||
|
|
||||||
if errMessage, ok := raw["error"]; ok {
|
if errMessage, ok := raw["error"]; ok {
|
||||||
return nil, fmt.Errorf("Team API query failed with status code %d and message: '%s'", resp.StatusCode, string(errMessage))
|
return nil, fmt.Errorf("Team API query failed with status code %d and message: '%s'", resp.StatusCode, string(errMessage))
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("Team API query failed with status code %d", resp.StatusCode)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("Team API query failed with status code %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
teamInfo := &Team{}
|
teamInfo := &Team{}
|
||||||
d := json.NewDecoder(resp.Body)
|
d := json.NewDecoder(resp.Body)
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ func (s DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserMap,
|
||||||
for name, newUser := range newUsers {
|
for name, newUser := range newUsers {
|
||||||
dbUser, exists := dbUsers[name]
|
dbUser, exists := dbUsers[name]
|
||||||
if !exists {
|
if !exists {
|
||||||
reqs = append(reqs, spec.PgSyncUserRequest{spec.PGSyncUserAdd, newUser})
|
reqs = append(reqs, spec.PgSyncUserRequest{Kind: spec.PGSyncUserAdd, User: newUser})
|
||||||
} else {
|
} else {
|
||||||
r := spec.PgSyncUserRequest{}
|
r := spec.PgSyncUserRequest{}
|
||||||
newMD5Password := util.PGUserPassword(newUser)
|
newMD5Password := util.PGUserPassword(newUser)
|
||||||
|
|
@ -67,7 +67,7 @@ func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserReque
|
||||||
return fmt.Errorf("Can't alter user '%s': %s", r.User.Name, err)
|
return fmt.Errorf("Can't alter user '%s': %s", r.User.Name, err)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Unrecognized operation: %s", r.Kind)
|
return fmt.Errorf("Unrecognized operation: %v", r.Kind)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
MD5Prefix = "md5"
|
md5prefix = "md5"
|
||||||
)
|
)
|
||||||
|
|
||||||
var passwordChars = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
|
var passwordChars = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
|
||||||
|
|
@ -41,12 +41,12 @@ func NameFromMeta(meta v1.ObjectMeta) spec.NamespacedName {
|
||||||
}
|
}
|
||||||
|
|
||||||
func PGUserPassword(user spec.PgUser) string {
|
func PGUserPassword(user spec.PgUser) string {
|
||||||
if (len(user.Password) == md5.Size && user.Password[:3] == MD5Prefix) || user.Password == "" {
|
if (len(user.Password) == md5.Size && user.Password[:3] == md5prefix) || user.Password == "" {
|
||||||
// Avoid processing already encrypted or empty passwords
|
// Avoid processing already encrypted or empty passwords
|
||||||
return user.Password
|
return user.Password
|
||||||
}
|
}
|
||||||
s := md5.Sum([]byte(user.Password + user.Name))
|
s := md5.Sum([]byte(user.Password + user.Name))
|
||||||
return MD5Prefix + hex.EncodeToString(s[:])
|
return md5prefix + hex.EncodeToString(s[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
func Pretty(x interface{}) (f fmt.Formatter) {
|
func Pretty(x interface{}) (f fmt.Formatter) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue