diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 854abfed4..2b61be9e1 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -34,7 +34,7 @@ var ( userRegexp = regexp.MustCompile(`^[a-z0-9]([-_a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-_a-z0-9]*[a-z0-9])?)*$`) ) -//TODO: remove struct duplication +// Config contains operator-wide clients and configuration used from a cluster. TODO: remove struct duplication. type Config struct { KubeClient *kubernetes.Clientset //TODO: move clients to the better place? RestClient *rest.RESTClient @@ -45,7 +45,7 @@ type Config struct { } type kubeResources struct { - Service *v1.Service + Service map[PostgresRole]*v1.Service Endpoint *v1.Endpoints Secrets map[types.UID]*v1.Secret Statefulset *v1beta1.StatefulSet @@ -77,9 +77,10 @@ type compareStatefulsetResult struct { reasons []string } +// New creates a new cluster. This function should be called from a controller. func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { lg := logger.WithField("pkg", "cluster").WithField("cluster-name", pgSpec.Metadata.Name) - kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret)} + kubeResources := kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[PostgresRole]*v1.Service)} orphanDependents := true podEventsQueue := cache.NewFIFO(func(obj interface{}) (string, error) { @@ -108,7 +109,7 @@ func New(cfg Config, pgSpec spec.Postgresql, logger *logrus.Entry) *Cluster { return cluster } -func (c *Cluster) ClusterName() spec.NamespacedName { +func (c *Cluster) clusterName() spec.NamespacedName { return util.NameFromMeta(c.Metadata) } @@ -136,7 +137,7 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) { } if err != nil { - c.logger.Warningf("could not set status for cluster '%s': %s", c.ClusterName(), err) + c.logger.Warningf("could not set status for cluster '%s': %s", c.clusterName(), err) } } @@ -155,11 +156,10 @@ func (c *Cluster) initUsers() error { return fmt.Errorf("could not init human users: %v", err) } - c.logger.Debugf("Initialized users: %# v", util.Pretty(c.pgUsers)) - return nil } +// Create creates the new kubernetes objects associated with the cluster. func (c *Cluster) Create() error { c.mu.Lock() defer c.mu.Unlock() @@ -182,11 +182,16 @@ func (c *Cluster) Create() error { } c.logger.Infof("endpoint '%s' has been successfully created", util.NameFromMeta(ep.ObjectMeta)) - service, err := c.createService() - if err != nil { - return fmt.Errorf("could not create service: %v", err) + for _, role := range []PostgresRole{Master, Replica} { + if role == Replica && !c.Spec.ReplicaLoadBalancer { + continue + } + service, err := c.createService(role) + if err != nil { + return fmt.Errorf("could not create %s service: %v", role, err) + } + c.logger.Infof("%s service '%s' has been successfully created", role, util.NameFromMeta(service.ObjectMeta)) } - c.logger.Infof("service '%s' has been successfully created", util.NameFromMeta(service.ObjectMeta)) if err = c.initUsers(); err != nil { return err @@ -226,7 +231,7 @@ func (c *Cluster) Create() error { } } - err = c.ListResources() + err = c.listResources() if err != nil { c.logger.Errorf("could not list resources: %s", err) } @@ -234,14 +239,19 @@ func (c *Cluster) Create() error { return nil } -func (c *Cluster) sameServiceWith(service *v1.Service) (match bool, reason string) { +func (c *Cluster) sameServiceWith(role PostgresRole, service *v1.Service) (match bool, reason string) { //TODO: improve comparison - if !reflect.DeepEqual(c.Service.Spec.LoadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) { - reason = "new service's LoadBalancerSourceRange doesn't match the current one" - } else { - match = true + match = true + old := c.Service[role].Spec.LoadBalancerSourceRanges + new := service.Spec.LoadBalancerSourceRanges + /* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */ + if (len(old) == 0) && (len(new) == 0) { + return true, "" } - return + if !reflect.DeepEqual(old, new) { + return false, fmt.Sprintf("new %s service's LoadBalancerSourceRange doesn't match the current one", role) + } + return true, "" } func (c *Cluster) sameVolumeWith(volume spec.Volume) (match bool, reason string) { @@ -377,6 +387,8 @@ func compareResoucesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resource } +// Update changes Kubernetes objects according to the new specification. Unlike the sync case, the missing object. +// (i.e. service) is treated as an error. func (c *Cluster) Update(newSpec *spec.Postgresql) error { c.mu.Lock() defer c.mu.Unlock() @@ -385,14 +397,46 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { c.logger.Debugf("Cluster update from version %s to %s", c.Metadata.ResourceVersion, newSpec.Metadata.ResourceVersion) - newService := c.genService(newSpec.Spec.AllowedSourceRanges) - if match, reason := c.sameServiceWith(newService); !match { - c.logServiceChanges(c.Service, newService, true, reason) - if err := c.updateService(newService); err != nil { - c.setStatus(spec.ClusterStatusUpdateFailed) - return fmt.Errorf("could not update service: %v", err) + /* Make sure we update when this function exists */ + defer func() { + c.Postgresql = *newSpec + }() + + for _, role := range []PostgresRole{Master, Replica} { + if role == Replica { + if !newSpec.Spec.ReplicaLoadBalancer { + // old spec had a load balancer, but the new one doesn't + if c.Spec.ReplicaLoadBalancer { + err := c.deleteService(role) + if err != nil { + return fmt.Errorf("could not delete obsolete %s service: %v", role, err) + } + c.logger.Infof("deleted obsolete %s service", role) + } + } else { + if !c.Spec.ReplicaLoadBalancer { + // old spec didn't have a load balancer, but the one does + service, err := c.createService(role) + if err != nil { + return fmt.Errorf("could not create new %s service: %v", role, err) + } + c.logger.Infof("%s service '%s' has been created", role, util.NameFromMeta(service.ObjectMeta)) + } + } + // only proceeed further if both old and new load balancer were present + if !(newSpec.Spec.ReplicaLoadBalancer && c.Spec.ReplicaLoadBalancer) { + continue + } + } + newService := c.genService(role, newSpec.Spec.AllowedSourceRanges) + if match, reason := c.sameServiceWith(role, newService); !match { + c.logServiceChanges(role, c.Service[role], newService, true, reason) + if err := c.updateService(role, newService); err != nil { + c.setStatus(spec.ClusterStatusUpdateFailed) + return fmt.Errorf("could not update %s service: %v", role, err) + } + c.logger.Infof("%s service '%s' has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta)) } - c.logger.Infof("service '%s' has been updated", util.NameFromMeta(c.Service.ObjectMeta)) } newStatefulSet, err := c.genStatefulSet(newSpec.Spec) @@ -448,6 +492,7 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error { return nil } +// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets). func (c *Cluster) Delete() error { c.mu.Lock() defer c.mu.Unlock() @@ -456,8 +501,13 @@ func (c *Cluster) Delete() error { return fmt.Errorf("could not delete endpoint: %v", err) } - if err := c.deleteService(); err != nil { - return fmt.Errorf("could not delete service: %v", err) + for _, role := range []PostgresRole{Master, Replica} { + if role == Replica && !c.Spec.ReplicaLoadBalancer { + continue + } + if err := c.deleteService(role); err != nil { + return fmt.Errorf("could not delete %s service: %v", role, err) + } } if err := c.deleteStatefulSet(); err != nil { @@ -473,6 +523,7 @@ func (c *Cluster) Delete() error { return nil } +// ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue. func (c *Cluster) ReceivePodEvent(event spec.PodEvent) { c.podEventsQueue.Add(event) } @@ -493,6 +544,7 @@ func (c *Cluster) processPodEvent(obj interface{}) error { return nil } +// Run starts the pod event dispatching for the given cluster. func (c *Cluster) Run(stopCh <-chan struct{}) { go c.processPodEventQueue(stopCh) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 203afb26e..fd71661ad 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -15,9 +15,9 @@ import ( ) const ( - PGBinariesLocationTemplate = "/usr/lib/postgresql/%s/bin" - PatroniPGBinariesParameterName = "pg_bin" - PatroniPGParametersParameterName = "parameters" + pgBinariesLocationTemplate = "/usr/lib/postgresql/%s/bin" + patroniPGBinariesParameterName = "bin_dir" + patroniPGParametersParameterName = "parameters" ) type pgUser struct { @@ -25,7 +25,7 @@ type pgUser struct { Options []string `json:"options"` } -type PatroniDCS struct { +type patroniDCS struct { TTL uint32 `json:"ttl,omitempty"` LoopWait uint32 `json:"loop_wait,omitempty"` RetryTimeout uint32 `json:"retry_timeout,omitempty"` @@ -36,7 +36,7 @@ type pgBootstrap struct { Initdb []interface{} `json:"initdb"` Users map[string]pgUser `json:"users"` PgHBA []string `json:"pg_hba"` - DCS PatroniDCS `json:"dcs,omitempty"` + DCS patroniDCS `json:"dcs,omitempty"` } type spiloConfiguration struct { @@ -185,9 +185,9 @@ PATRONI_INITDB_PARAMS: } config.PgLocalConfiguration = make(map[string]interface{}) - config.PgLocalConfiguration[PatroniPGBinariesParameterName] = fmt.Sprintf(PGBinariesLocationTemplate, pg.PgVersion) + config.PgLocalConfiguration[patroniPGBinariesParameterName] = fmt.Sprintf(pgBinariesLocationTemplate, pg.PgVersion) if len(pg.Parameters) > 0 { - config.PgLocalConfiguration[PatroniPGParametersParameterName] = pg.Parameters + config.PgLocalConfiguration[patroniPGParametersParameterName] = pg.Parameters } config.Bootstrap.Users = map[string]pgUser{ c.OpConfig.PamRoleName: { @@ -425,14 +425,22 @@ func (c *Cluster) genSingleUserSecret(namespace string, pgUser spec.PgUser) *v1. return &secret } -func (c *Cluster) genService(allowedSourceRanges []string) *v1.Service { +func (c *Cluster) genService(role PostgresRole, allowedSourceRanges []string) *v1.Service { + + dnsNameFunction := c.masterDnsName + name := c.Metadata.Name + if role == Replica { + dnsNameFunction = c.replicaDnsName + name = name + "-repl" + } + service := &v1.Service{ ObjectMeta: v1.ObjectMeta{ - Name: c.Metadata.Name, + Name: name, Namespace: c.Metadata.Namespace, - Labels: c.labelsSet(), + Labels: c.roleLabelsSet(role), Annotations: map[string]string{ - constants.ZalandoDNSNameAnnotation: c.dnsName(), + constants.ZalandoDNSNameAnnotation: dnsNameFunction(), constants.ElbTimeoutAnnotationName: constants.ElbTimeoutAnnotationValue, }, }, @@ -442,16 +450,19 @@ func (c *Cluster) genService(allowedSourceRanges []string) *v1.Service { LoadBalancerSourceRanges: allowedSourceRanges, }, } + if role == Replica { + service.Spec.Selector = map[string]string{c.OpConfig.PodRoleLabel: string(Replica)} + } return service } -func (c *Cluster) genEndpoints() *v1.Endpoints { +func (c *Cluster) genMasterEndpoints() *v1.Endpoints { endpoints := &v1.Endpoints{ ObjectMeta: v1.ObjectMeta{ Name: c.Metadata.Name, Namespace: c.Metadata.Namespace, - Labels: c.labelsSet(), + Labels: c.roleLabelsSet(Master), }, } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 44118295b..aa5964302 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -24,20 +24,31 @@ func (c *Cluster) loadResources() error { if err != nil { return fmt.Errorf("could not get list of services: %v", err) } - if len(services.Items) > 1 { + if len(services.Items) > 2 { return fmt.Errorf("too many(%d) services for a cluster", len(services.Items)) - } else if len(services.Items) == 1 { - c.Service = &services.Items[0] + } + for i, svc := range services.Items { + switch PostgresRole(svc.Labels[c.OpConfig.PodRoleLabel]) { + case Replica: + c.Service[Replica] = &services.Items[i] + default: + c.Service[Master] = &services.Items[i] + } } endpoints, err := c.KubeClient.Endpoints(ns).List(listOptions) if err != nil { return fmt.Errorf("could not get list of endpoints: %v", err) } - if len(endpoints.Items) > 1 { + if len(endpoints.Items) > 2 { return fmt.Errorf("too many(%d) endpoints for a cluster", len(endpoints.Items)) - } else if len(endpoints.Items) == 1 { - c.Endpoint = &endpoints.Items[0] + } + + for i, ep := range endpoints.Items { + if ep.Labels[c.OpConfig.PodRoleLabel] != string(Replica) { + c.Endpoint = &endpoints.Items[i] + break + } } secrets, err := c.KubeClient.Secrets(ns).List(listOptions) @@ -58,14 +69,15 @@ func (c *Cluster) loadResources() error { } if len(statefulSets.Items) > 1 { return fmt.Errorf("too many(%d) statefulsets for a cluster", len(statefulSets.Items)) - } else if len(statefulSets.Items) == 1 { + } + if len(statefulSets.Items) == 1 { c.Statefulset = &statefulSets.Items[0] } return nil } -func (c *Cluster) ListResources() error { +func (c *Cluster) listResources() error { if c.Statefulset != nil { c.logger.Infof("Found statefulset: %s (uid: %s)", util.NameFromMeta(c.Statefulset.ObjectMeta), c.Statefulset.UID) } @@ -78,8 +90,8 @@ func (c *Cluster) ListResources() error { c.logger.Infof("Found endpoint: %s (uid: %s)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID) } - if c.Service != nil { - c.logger.Infof("Found service: %s (uid: %s)", util.NameFromMeta(c.Service.ObjectMeta), c.Service.UID) + for role, service := range c.Service { + c.logger.Infof("Found %s service: %s (uid: %s)", role, util.NameFromMeta(service.ObjectMeta), service.UID) } pods, err := c.listPods() @@ -217,57 +229,56 @@ func (c *Cluster) deleteStatefulSet() error { return nil } -func (c *Cluster) createService() (*v1.Service, error) { - if c.Service != nil { +func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { + if c.Service[role] != nil { return nil, fmt.Errorf("service already exists in the cluster") } - serviceSpec := c.genService(c.Spec.AllowedSourceRanges) + serviceSpec := c.genService(role, c.Spec.AllowedSourceRanges) service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(serviceSpec) if err != nil { return nil, err } - c.Service = service + c.Service[role] = service return service, nil } -func (c *Cluster) updateService(newService *v1.Service) error { - if c.Service == nil { +func (c *Cluster) updateService(role PostgresRole, newService *v1.Service) error { + if c.Service[role] == nil { return fmt.Errorf("there is no service in the cluster") } - serviceName := util.NameFromMeta(c.Service.ObjectMeta) + serviceName := util.NameFromMeta(c.Service[role].ObjectMeta) patchData, err := specPatch(newService.Spec) if err != nil { return fmt.Errorf("could not form patch for the service '%s': %v", serviceName, err) } - svc, err := c.KubeClient.Services(c.Service.Namespace).Patch( - c.Service.Name, + svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch( + c.Service[role].Name, api.MergePatchType, patchData, "") if err != nil { return fmt.Errorf("could not patch service '%s': %v", serviceName, err) } - c.Service = svc + c.Service[role] = svc return nil } -func (c *Cluster) deleteService() error { - c.logger.Debugln("Deleting service") - - if c.Service == nil { - return fmt.Errorf("there is no service in the cluster") +func (c *Cluster) deleteService(role PostgresRole) error { + c.logger.Debugf("Deleting service %s", role) + if c.Service[role] == nil { + return fmt.Errorf("There is no %s service in the cluster", role) } - err := c.KubeClient.Services(c.Service.Namespace).Delete(c.Service.Name, c.deleteOptions) + service := c.Service[role] + err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions) if err != nil { return err } - c.logger.Infof("service '%s' has been deleted", util.NameFromMeta(c.Service.ObjectMeta)) - c.Service = nil - + c.logger.Infof("%s service '%s' has been deleted", role, util.NameFromMeta(service.ObjectMeta)) + c.Service[role] = nil return nil } @@ -275,7 +286,7 @@ func (c *Cluster) createEndpoint() (*v1.Endpoints, error) { if c.Endpoint != nil { return nil, fmt.Errorf("endpoint already exists in the cluster") } - endpointsSpec := c.genEndpoints() + endpointsSpec := c.genMasterEndpoints() endpoints, err := c.KubeClient.Endpoints(endpointsSpec.Namespace).Create(endpointsSpec) if err != nil { diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index bdc108a68..c7c071333 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -8,6 +8,8 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/util/volumes" ) +// Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest. +// Unlike the update, sync does not error out if some objects do not exist and takes care of creating them. func (c *Cluster) Sync() error { c.mu.Lock() defer c.mu.Unlock() @@ -32,9 +34,20 @@ func (c *Cluster) Sync() error { } c.logger.Debugf("Syncing services") - if err := c.syncService(); err != nil { - if !k8sutil.ResourceAlreadyExists(err) { - return fmt.Errorf("coud not sync services: %v", err) + for _, role := range []PostgresRole{Master, Replica} { + if role == Replica && !c.Spec.ReplicaLoadBalancer { + if c.Service[role] != nil { + // delete the left over replica service + if err := c.deleteService(role); err != nil { + return fmt.Errorf("could not delete obsolete %s service: %v", role) + } + } + continue + } + if err := c.syncService(role); err != nil { + if !k8sutil.ResourceAlreadyExists(err) { + return fmt.Errorf("coud not sync %s service: %v", role, err) + } } } @@ -48,16 +61,15 @@ func (c *Cluster) Sync() error { if !c.databaseAccessDisabled() { if err := c.initDbConn(); err != nil { return fmt.Errorf("could not init db connection: %v", err) - } else { - c.logger.Debugf("Syncing roles") - if err := c.SyncRoles(); err != nil { - return fmt.Errorf("could not sync roles: %v", err) - } + } + c.logger.Debugf("Syncing roles") + if err := c.syncRoles(); err != nil { + return fmt.Errorf("could not sync roles: %v", err) } } c.logger.Debugf("Syncing persistent volumes") - if err := c.SyncVolumes(); err != nil { + if err := c.syncVolumes(); err != nil { return fmt.Errorf("could not sync persistent volumes: %v", err) } @@ -75,30 +87,30 @@ func (c *Cluster) syncSecrets() error { return err } -func (c *Cluster) syncService() error { +func (c *Cluster) syncService(role PostgresRole) error { cSpec := c.Spec - if c.Service == nil { - c.logger.Infof("could not find the cluster's service") - svc, err := c.createService() + if c.Service[role] == nil { + c.logger.Infof("could not find the cluster's %s service", role) + svc, err := c.createService(role) if err != nil { - return fmt.Errorf("could not create missing service: %v", err) + return fmt.Errorf("could not create missing %s service: %v", role, err) } - c.logger.Infof("Created missing service '%s'", util.NameFromMeta(svc.ObjectMeta)) + c.logger.Infof("Created missing %s service '%s'", role, util.NameFromMeta(svc.ObjectMeta)) return nil } - desiredSvc := c.genService(cSpec.AllowedSourceRanges) - match, reason := c.sameServiceWith(desiredSvc) + desiredSvc := c.genService(role, cSpec.AllowedSourceRanges) + match, reason := c.sameServiceWith(role, desiredSvc) if match { return nil } - c.logServiceChanges(c.Service, desiredSvc, false, reason) + c.logServiceChanges(role, c.Service[role], desiredSvc, false, reason) - if err := c.updateService(desiredSvc); err != nil { - return fmt.Errorf("could not update service to match desired state: %v", err) + if err := c.updateService(role, desiredSvc); err != nil { + return fmt.Errorf("could not update %s service to match desired state: %v", role, err) } - c.logger.Infof("service '%s' is in the desired state now", util.NameFromMeta(desiredSvc.ObjectMeta)) + c.logger.Infof("%s service '%s' is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) return nil } @@ -181,7 +193,7 @@ func (c *Cluster) syncStatefulSet() error { return nil } -func (c *Cluster) SyncRoles() error { +func (c *Cluster) syncRoles() error { var userNames []string if err := c.initUsers(); err != nil { @@ -201,9 +213,9 @@ func (c *Cluster) SyncRoles() error { return nil } -/* SyncVolume reads all persistent volumes and checks that their size matches the one declared in the statefulset */ -func (c *Cluster) SyncVolumes() error { - act, err := c.VolumesNeedResizing(c.Spec.Volume) +// syncVolumes reads all persistent volumes and checks that their size matches the one declared in the statefulset. +func (c *Cluster) syncVolumes() error { + act, err := c.volumesNeedResizing(c.Spec.Volume) if err != nil { return fmt.Errorf("could not compare size of the volumes: %v", err) } diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go new file mode 100644 index 000000000..5091e532d --- /dev/null +++ b/pkg/cluster/types.go @@ -0,0 +1,8 @@ +package cluster + +type PostgresRole string + +const ( + Master PostgresRole = "master" + Replica PostgresRole = "replica" +) diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 0212c103f..b6416cd7b 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -82,14 +82,14 @@ func (c *Cluster) logStatefulSetChanges(old, new *v1beta1.StatefulSet, isUpdate } } -func (c *Cluster) logServiceChanges(old, new *v1.Service, isUpdate bool, reason string) { +func (c *Cluster) logServiceChanges(role PostgresRole, old, new *v1.Service, isUpdate bool, reason string) { if isUpdate { - c.logger.Infof("service '%s' has been changed", - util.NameFromMeta(old.ObjectMeta), + c.logger.Infof("%s service '%s' has been changed", + role, util.NameFromMeta(old.ObjectMeta), ) } else { - c.logger.Infof("service '%s is not in the desired state and needs to be updated", - util.NameFromMeta(old.ObjectMeta), + c.logger.Infof("%s service '%s is not in the desired state and needs to be updated", + role, util.NameFromMeta(old.ObjectMeta), ) } c.logger.Debugf("diff\n%s\n", util.PrettyDiff(old.Spec, new.Spec)) @@ -145,7 +145,6 @@ func (c *Cluster) getTeamMembers() ([]string, error) { if err != nil { return nil, fmt.Errorf("could not get team info: %v", err) } - c.logger.Debugf("Got from the Team API: %+v", *teamInfo) return teamInfo.Members, nil } @@ -263,14 +262,30 @@ func (c *Cluster) waitStatefulsetPodsReady() error { } func (c *Cluster) labelsSet() labels.Set { - lbls := c.OpConfig.ClusterLabels + lbls := make(map[string]string) + for k, v := range c.OpConfig.ClusterLabels { + lbls[k] = v + } lbls[c.OpConfig.ClusterNameLabel] = c.Metadata.Name return labels.Set(lbls) } -func (c *Cluster) dnsName() string { - return strings.ToLower(c.OpConfig.DNSNameFormat.Format( +func (c *Cluster) roleLabelsSet(role PostgresRole) labels.Set { + lbls := c.labelsSet() + lbls[c.OpConfig.PodRoleLabel] = string(role) + return lbls +} + +func (c *Cluster) masterDnsName() string { + return strings.ToLower(c.OpConfig.MasterDNSNameFormat.Format( + "cluster", c.Spec.ClusterName, + "team", c.teamName(), + "hostedzone", c.OpConfig.DbHostedZone)) +} + +func (c *Cluster) replicaDnsName() string { + return strings.ToLower(c.OpConfig.ReplicaDNSNameFormat.Format( "cluster", c.Spec.ClusterName, "team", c.teamName(), "hostedzone", c.OpConfig.DbHostedZone)) diff --git a/pkg/cluster/volumes.go b/pkg/cluster/volumes.go index c54859665..ff5f00200 100644 --- a/pkg/cluster/volumes.go +++ b/pkg/cluster/volumes.go @@ -60,15 +60,14 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) { for _, pvc := range pvcs { lastDash := strings.LastIndex(pvc.Name, "-") if lastDash > 0 && lastDash < len(pvc.Name)-1 { - if pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]); err != nil { + pvcNumber, err := strconv.Atoi(pvc.Name[lastDash+1:]) + if err != nil { return nil, fmt.Errorf("could not convert last part of the persistent volume claim name %s to a number", pvc.Name) - } else { - if int32(pvcNumber) > lastPodIndex { - c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pods", pvc.Name) - continue - } } - + if int32(pvcNumber) > lastPodIndex { + c.logger.Debugf("Skipping persistent volume %s corresponding to a non-running pods", pvc.Name) + continue + } } pv, err := c.KubeClient.PersistentVolumes().Get(pvc.Spec.VolumeName) if err != nil { @@ -139,7 +138,7 @@ func (c *Cluster) resizeVolumes(newVolume spec.Volume, resizers []volumes.Volume return nil } -func (c *Cluster) VolumesNeedResizing(newVolume spec.Volume) (bool, error) { +func (c *Cluster) volumesNeedResizing(newVolume spec.Volume) (bool, error) { volumes, manifestSize, err := c.listVolumesWithManifestSize(newVolume) if err != nil { return false, err diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 41668c033..3b31d439f 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -62,7 +62,7 @@ func (c *Controller) podAdd(obj interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.PodClusterName(pod), + ClusterName: c.podClusterName(pod), PodName: util.NameFromMeta(pod.ObjectMeta), CurPod: pod, EventType: spec.EventAdd, @@ -84,7 +84,7 @@ func (c *Controller) podUpdate(prev, cur interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.PodClusterName(curPod), + ClusterName: c.podClusterName(curPod), PodName: util.NameFromMeta(curPod.ObjectMeta), PrevPod: prevPod, CurPod: curPod, @@ -102,7 +102,7 @@ func (c *Controller) podDelete(obj interface{}) { } podEvent := spec.PodEvent{ - ClusterName: c.PodClusterName(pod), + ClusterName: c.podClusterName(pod), PodName: util.NameFromMeta(pod.ObjectMeta), CurPod: pod, EventType: spec.EventDelete, diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 714128d51..76b33168c 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -115,7 +115,7 @@ Users: return result, nil } -func (c *Controller) PodClusterName(pod *v1.Pod) spec.NamespacedName { +func (c *Controller) podClusterName(pod *v1.Pod) spec.NamespacedName { if name, ok := pod.Labels[c.opConfig.ClusterNameLabel]; ok { return spec.NamespacedName{ Namespace: pod.Namespace, diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index 0ff3d9ff3..c73e104f9 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -11,6 +11,7 @@ import ( "k8s.io/client-go/pkg/api/v1" ) +// MaintenanceWindow describes the time window when the operator is allowed to do maintenance on a cluster. type MaintenanceWindow struct { Everyday bool Weekday time.Weekday @@ -18,26 +19,31 @@ type MaintenanceWindow struct { EndTime time.Time // End time } +// Volume describes a single volume in the manifest. type Volume struct { Size string `json:"size"` StorageClass string `json:"storageClass"` } +// PostgresqlParam describes PostgreSQL version and pairs of configuration parameter name - values. type PostgresqlParam struct { PgVersion string `json:"version"` Parameters map[string]string `json:"parameters"` } +// ResourceDescription describes CPU and memory resources defined for a cluster. type ResourceDescription struct { CPU string `json:"cpu"` Memory string `json:"memory"` } +// Resources describes requests and limits for the cluster resouces. type Resources struct { ResourceRequest ResourceDescription `json:"requests,omitempty"` ResourceLimits ResourceDescription `json:"limits,omitempty"` } +// Patroni contains Patroni-specific configuration type Patroni struct { InitDB map[string]string `json:"initdb"` PgHba []string `json:"pg_hba"` @@ -47,10 +53,12 @@ type Patroni struct { MaximumLagOnFailover float32 `json:"maximum_lag_on_failover"` // float32 because https://github.com/kubernetes/kubernetes/issues/30213 } -type UserFlags []string +type userFlags []string +// PostgresStatus contains status of the PostgreSQL cluster (running, creation failed etc.) type PostgresStatus string +// possible values for PostgreSQL cluster statuses const ( ClusterStatusUnknown PostgresStatus = "" ClusterStatusCreating PostgresStatus = "Creating" @@ -61,7 +69,7 @@ const ( ClusterStatusInvalid PostgresStatus = "Invalid" ) -// PostgreSQL Third Party (resource) Object +// Postgresql defines PostgreSQL Third Party (resource) Object. type Postgresql struct { unversioned.TypeMeta `json:",inline"` Metadata v1.ObjectMeta `json:"metadata"` @@ -71,6 +79,7 @@ type Postgresql struct { Error error `json:"-"` } +// PostgresSpec defines the specification for the PostgreSQL TPR. type PostgresSpec struct { PostgresqlParam `json:"postgresql"` Volume `json:"volume,omitempty"` @@ -79,12 +88,14 @@ type PostgresSpec struct { TeamID string `json:"teamId"` AllowedSourceRanges []string `json:"allowedSourceRanges"` + ReplicaLoadBalancer bool `json:"replicaLoadBalancer,omitempty"` NumberOfInstances int32 `json:"numberOfInstances"` - Users map[string]UserFlags `json:"users"` + Users map[string]userFlags `json:"users"` MaintenanceWindows []MaintenanceWindow `json:"maintenanceWindows,omitempty"` ClusterName string `json:"-"` } +// PostgresqlList defines a list of PostgreSQL clusters. type PostgresqlList struct { unversioned.TypeMeta `json:",inline"` Metadata unversioned.ListMeta `json:"metadata"` @@ -118,6 +129,7 @@ func parseWeekday(s string) (time.Weekday, error) { return time.Weekday(weekday), nil } +// MarshalJSON converts a maintenance window definition to JSON. func (m *MaintenanceWindow) MarshalJSON() ([]byte, error) { if m.Everyday { return []byte(fmt.Sprintf("\"%s-%s\"", @@ -131,6 +143,7 @@ func (m *MaintenanceWindow) MarshalJSON() ([]byte, error) { } } +// UnmarshalJSON convets a JSON to the maintenance window definition. func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error { var ( got MaintenanceWindow @@ -176,18 +189,17 @@ func (m *MaintenanceWindow) UnmarshalJSON(data []byte) error { return nil } +// GetObject implements Object interface for PostgreSQL TPR spec object. func (p *Postgresql) GetObjectKind() unversioned.ObjectKind { return &p.TypeMeta } +// GetObjectMeta implements ObjectMetaAccessor interface for PostgreSQL TPR spec object. func (p *Postgresql) GetObjectMeta() meta.Object { return &p.Metadata } -func (pl *PostgresqlList) GetObjectKind() unversioned.ObjectKind { - return &pl.TypeMeta -} - +// GetListMeta implements ListMetaAccessor interface for PostgreSQL TPR List spec object. func (pl *PostgresqlList) GetListMeta() unversioned.List { return &pl.Metadata } @@ -213,11 +225,12 @@ func extractClusterName(clusterName string, teamName string) (string, error) { // resources and ugorji. If/when these issues are resolved, the code below // should no longer be required. // -type PostgresqlListCopy PostgresqlList -type PostgresqlCopy Postgresql +type postgresqlListCopy PostgresqlList +type postgresqlCopy Postgresql +// UnmarshalJSON converts a JSON into the PostgreSQL object. func (p *Postgresql) UnmarshalJSON(data []byte) error { - var tmp PostgresqlCopy + var tmp postgresqlCopy err := json.Unmarshal(data, &tmp) if err != nil { @@ -247,8 +260,9 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error { return nil } +// UnmarshalJSON converts a JSON into the PostgreSQL List object. func (pl *PostgresqlList) UnmarshalJSON(data []byte) error { - var tmp PostgresqlListCopy + var tmp postgresqlListCopy err := json.Unmarshal(data, &tmp) if err != nil { diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 864f51928..720bffcd0 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -7,10 +7,13 @@ import ( "k8s.io/client-go/pkg/types" ) +// EvenType contains type of the events for the TPRs and Pods received from Kubernetes type EventType string +// NamespacedName describes the namespace/name pairs used in Kubernetes names. type NamespacedName types.NamespacedName +// Possible values for the EventType const ( EventAdd EventType = "ADD" EventUpdate EventType = "UPDATE" @@ -18,6 +21,7 @@ const ( EventSync EventType = "SYNC" ) +// ClusterEvent carries the payload of the Cluster TPR events. type ClusterEvent struct { UID types.UID EventType EventType @@ -26,13 +30,15 @@ type ClusterEvent struct { WorkerID uint32 } -type SyncUserOperation int +type syncUserOperation int +// Possible values for the sync user operation (removal of users is not supported yet) const ( PGSyncUserAdd = iota PGsyncUserAlter ) +// PodEvent describes the event for a single Pod type PodEvent struct { ResourceVersion string ClusterName NamespacedName @@ -42,6 +48,7 @@ type PodEvent struct { EventType EventType } +// PgUser contains information about a single user. type PgUser struct { Name string Password string @@ -49,13 +56,16 @@ type PgUser struct { MemberOf []string } +// PgUserMap maps user names to the definitions. type PgUserMap map[string]PgUser +// PgSyncUserRequest has information about a single request to sync a user. type PgSyncUserRequest struct { - Kind SyncUserOperation + Kind syncUserOperation User PgUser } +// UserSyncer defines an interface for the implementations to sync users from the manifest to the DB. type UserSyncer interface { ProduceSyncRequests(dbUsers PgUserMap, newUsers PgUserMap) (req []PgSyncUserRequest) ExecuteSyncRequests(req []PgSyncUserRequest, db *sql.DB) error @@ -69,10 +79,12 @@ func (n NamespacedName) String() string { return types.NamespacedName(n).String() } +// MarshalJSON defines marshaling rule for the namespaced name type. func (n NamespacedName) MarshalJSON() ([]byte, error) { return []byte("\"" + n.String() + "\""), nil } +// Decode converts a (possibly unqualified) string into the namespaced name object. func (n *NamespacedName) Decode(value string) error { name := types.NewNamespacedNameFromString(value) if value != "" && name == (types.NamespacedName{}) { diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 560fc1efc..6c5178ab8 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -43,19 +43,20 @@ type Config struct { TPR Resources Auth - Namespace string `name:"namespace"` - EtcdHost string `name:"etcd_host" default:"etcd-client.default.svc.cluster.local:2379"` - DockerImage string `name:"docker_image" default:"registry.opensource.zalan.do/acid/spiloprivate-9.6:1.2-p4"` - ServiceAccountName string `name:"service_account_name" default:"operator"` - DbHostedZone string `name:"db_hosted_zone" default:"db.example.com"` - EtcdScope string `name:"etcd_scope" default:"service"` - WALES3Bucket string `name:"wal_s3_bucket"` - KubeIAMRole string `name:"kube_iam_role"` - DebugLogging bool `name:"debug_logging" default:"true"` - EnableDBAccess bool `name:"enable_database_access" default:"true"` - EnableTeamsAPI bool `name:"enable_teams_api" default:"true"` - DNSNameFormat stringTemplate `name:"dns_name_format" default:"{cluster}.{team}.{hostedzone}"` - Workers uint32 `name:"workers" default:"4"` + Namespace string `name:"namespace"` + EtcdHost string `name:"etcd_host" default:"etcd-client.default.svc.cluster.local:2379"` + DockerImage string `name:"docker_image" default:"registry.opensource.zalan.do/acid/spiloprivate-9.6:1.2-p4"` + ServiceAccountName string `name:"service_account_name" default:"operator"` + DbHostedZone string `name:"db_hosted_zone" default:"db.example.com"` + EtcdScope string `name:"etcd_scope" default:"service"` + WALES3Bucket string `name:"wal_s3_bucket"` + KubeIAMRole string `name:"kube_iam_role"` + DebugLogging bool `name:"debug_logging" default:"true"` + EnableDBAccess bool `name:"enable_database_access" default:"true"` + EnableTeamsAPI bool `name:"enable_teams_api" default:"true"` + MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"` + ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"` + Workers uint32 `name:"workers" default:"4"` } func (c Config) MustMarshal() string { diff --git a/pkg/util/config/util.go b/pkg/util/config/util.go index de3d55e24..5d9173efe 100644 --- a/pkg/util/config/util.go +++ b/pkg/util/config/util.go @@ -9,7 +9,7 @@ import ( "time" ) -type Decoder interface { +type decoder interface { Decode(value string) error } @@ -21,15 +21,15 @@ type fieldInfo struct { type stringTemplate string -func decoderFrom(field reflect.Value) (d Decoder) { +func decoderFrom(field reflect.Value) (d decoder) { // it may be impossible for a struct field to fail this check if !field.CanInterface() { return } - d, ok := field.Interface().(Decoder) + d, ok := field.Interface().(decoder) if !ok && field.CanAddr() { - d, _ = field.Addr().Interface().(Decoder) + d, _ = field.Addr().Interface().(decoder) } return d diff --git a/pkg/util/constants/annotations.go b/pkg/util/constants/annotations.go index 5a2e625d1..0b93fc2e1 100644 --- a/pkg/util/constants/annotations.go +++ b/pkg/util/constants/annotations.go @@ -1,5 +1,6 @@ package constants +// Names and values in Kubernetes annotation for services, statefulsets and volumes const ( ZalandoDNSNameAnnotation = "external-dns.alpha.kubernetes.io/hostname" ElbTimeoutAnnotationName = "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout" diff --git a/pkg/util/constants/aws.go b/pkg/util/constants/aws.go index 5140a1729..c2edfbb97 100644 --- a/pkg/util/constants/aws.go +++ b/pkg/util/constants/aws.go @@ -2,8 +2,11 @@ package constants import "time" +// AWS specific constants used by other modules const ( - AWS_REGION = "eu-central-1" + // default region for AWS. TODO: move it to the operator configuration + AWS_REGION = "eu-central-1" + // EBS related constants EBSVolumeIDStart = "/vol-" EBSProvisioner = "kubernetes.io/aws-ebs" //https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_VolumeModification.html diff --git a/pkg/util/constants/kubernetes.go b/pkg/util/constants/kubernetes.go index 0341d3734..048a83034 100644 --- a/pkg/util/constants/kubernetes.go +++ b/pkg/util/constants/kubernetes.go @@ -2,6 +2,7 @@ package constants import "time" +// General kubernetes-related constants const ( ListClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/namespaces/%s/" + ResourceName // Namespace WatchClustersURITemplate = "/apis/" + TPRVendor + "/" + TPRApiVersion + "/watch/namespaces/%s/" + ResourceName // Namespace diff --git a/pkg/util/constants/thirdpartyresource.go b/pkg/util/constants/thirdpartyresource.go index 64d702bc6..7207b4583 100644 --- a/pkg/util/constants/thirdpartyresource.go +++ b/pkg/util/constants/thirdpartyresource.go @@ -1,5 +1,6 @@ package constants +// Different properties of the PostgreSQL Third Party Resources const ( TPRName = "postgresql" TPRVendor = "acid.zalan.do" diff --git a/pkg/util/constants/units.go b/pkg/util/constants/units.go index 4ec3b2511..e124e0b79 100644 --- a/pkg/util/constants/units.go +++ b/pkg/util/constants/units.go @@ -1,5 +1,6 @@ package constants +// Measurement-unit definitions const ( Gigabyte = 1073741824 ) diff --git a/pkg/util/filesystems/ext234.go b/pkg/util/filesystems/ext234.go index 7d9215453..ceea73984 100644 --- a/pkg/util/filesystems/ext234.go +++ b/pkg/util/filesystems/ext234.go @@ -11,19 +11,22 @@ var ( ) const ( - EXT2 = "ext2" - EXT3 = "ext3" - EXT4 = "ext4" + ext2 = "ext2" + ext3 = "ext3" + ext4 = "ext4" resize2fs = "resize2fs" ) +// Ext234Resize implements the FilesystemResizer interface for the ext4/3/2fs. type Ext234Resize struct { } +// CanResizeFilesystem checks whether Ext234Resize can resize this filesystem. func (c *Ext234Resize) CanResizeFilesystem(fstype string) bool { - return fstype == EXT2 || fstype == EXT3 || fstype == EXT4 + return fstype == ext2 || fstype == ext3 || fstype == ext4 } +// ResizeFilesystem calls resize2fs to resize the filesystem if necessary. func (c *Ext234Resize) ResizeFilesystem(deviceName string, commandExecutor func(cmd string) (out string, err error)) error { command := fmt.Sprintf("%s %s 2>&1", resize2fs, deviceName) out, err := commandExecutor(command) diff --git a/pkg/util/retryutil/retry_util.go b/pkg/util/retryutil/retry_util.go index 12fbf0af9..6f879cdff 100644 --- a/pkg/util/retryutil/retry_util.go +++ b/pkg/util/retryutil/retry_util.go @@ -7,6 +7,7 @@ import ( type ConditionFunc func() (bool, error) +// Retry calls ConditionFunc until it returns boolean true, a timeout expires or an error occurs. func Retry(interval time.Duration, timeout time.Duration, f ConditionFunc) error { //TODO: make the retry exponential if timeout < interval { diff --git a/pkg/util/teams/teams.go b/pkg/util/teams/teams.go index e2309065e..09154d594 100644 --- a/pkg/util/teams/teams.go +++ b/pkg/util/teams/teams.go @@ -9,7 +9,8 @@ import ( "github.com/Sirupsen/logrus" ) -type InfrastructureAccount struct { +// InfrastructureAccount defines an account of the team on some infrastructure (i.e AWS, Google) platform. +type infrastructureAccount struct { ID string `json:"id"` Name string `json:"name"` Provider string `json:"provider"` @@ -20,7 +21,8 @@ type InfrastructureAccount struct { Disabled bool `json:"disabled"` } -type Team struct { +// Team defines informaiton for a single team, including the list of members and infrastructure accounts. +type team struct { Dn string `json:"dn"` ID string `json:"id"` TeamName string `json:"id_name"` @@ -34,15 +36,17 @@ type Team struct { DeliveryLead string `json:"delivery_lead"` ParentTeamID string `json:"parent_team_id"` - InfrastructureAccounts []InfrastructureAccount `json:"infrastructure-accounts"` + InfrastructureAccounts []infrastructureAccount `json:"infrastructure-accounts"` } +// type API struct { url string httpClient *http.Client logger *logrus.Entry } +// NewTeamsAPI creates an object to query the team API. func NewTeamsAPI(url string, log *logrus.Logger) *API { t := API{ url: strings.TrimRight(url, "/"), @@ -53,7 +57,8 @@ func NewTeamsAPI(url string, log *logrus.Logger) *API { return &t } -func (t *API) TeamInfo(teamID, token string) (*Team, error) { +// TeamInfo returns information about a given team using its ID and a token to authenticate to the API service. +func (t *API) TeamInfo(teamID, token string) (*team, error) { url := fmt.Sprintf("%s/teams/%s", t.url, teamID) t.logger.Debugf("Request url: %s", url) req, err := http.NewRequest("GET", url, nil) @@ -81,7 +86,7 @@ func (t *API) TeamInfo(teamID, token string) (*Team, error) { return nil, fmt.Errorf("team API query failed with status code %d", resp.StatusCode) } - teamInfo := &Team{} + teamInfo := &team{} d := json.NewDecoder(resp.Body) err = d.Decode(teamInfo) if err != nil { diff --git a/pkg/util/teams/teams_test.go b/pkg/util/teams/teams_test.go index b17927362..f9e8fb940 100644 --- a/pkg/util/teams/teams_test.go +++ b/pkg/util/teams/teams_test.go @@ -17,7 +17,7 @@ var ( var teamsAPItc = []struct { in string inCode int - out *Team + out *team err error }{ {`{ @@ -66,7 +66,7 @@ var teamsAPItc = []struct { "parent_team_id": "111221" }`, 200, - &Team{ + &team{ Dn: "cn=100100,ou=official,ou=foobar,dc=zalando,dc=net", ID: "acid", TeamName: "ACID", @@ -79,7 +79,7 @@ var teamsAPItc = []struct { CostCenter: "00099999", DeliveryLead: "member4", ParentTeamID: "111221", - InfrastructureAccounts: []InfrastructureAccount{ + InfrastructureAccounts: []infrastructureAccount{ { ID: "1234512345", Name: "acid", diff --git a/pkg/util/users/users.go b/pkg/util/users/users.go index 99e602b43..8ff32d305 100644 --- a/pkg/util/users/users.go +++ b/pkg/util/users/users.go @@ -18,9 +18,14 @@ const ( inRoleTemplate = `IN ROLE %s` ) +// DefaultUserSyncStrategy implements a user sync strategy that merges already existing database users +// with those defined in the manifest, altering existing users when necessary. It will never strips +// an existing roles of another role membership, nor it removes the already assigned flag +// (except for the NOLOGIN). TODO: process other NOflags, i.e. NOSUPERUSER correctly. type DefaultUserSyncStrategy struct { } +// ProduceSyncRequests figures out the types of changes that need to happen with the given users. func (s DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserMap, newUsers spec.PgUserMap) (reqs []spec.PgSyncUserRequest) { @@ -55,6 +60,7 @@ func (s DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserMap, return } +// ExecuteSyncRequests makes actual database changes from the requests passed in its arguments. func (s DefaultUserSyncStrategy) ExecuteSyncRequests(reqs []spec.PgSyncUserRequest, db *sql.DB) error { for _, r := range reqs { switch r.Kind { diff --git a/pkg/util/util.go b/pkg/util/util.go index e67a02294..abe4a8237 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -3,7 +3,6 @@ package util import ( "crypto/md5" "encoding/hex" - "fmt" "math/rand" "strings" "time" @@ -24,6 +23,7 @@ func init() { rand.Seed(int64(time.Now().Unix())) } +// RandomPassword generates random alphanumeric password of a given length. func RandomPassword(n int) string { b := make([]byte, n) for i := range b { @@ -33,6 +33,7 @@ func RandomPassword(n int) string { return string(b) } +// NameFromMeta converts a metadata object to the NamespacedName name representation. func NameFromMeta(meta v1.ObjectMeta) spec.NamespacedName { return spec.NamespacedName{ Namespace: meta.Namespace, @@ -40,6 +41,7 @@ func NameFromMeta(meta v1.ObjectMeta) spec.NamespacedName { } } +// PGUserPassword is used to generate md5 password hash for a given user. It does nothing for already hashed passwords. func PGUserPassword(user spec.PgUser) string { if (len(user.Password) == md5.Size*2+len(md5prefix) && user.Password[:3] == md5prefix) || user.Password == "" { // Avoid processing already encrypted or empty passwords @@ -49,17 +51,14 @@ func PGUserPassword(user spec.PgUser) string { return md5prefix + hex.EncodeToString(s[:]) } -func Pretty(x interface{}) (f fmt.Formatter) { - return pretty.Formatter(x) -} - +// PrettyDiff shows the diff between 2 objects in an easy to understand format. It is mainly used for debugging output. func PrettyDiff(a, b interface{}) (result string) { diff := pretty.Diff(a, b) return strings.Join(diff, "\n") } +// SubstractStringSlices finds elements in a that are not in b and return them as a result slice. func SubstractStringSlices(a []string, b []string) (result []string, equal bool) { - // Find elements in a that are not in b and return them as a result slice // Slices are assumed to contain unique elements only OUTER: for _, vala := range a { diff --git a/pkg/util/volumes/ebs.go b/pkg/util/volumes/ebs.go index 4142820f1..8d6ec12b7 100644 --- a/pkg/util/volumes/ebs.go +++ b/pkg/util/volumes/ebs.go @@ -13,10 +13,12 @@ import ( "k8s.io/client-go/pkg/api/v1" ) +// EBSVolumeResizer implements volume resizing interface for AWS EBS volumes. type EBSVolumeResizer struct { connection *ec2.EC2 } +// ConnectToProvider connects to AWS. func (c *EBSVolumeResizer) ConnectToProvider() error { sess, err := session.NewSession(&aws.Config{Region: aws.String(constants.AWS_REGION)}) if err != nil { @@ -26,10 +28,12 @@ func (c *EBSVolumeResizer) ConnectToProvider() error { return nil } +// IsConnectedToProvider checks if AWS connection is established. func (c *EBSVolumeResizer) IsConnectedToProvider() bool { return c.connection != nil } +// VolumeBelongsToProvider checks if the given persistent volume is backed by EBS. func (c *EBSVolumeResizer) VolumeBelongsToProvider(pv *v1.PersistentVolume) bool { return pv.Spec.AWSElasticBlockStore != nil && pv.Annotations[constants.VolumeStorateProvisionerAnnotation] == constants.EBSProvisioner } @@ -47,6 +51,7 @@ func (c *EBSVolumeResizer) GetProviderVolumeID(pv *v1.PersistentVolume) (string, return volumeID[idx:], nil } +// ResizeVolume actually calls AWS API to resize the EBS volume if necessary. func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { /* first check if the volume is already of a requested size */ volumeOutput, err := c.connection.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeId}}) @@ -89,7 +94,8 @@ func (c *EBSVolumeResizer) ResizeVolume(volumeId string, newSize int64) error { return false, fmt.Errorf("describe volume modification didn't return one record for volume \"%s\"", volumeId) } if *out.VolumesModifications[0].VolumeId != volumeId { - return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"") + return false, fmt.Errorf("non-matching volume id when describing modifications: \"%s\" is different from \"%s\"", + *out.VolumesModifications[0].VolumeId, volumeId) } return *out.VolumesModifications[0].ModificationState != constants.EBSVolumeStateModifying, nil }) diff --git a/pkg/util/volumes/volumes.go b/pkg/util/volumes/volumes.go index f35d0b5d3..e592ed9c9 100644 --- a/pkg/util/volumes/volumes.go +++ b/pkg/util/volumes/volumes.go @@ -4,6 +4,7 @@ import ( "k8s.io/client-go/pkg/api/v1" ) +// VolumeResizer defines the set of methods used to implememnt provider-specific resizing of persistent volumes. type VolumeResizer interface { ConnectToProvider() error IsConnectedToProvider() bool