diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 82eb7ba9c..ac26a9114 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -157,7 +157,7 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) { ) if matches := util.FindNamedStringSubmatch(clusterStatusURL, req.URL.Path); matches != nil { - namespace, _ := matches["namespace"] + namespace := matches["namespace"] resp, err = s.controller.ClusterStatus(matches["team"], namespace, matches["cluster"]) } else if matches := util.FindNamedStringSubmatch(teamURL, req.URL.Path); matches != nil { teamClusters := s.controller.TeamClusterList() @@ -174,10 +174,10 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) { resp, err = clusterNames, nil } else if matches := util.FindNamedStringSubmatch(clusterLogsURL, req.URL.Path); matches != nil { - namespace, _ := matches["namespace"] + namespace := matches["namespace"] resp, err = s.controller.ClusterLogs(matches["team"], namespace, matches["cluster"]) } else if matches := util.FindNamedStringSubmatch(clusterHistoryURL, req.URL.Path); matches != nil { - namespace, _ := matches["namespace"] + namespace := matches["namespace"] resp, err = s.controller.ClusterHistory(matches["team"], namespace, matches["cluster"]) } else if req.URL.Path == clustersURL { clusterNamesPerTeam := make(map[string][]string) @@ -194,8 +194,8 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) { s.respond(resp, err, w) } -func mustConvertToUint32(s string) uint32{ - result, err := strconv.Atoi(s); +func mustConvertToUint32(s string) uint32 { + result, err := strconv.Atoi(s) if err != nil { panic(fmt.Errorf("mustConvertToUint32 called for %s: %v", s, err)) } @@ -244,8 +244,6 @@ func (s *Server) databases(w http.ResponseWriter, req *http.Request) { databaseNamesPerCluster := s.controller.ClusterDatabasesMap() s.respond(databaseNamesPerCluster, nil, w) - return - } func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 9ac43e513..8414da8d1 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -450,8 +450,8 @@ func (c *Cluster) compareContainers(setA, setB *v1beta1.StatefulSet) (bool, []st return needsRollUpdate, reasons } -func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) (equal bool) { - equal = true +func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) bool { + equal := true if a != nil { equal = compareResoucesAssumeFirstNotNil(a, b) } @@ -459,7 +459,7 @@ func compareResources(a *v1.ResourceRequirements, b *v1.ResourceRequirements) (e equal = compareResoucesAssumeFirstNotNil(b, a) } - return + return equal } func compareResoucesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.ResourceRequirements) bool { @@ -786,7 +786,8 @@ func (c *Cluster) initInfrastructureRoles() error { } // resolves naming conflicts between existing and new roles by chosing either of them. -func (c *Cluster) resolveNameConflict(currentRole, newRole *spec.PgUser) (result spec.PgUser) { +func (c *Cluster) resolveNameConflict(currentRole, newRole *spec.PgUser) spec.PgUser { + var result spec.PgUser if newRole.Origin >= currentRole.Origin { result = *newRole } else { @@ -794,7 +795,7 @@ func (c *Cluster) resolveNameConflict(currentRole, newRole *spec.PgUser) (result } c.logger.Debugf("resolved a conflict of role %q between %s and %s to %s", newRole.Name, newRole.Origin, currentRole.Origin, result.Origin) - return + return result } func (c *Cluster) shouldAvoidProtectedOrSystemRole(username, purpose string) bool { @@ -838,8 +839,9 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus { } // Switchover does a switchover (via Patroni) to a candidate pod -func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) (err error) { +func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error { + var err error c.logger.Debugf("failing over from %q to %q", curMaster.Name, candidate) var wg sync.WaitGroup @@ -858,8 +860,8 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) ( select { case <-stopCh: - case podLabelErr <- func() (err error) { - _, err = c.waitForPodLabel(ch, stopCh, &role) + case podLabelErr <- func() (err2 error) { + _, err2 = c.waitForPodLabel(ch, stopCh, &role) return }(): } @@ -882,7 +884,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) ( // close the label waiting channel no sooner than the waiting goroutine terminates. close(podLabelErr) - return + return err } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index e34c81138..a71fa5e35 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -7,13 +7,13 @@ import ( "github.com/Sirupsen/logrus" + "k8s.io/api/apps/v1beta1" + "k8s.io/api/core/v1" + policybeta1 "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/api/core/v1" - "k8s.io/api/apps/v1beta1" - policybeta1 "k8s.io/api/policy/v1beta1" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" @@ -90,7 +90,7 @@ func (c *Cluster) makeDefaultResources() spec.Resources { defaultRequests := spec.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest} defaultLimits := spec.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit} - return spec.Resources{ResourceRequest:defaultRequests, ResourceLimits:defaultLimits} + return spec.Resources{ResourceRequest: defaultRequests, ResourceLimits: defaultLimits} } func generateResourceRequirements(resources spec.Resources, defaultResources spec.Resources) (*v1.ResourceRequirements, error) { @@ -366,7 +366,7 @@ func generateSidecarContainers(sidecars []spec.Sidecar, volumeMounts []v1.VolumeMount, defaultResources spec.Resources, superUserName string, credentialsSecretName string, logger *logrus.Entry) ([]v1.Container, error) { - if sidecars != nil && len(sidecars) > 0 { + if len(sidecars) > 0 { result := make([]v1.Container, 0) for index, sidecar := range sidecars { @@ -699,7 +699,7 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu // generate sidecar containers if sidecarContainers, err = generateSidecarContainers(sideCars, volumeMounts, defaultResources, c.OpConfig.SuperUsername, c.credentialSecretName(c.OpConfig.SuperUsername), c.logger); err != nil { - return nil, fmt.Errorf("could not generate sidecar containers: %v", err) + return nil, fmt.Errorf("could not generate sidecar containers: %v", err) } tolerationSpec := tolerations(&spec.Tolerations, c.OpConfig.PodToleration) @@ -716,7 +716,7 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu int64(c.OpConfig.PodTerminateGracePeriod.Seconds()), c.OpConfig.PodServiceAccountName, c.OpConfig.KubeIAMRole, - effectivePodPriorityClassName); err != nil{ + effectivePodPriorityClassName); err != nil { return nil, fmt.Errorf("could not generate pod template: %v", err) } @@ -726,7 +726,7 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu if volumeClaimTemplate, err = generatePersistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass); err != nil { - return nil, fmt.Errorf("could not generate volume claim template: %v", err) + return nil, fmt.Errorf("could not generate volume claim template: %v", err) } numberOfInstances := c.getNumberOfInstances(spec) @@ -804,11 +804,11 @@ func (c *Cluster) mergeSidecars(sidecars []spec.Sidecar) []spec.Sidecar { return result } -func (c *Cluster) getNumberOfInstances(spec *spec.PostgresSpec) (newcur int32) { +func (c *Cluster) getNumberOfInstances(spec *spec.PostgresSpec) int32 { min := c.OpConfig.MinInstances max := c.OpConfig.MaxInstances cur := spec.NumberOfInstances - newcur = cur + newcur := cur if max >= 0 && newcur > max { newcur = max @@ -820,7 +820,7 @@ func (c *Cluster) getNumberOfInstances(spec *spec.PostgresSpec) (newcur int32) { c.logger.Infof("adjusted number of instances from %d to %d (min: %d, max: %d)", cur, newcur, min, max) } - return + return newcur } func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string) (*v1.PersistentVolumeClaim, error) { @@ -860,8 +860,8 @@ func generatePersistentVolumeClaimTemplate(volumeSize, volumeStorageClass string return volumeClaim, nil } -func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) { - secrets = make(map[string]*v1.Secret, len(c.pgUsers)) +func (c *Cluster) generateUserSecrets() map[string]*v1.Secret { + secrets := make(map[string]*v1.Secret, len(c.pgUsers)) namespace := c.Namespace for username, pgUser := range c.pgUsers { //Skip users with no password i.e. human users (they'll be authenticated using pam) @@ -878,7 +878,7 @@ func (c *Cluster) generateUserSecrets() (secrets map[string]*v1.Secret) { } } - return + return secrets } func (c *Cluster) generateSingleUserSecret(namespace string, pgUser spec.PgUser) *v1.Secret { diff --git a/pkg/cluster/pg.go b/pkg/cluster/pg.go index f570ac81c..9441a4933 100644 --- a/pkg/cluster/pg.go +++ b/pkg/cluster/pg.go @@ -183,32 +183,30 @@ func (c *Cluster) getDatabases() (dbs map[string]string, err error) { dbs[datname] = owner } - return + return dbs, err } // executeCreateDatabase creates new database with the given owner. // The caller is responsible for openinging and closing the database connection. func (c *Cluster) executeCreateDatabase(datname, owner string) error { - if !c.databaseNameOwnerValid(datname, owner) { - return nil - } - c.logger.Infof("creating database %q with owner %q", datname, owner) - - if _, err := c.pgDb.Exec(fmt.Sprintf(createDatabaseSQL, datname, owner)); err != nil { - return fmt.Errorf("could not execute create database: %v", err) - } - return nil + return c.execCreateOrAlterDatabase(datname, owner, createDatabaseSQL, + "creating database", "create database") } // executeCreateDatabase changes the owner of the given database. // The caller is responsible for openinging and closing the database connection. func (c *Cluster) executeAlterDatabaseOwner(datname string, owner string) error { + return c.execCreateOrAlterDatabase(datname, owner, alterDatabaseOwnerSQL, + "changing owner for database", "alter database owner") +} + +func (c *Cluster) execCreateOrAlterDatabase(datname, owner, statement, doing, operation string) error { if !c.databaseNameOwnerValid(datname, owner) { return nil } - c.logger.Infof("changing database %q owner to %q", datname, owner) - if _, err := c.pgDb.Exec(fmt.Sprintf(alterDatabaseOwnerSQL, datname, owner)); err != nil { - return fmt.Errorf("could not execute alter database owner: %v", err) + c.logger.Infof("%s %q owner %q", doing, datname, owner) + if _, err := c.pgDb.Exec(fmt.Sprintf(statement, datname, owner)); err != nil { + return fmt.Errorf("could not execute %s: %v", operation, err) } return nil } diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index de6770faa..beb433fa0 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -183,8 +183,8 @@ func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) { func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { var ( masterCandidatePod *v1.Pod - err error - eol bool + err error + eol bool ) oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{}) @@ -212,7 +212,7 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { var sset *v1beta1.StatefulSet if sset, err = c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{}); err != nil { - return fmt.Errorf("could not retrieve cluster statefulset: %v", err) + return fmt.Errorf("could not retrieve cluster statefulset: %v", err) } c.Statefulset = sset } @@ -225,7 +225,6 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { c.logger.Warningf("single master pod for cluster %q, migration will cause longer downtime of the master instance", c.clusterName()) } - // there are two cases for each postgres cluster that has its master pod on the node to migrate from: // - the cluster has some replicas - migrate one of those if necessary and failover to it // - there are no replicas - just terminate the master and wait until it respawns diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index f15c4f40a..764dc22e5 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -633,27 +633,3 @@ func (c *Cluster) GetStatefulSet() *v1beta1.StatefulSet { func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget { return c.PodDisruptionBudget } - -func (c *Cluster) createDatabases() error { - c.setProcessName("creating databases") - - if len(c.Spec.Databases) == 0 { - return nil - } - - if err := c.initDbConn(); err != nil { - return fmt.Errorf("could not init database connection") - } - defer func() { - if err := c.closeDbConn(); err != nil { - c.logger.Errorf("could not close database connection: %v", err) - } - }() - - for datname, owner := range c.Spec.Databases { - if err := c.executeCreateDatabase(datname, owner); err != nil { - return err - } - } - return nil -} diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 3b5c3b3d3..ad42eeac5 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -2,11 +2,9 @@ package cluster import ( "fmt" - "reflect" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - policybeta1 "k8s.io/api/policy/v1beta1" "k8s.io/api/core/v1" + policybeta1 "k8s.io/api/policy/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" @@ -17,7 +15,8 @@ import ( // Sync syncs the cluster, making sure the actual Kubernetes objects correspond to what is defined in the manifest. // Unlike the update, sync does not error out if some objects do not exist and takes care of creating them. -func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { +func (c *Cluster) Sync(newSpec *spec.Postgresql) error { + var err error c.mu.Lock() defer c.mu.Unlock() @@ -34,7 +33,7 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { if err = c.initUsers(); err != nil { err = fmt.Errorf("could not init users: %v", err) - return + return err } c.logger.Debugf("syncing secrets") @@ -42,13 +41,13 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { //TODO: mind the secrets of the deleted/new users if err = c.syncSecrets(); err != nil { err = fmt.Errorf("could not sync secrets: %v", err) - return + return err } c.logger.Debugf("syncing services") if err = c.syncServices(); err != nil { err = fmt.Errorf("could not sync services: %v", err) - return + return err } // potentially enlarge volumes before changing the statefulset. By doing that @@ -60,14 +59,14 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { c.logger.Debugf("syncing persistent volumes") if err = c.syncVolumes(); err != nil { err = fmt.Errorf("could not sync persistent volumes: %v", err) - return + return err } c.logger.Debugf("syncing statefulsets") if err = c.syncStatefulSet(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { err = fmt.Errorf("could not sync statefulsets: %v", err) - return + return err } } @@ -76,22 +75,22 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { c.logger.Debugf("syncing roles") if err = c.syncRoles(); err != nil { err = fmt.Errorf("could not sync roles: %v", err) - return + return err } c.logger.Debugf("syncing databases") if err = c.syncDatabases(); err != nil { err = fmt.Errorf("could not sync databases: %v", err) - return + return err } } c.logger.Debug("syncing pod disruption budgets") if err = c.syncPodDisruptionBudget(false); err != nil { err = fmt.Errorf("could not sync pod disruption budget: %v", err) - return + return err } - return + return err } func (c *Cluster) syncServices() error { @@ -153,7 +152,7 @@ func (c *Cluster) syncService(role PostgresRole) error { func (c *Cluster) syncEndpoint(role PostgresRole) error { var ( - ep *v1.Endpoints + ep *v1.Endpoints err error ) c.setProcessName("syncing %s endpoint", role) @@ -187,8 +186,8 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { var ( - pdb *policybeta1.PodDisruptionBudget - err error + pdb *policybeta1.PodDisruptionBudget + err error ) if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { c.PodDisruptionBudget = pdb @@ -257,7 +256,9 @@ func (c *Cluster) syncStatefulSet() error { podsRollingUpdateRequired = (len(pods) > 0) if podsRollingUpdateRequired { c.logger.Warningf("found pods from the previous statefulset: trigger rolling update") - c.applyRollingUpdateFlagforStatefulSet(podsRollingUpdateRequired) + if err := c.applyRollingUpdateFlagforStatefulSet(podsRollingUpdateRequired); err != nil { + return fmt.Errorf("could not set rolling update flag for the statefulset: %v", err) + } } c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta)) @@ -318,7 +319,7 @@ func (c *Cluster) syncStatefulSet() error { // (like max_connections) has changed and if necessary sets it via the Patroni API func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error { var ( - err error + err error pods []v1.Pod ) @@ -394,7 +395,7 @@ func (c *Cluster) syncSecrets() error { // if this secret belongs to the infrastructure role and the password has changed - replace it in the secret if pwdUser.Password != string(secret.Data["password"]) && pwdUser.Origin == spec.RoleOriginInfrastructure { c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name) - if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil { + if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil { return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err) } } else { @@ -469,15 +470,6 @@ func (c *Cluster) syncVolumes() error { return nil } -func (c *Cluster) samePDBWith(pdb *policybeta1.PodDisruptionBudget) (match bool, reason string) { - match = reflect.DeepEqual(pdb.Spec, c.PodDisruptionBudget.Spec) - if !match { - reason = "new service spec doesn't match the current one" - } - - return -} - func (c *Cluster) syncDatabases() error { c.setProcessName("syncing databases") diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 484485b1b..e48375dfd 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -169,7 +169,7 @@ func (c *Controller) initPodServiceAccount() { func (c *Controller) initRoleBinding() { // service account on its own lacks any rights starting with k8s v1.8 - // operator binds it to the cluster role with sufficient priviliges + // operator binds it to the cluster role with sufficient privileges // we assume the role is created by the k8s administrator if c.opConfig.PodServiceAccountRoleBindingDefinition == "" { c.opConfig.PodServiceAccountRoleBindingDefinition = ` @@ -199,9 +199,9 @@ func (c *Controller) initRoleBinding() { switch { case err != nil: - panic(fmt.Errorf("Unable to parse the definiton of the role binding for the pod service account definiton from the operator config map: %v", err)) + panic(fmt.Errorf("Unable to parse the definition of the role binding for the pod service account definition from the operator config map: %v", err)) case groupVersionKind.Kind != "RoleBinding": - panic(fmt.Errorf("role binding definiton in the operator config map defines another type of resource: %v", groupVersionKind.Kind)) + panic(fmt.Errorf("role binding definition in the operator config map defines another type of resource: %v", groupVersionKind.Kind)) default: c.PodServiceAccountRoleBinding = obj.(*rbacv1beta1.RoleBinding) c.PodServiceAccountRoleBinding.Namespace = "" diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index c801f6b21..d3634ff27 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -40,19 +40,9 @@ func (c *Controller) dispatchPodEvent(clusterName spec.NamespacedName, event spe } func (c *Controller) podAdd(obj interface{}) { - pod, ok := obj.(*v1.Pod) - if !ok { - return + if pod, ok := obj.(*v1.Pod); ok { + c.preparePodEventForDispatch(pod, nil, spec.EventAdd) } - - podEvent := spec.PodEvent{ - PodName: util.NameFromMeta(pod.ObjectMeta), - CurPod: pod, - EventType: spec.EventAdd, - ResourceVersion: pod.ResourceVersion, - } - - c.dispatchPodEvent(c.podClusterName(pod), podEvent) } func (c *Controller) podUpdate(prev, cur interface{}) { @@ -66,29 +56,24 @@ func (c *Controller) podUpdate(prev, cur interface{}) { return } + c.preparePodEventForDispatch(curPod, prevPod, spec.EventUpdate) +} + +func (c *Controller) podDelete(obj interface{}) { + + if pod, ok := obj.(*v1.Pod); ok { + c.preparePodEventForDispatch(pod, nil, spec.EventDelete) + } +} + +func (c *Controller) preparePodEventForDispatch(curPod, prevPod *v1.Pod, event spec.EventType) { podEvent := spec.PodEvent{ PodName: util.NameFromMeta(curPod.ObjectMeta), - PrevPod: prevPod, CurPod: curPod, - EventType: spec.EventUpdate, + PrevPod: prevPod, + EventType: event, ResourceVersion: curPod.ResourceVersion, } c.dispatchPodEvent(c.podClusterName(curPod), podEvent) } - -func (c *Controller) podDelete(obj interface{}) { - pod, ok := obj.(*v1.Pod) - if !ok { - return - } - - podEvent := spec.PodEvent{ - PodName: util.NameFromMeta(pod.ObjectMeta), - CurPod: pod, - EventType: spec.EventDelete, - ResourceVersion: pod.ResourceVersion, - } - - c.dispatchPodEvent(c.podClusterName(pod), podEvent) -} diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index f4d56ad6d..58a7afcb2 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -10,6 +10,7 @@ import ( "time" "github.com/Sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -130,7 +131,9 @@ type crdDecoder struct { } func (d *crdDecoder) Close() { - d.close() + if err := d.close(); err != nil { + fmt.Printf("error when closing CRDDecorer: %v\n", err) + } } func (d *crdDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { @@ -526,7 +529,6 @@ func (c *Controller) submitRBACCredentials(event spec.ClusterEvent) error { if err := c.createRoleBindings(namespace); err != nil { return fmt.Errorf("could not create role binding %v : %v", c.PodServiceAccountRoleBinding.Name, err) } - c.namespacesWithDefinedRBAC.Store(namespace, true) return nil } diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index d2ce1efa2..b56b74d23 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -156,7 +156,9 @@ var ( // will not contain any private fields not-reachable to deepcopy. This should be ok, // since Error is never read from a Kubernetes object. func (p *Postgresql) Clone() *Postgresql { - if p == nil {return nil} + if p == nil { + return nil + } c := deepcopy.Copy(p).(*Postgresql) c.Error = nil return c @@ -166,11 +168,12 @@ func (p *Postgresql) DeepCopyInto(out *Postgresql) { if p != nil { *out = deepcopy.Copy(*p).(Postgresql) } - return } func (p *Postgresql) DeepCopy() *Postgresql { - if p == nil { return nil } + if p == nil { + return nil + } out := new(Postgresql) p.DeepCopyInto(out) return out @@ -308,7 +311,6 @@ func validateCloneClusterDescription(clone *CloneDescription) error { type postgresqlListCopy PostgresqlList type postgresqlCopy Postgresql - // UnmarshalJSON converts a JSON into the PostgreSQL object. func (p *Postgresql) UnmarshalJSON(data []byte) error { var tmp postgresqlCopy @@ -359,7 +361,9 @@ func (pl *PostgresqlList) UnmarshalJSON(data []byte) error { } func (pl *PostgresqlList) DeepCopy() *PostgresqlList { - if pl == nil { return nil } + if pl == nil { + return nil + } out := new(PostgresqlList) pl.DeepCopyInto(out) return out @@ -369,7 +373,6 @@ func (pl *PostgresqlList) DeepCopyInto(out *PostgresqlList) { if pl != nil { *out = deepcopy.Copy(*pl).(PostgresqlList) } - return } func (pl *PostgresqlList) DeepCopyObject() runtime.Object { @@ -379,7 +382,6 @@ func (pl *PostgresqlList) DeepCopyObject() runtime.Object { return nil } - func (status PostgresStatus) Success() bool { return status != ClusterStatusAddFailed && status != ClusterStatusUpdateFailed && diff --git a/pkg/util/config/crd_config.go b/pkg/util/config/crd_config.go index 10817f1e3..2a9090514 100644 --- a/pkg/util/config/crd_config.go +++ b/pkg/util/config/crd_config.go @@ -3,11 +3,11 @@ package config import ( "encoding/json" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/zalando-incubator/postgres-operator/pkg/spec" - "github.com/mohae/deepcopy" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + + "github.com/mohae/deepcopy" ) type OperatorConfiguration struct { @@ -159,11 +159,12 @@ func (opc *OperatorConfiguration) DeepCopyInto(out *OperatorConfiguration) { if opc != nil { *out = deepcopy.Copy(*opc).(OperatorConfiguration) } - return } func (opc *OperatorConfiguration) DeepCopy() *OperatorConfiguration { - if opc == nil { return nil } + if opc == nil { + return nil + } out := new(OperatorConfiguration) opc.DeepCopyInto(out) return out @@ -189,11 +190,12 @@ func (opcl *OperatorConfigurationList) DeepCopyInto(out *OperatorConfigurationLi if opcl != nil { *out = deepcopy.Copy(*opcl).(OperatorConfigurationList) } - return } func (opcl *OperatorConfigurationList) DeepCopy() *OperatorConfigurationList { - if opcl == nil { return nil } + if opcl == nil { + return nil + } out := new(OperatorConfigurationList) opcl.DeepCopyInto(out) return out @@ -205,4 +207,3 @@ func (opcl *OperatorConfigurationList) DeepCopyObject() runtime.Object { } return nil } - diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index f2de75ed3..a16fd2dfb 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -47,7 +47,7 @@ func apiURL(masterPod *v1.Pod) string { return fmt.Sprintf("http://%s:%d", masterPod.Status.PodIP, apiPort) } -func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) error { +func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) (err error) { request, err := http.NewRequest(method, url, body) if err != nil { return fmt.Errorf("could not create request: %v", err) @@ -59,7 +59,16 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) if err != nil { return fmt.Errorf("could not make request: %v", err) } - defer resp.Body.Close() + defer func() { + if err2 := resp.Body.Close(); err2 != nil { + if err != nil { + err = fmt.Errorf("could not close request: %v, prior error: %v", err2, err) + } else { + err = fmt.Errorf("could not close request: %v", err2) + } + return + } + }() if resp.StatusCode != http.StatusOK { bodyBytes, err := ioutil.ReadAll(resp.Body) @@ -83,6 +92,7 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error { } //TODO: add an option call /patroni to check if it is necessary to restart the server + //SetPostgresParameters sets Postgres options via Patroni patch API call. func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error { buf := &bytes.Buffer{} diff --git a/pkg/util/teams/teams.go b/pkg/util/teams/teams.go index 8645871ba..2223d2063 100644 --- a/pkg/util/teams/teams.go +++ b/pkg/util/teams/teams.go @@ -76,17 +76,15 @@ func (t *API) TeamInfo(teamID, token string) (tm *Team, err error) { t.logger.Debugf("request url: %s", url) req, err = http.NewRequest("GET", url, nil) if err != nil { - return + return nil, err } req.Header.Add("Authorization", "Bearer "+token) - resp, err = t.httpClient.Do(req) - if err != nil { - return + if resp, err = t.httpClient.Do(req); err != nil { + return nil, err } defer func() { - closeErr := resp.Body.Close() - if closeErr != nil { + if closeErr := resp.Body.Close(); closeErr != nil { err = fmt.Errorf("error when closing response: %v", closeErr) } }() @@ -95,27 +93,20 @@ func (t *API) TeamInfo(teamID, token string) (tm *Team, err error) { d := json.NewDecoder(resp.Body) err = d.Decode(&raw) if err != nil { - err = fmt.Errorf("team API query failed with status code %d and malformed response: %v", resp.StatusCode, err) - return + return nil, fmt.Errorf("team API query failed with status code %d and malformed response: %v", resp.StatusCode, err) } if errMessage, ok := raw["error"]; ok { - err = fmt.Errorf("team API query failed with status code %d and message: '%v'", resp.StatusCode, string(errMessage)) - return + return nil, fmt.Errorf("team API query failed with status code %d and message: '%v'", resp.StatusCode, string(errMessage)) } - err = fmt.Errorf("team API query failed with status code %d", resp.StatusCode) - - return + return nil, fmt.Errorf("team API query failed with status code %d", resp.StatusCode) } tm = &Team{} d := json.NewDecoder(resp.Body) - err = d.Decode(tm) - if err != nil { - err = fmt.Errorf("could not parse team API response: %v", err) - tm = nil - return + if err = d.Decode(tm); err != nil { + return nil, fmt.Errorf("could not parse team API response: %v", err) } - return + return tm, nil } diff --git a/pkg/util/users/users.go b/pkg/util/users/users.go index 9d435fd53..cd76c621d 100644 --- a/pkg/util/users/users.go +++ b/pkg/util/users/users.go @@ -30,8 +30,9 @@ type DefaultUserSyncStrategy struct { // ProduceSyncRequests figures out the types of changes that need to happen with the given users. func (strategy DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserMap, - newUsers spec.PgUserMap) (reqs []spec.PgSyncUserRequest) { + newUsers spec.PgUserMap) []spec.PgSyncUserRequest { + var reqs []spec.PgSyncUserRequest // No existing roles are deleted or stripped of role memebership/flags for name, newUser := range newUsers { dbUser, exists := dbUsers[name] @@ -66,7 +67,7 @@ func (strategy DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserM } } - return + return reqs } // ExecuteSyncRequests makes actual database changes from the requests passed in its arguments. @@ -102,7 +103,7 @@ func (strategy DefaultUserSyncStrategy) alterPgUserSet(user spec.PgUser, db *sql return } -func (strategy DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.DB) (err error) { +func (strategy DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.DB) error { var userFlags []string var userPassword string @@ -120,16 +121,14 @@ func (strategy DefaultUserSyncStrategy) createPgUser(user spec.PgUser, db *sql.D } query := fmt.Sprintf(createUserSQL, user.Name, strings.Join(userFlags, " "), userPassword) - _, err = db.Exec(query) // TODO: Try several times - if err != nil { - err = fmt.Errorf("dB error: %v, query: %s", err, query) - return + if _, err := db.Exec(query); err != nil { // TODO: Try several times + return fmt.Errorf("dB error: %v, query: %s", err, query) } - return + return nil } -func (strategy DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB) (err error) { +func (strategy DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB) error { var resultStmt []string if user.Password != "" || len(user.Flags) > 0 { @@ -140,19 +139,16 @@ func (strategy DefaultUserSyncStrategy) alterPgUser(user spec.PgUser, db *sql.DB grantStmt := produceGrantStmt(user) resultStmt = append(resultStmt, grantStmt) } - if len(resultStmt) == 0 { - return nil + + if len(resultStmt) > 0 { + query := fmt.Sprintf(doBlockStmt, strings.Join(resultStmt, ";")) + + if _, err := db.Exec(query); err != nil { // TODO: Try several times + return fmt.Errorf("dB error: %v query %s", err, query) + } } - query := fmt.Sprintf(doBlockStmt, strings.Join(resultStmt, ";")) - - _, err = db.Exec(query) // TODO: Try several times - if err != nil { - err = fmt.Errorf("dB error: %v query %s", err, query) - return - } - - return + return nil } func produceAlterStmt(user spec.PgUser) string { @@ -205,7 +201,7 @@ func quoteParameterValue(name, val string) string { // containing spaces (but something more complex, like double quotes inside double quotes or spaces // in the schema name would break the parsing code in the operator.) if start == '\'' && end == '\'' { - return fmt.Sprintf("%s", val[1:len(val)-1]) + return val[1 : len(val)-1] } return val