From ac7b13231466556266e730dcabda5b9d0ea8c965 Mon Sep 17 00:00:00 2001 From: Oleksii Kliukin Date: Fri, 3 Aug 2018 11:09:45 +0200 Subject: [PATCH] Refactoring inspired by gometalinter. (#357) Among other things, fix a few issues with deepcopy implementation. --- pkg/apiserver/apiserver.go | 66 +++++---- pkg/cluster/cluster.go | 2 +- pkg/cluster/k8sres.go | 54 ++++---- pkg/cluster/pg.go | 17 ++- pkg/cluster/pod.go | 55 ++++---- pkg/cluster/resources.go | 4 +- pkg/cluster/sync.go | 216 ++++++++++++++++-------------- pkg/cluster/util.go | 4 +- pkg/controller/controller.go | 4 +- pkg/controller/postgresql_test.go | 4 +- pkg/controller/util.go | 32 ++--- pkg/spec/postgresql.go | 64 ++++----- pkg/spec/types.go | 1 - pkg/util/config/crd_config.go | 58 ++++---- pkg/util/patroni/patroni.go | 4 +- 15 files changed, 312 insertions(+), 273 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 1dceefd52..82eb7ba9c 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -92,12 +92,14 @@ func New(controller controllerInformer, port int, logger *logrus.Logger) *Server // Run starts the HTTP server func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { + + var err error + defer wg.Done() go func() { - err := s.http.ListenAndServe() - if err != http.ErrServerClosed { - s.logger.Fatalf("Could not start http server: %v", err) + if err2 := s.http.ListenAndServe(); err2 != http.ErrServerClosed { + s.logger.Fatalf("Could not start http server: %v", err2) } }() s.logger.Infof("listening on %s", s.http.Addr) @@ -106,21 +108,27 @@ func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() - err := s.http.Shutdown(ctx) + if err = s.http.Shutdown(ctx); err == nil { + s.logger.Infoln("Http server shut down") + return + } if err == context.DeadlineExceeded { s.logger.Warningf("Shutdown timeout exceeded. closing http server") - s.http.Close() - } else if err != nil { - s.logger.Errorf("Could not shutdown http server: %v", err) + if err = s.http.Close(); err != nil { + s.logger.Errorf("could not close http connection: %v", err) + } + return } - s.logger.Infoln("Http server shut down") + s.logger.Errorf("Could not shutdown http server: %v", err) } func (s *Server) respond(obj interface{}, err error, w http.ResponseWriter) { w.Header().Set("Content-Type", "application/json") if err != nil { w.WriteHeader(http.StatusInternalServerError) - json.NewEncoder(w).Encode(map[string]interface{}{"error": err.Error()}) + if err2 := json.NewEncoder(w).Encode(map[string]interface{}{"error": err.Error()}); err2 != nil { + s.logger.Errorf("could not encode error response %q: %v", err, err2) + } return } @@ -186,6 +194,14 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) { s.respond(resp, err, w) } +func mustConvertToUint32(s string) uint32{ + result, err := strconv.Atoi(s); + if err != nil { + panic(fmt.Errorf("mustConvertToUint32 called for %s: %v", s, err)) + } + return uint32(result) +} + func (s *Server) workers(w http.ResponseWriter, req *http.Request) { var ( resp interface{} @@ -195,30 +211,30 @@ func (s *Server) workers(w http.ResponseWriter, req *http.Request) { if workerAllQueue.MatchString(req.URL.Path) { s.allQueues(w, req) return - } else if matches := util.FindNamedStringSubmatch(workerLogsURL, req.URL.Path); matches != nil { - workerID, _ := strconv.Atoi(matches["id"]) + } + if workerAllStatus.MatchString(req.URL.Path) { + s.allWorkers(w, req) + return + } + + err = fmt.Errorf("page not found") + + if matches := util.FindNamedStringSubmatch(workerLogsURL, req.URL.Path); matches != nil { + workerID := mustConvertToUint32(matches["id"]) + resp, err = s.controller.WorkerLogs(workerID) - resp, err = s.controller.WorkerLogs(uint32(workerID)) } else if matches := util.FindNamedStringSubmatch(workerEventsQueueURL, req.URL.Path); matches != nil { - workerID, _ := strconv.Atoi(matches["id"]) + workerID := mustConvertToUint32(matches["id"]) + resp, err = s.controller.ListQueue(workerID) - resp, err = s.controller.ListQueue(uint32(workerID)) } else if matches := util.FindNamedStringSubmatch(workerStatusURL, req.URL.Path); matches != nil { var workerStatus *spec.WorkerStatus - workerID, _ := strconv.Atoi(matches["id"]) - workerStatus, err = s.controller.WorkerStatus(uint32(workerID)) - if workerStatus == nil { - resp = "idle" - } else { + workerID := mustConvertToUint32(matches["id"]) + resp = "idle" + if workerStatus, err = s.controller.WorkerStatus(workerID); workerStatus != nil { resp = workerStatus } - } else if workerAllStatus.MatchString(req.URL.Path) { - s.allWorkers(w, req) - return - } else { - s.respond(nil, fmt.Errorf("page not found"), w) - return } s.respond(resp, err, w) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index e15f5a63f..c2af3f9c8 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -284,7 +284,7 @@ func (c *Cluster) Create() error { } c.logger.Infof("pods are ready") - // create database objects unless we are running without pods or disabled that feature explicitely + // create database objects unless we are running without pods or disabled that feature explicitly if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0) { if err = c.createRoles(); err != nil { return fmt.Errorf("could not create users: %v", err) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 26308cd30..917be763f 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -88,7 +88,7 @@ func (c *Cluster) makeDefaultResources() spec.Resources { defaultRequests := spec.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest} defaultLimits := spec.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit} - return spec.Resources{defaultRequests, defaultLimits} + return spec.Resources{ResourceRequest:defaultRequests, ResourceLimits:defaultLimits} } func generateResourceRequirements(resources spec.Resources, defaultResources spec.Resources) (*v1.ResourceRequirements, error) { @@ -537,10 +537,10 @@ func deduplicateEnvVars(input []v1.EnvVar, containerName string, logger *logrus. for i, va := range input { if names[va.Name] == 0 { - names[va.Name] += 1 + names[va.Name]++ result = append(result, input[i]) } else if names[va.Name] == 1 { - names[va.Name] += 1 + names[va.Name]++ logger.Warningf("variable %q is defined in %q more than once, the subsequent definitions are ignored", va.Name, containerName) } @@ -626,6 +626,12 @@ func makeResources(cpuRequest, memoryRequest, cpuLimit, memoryLimit string) spec func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.StatefulSet, error) { + var ( + err error + sidecarContainers []v1.Container + podTemplate *v1.PodTemplateSpec + volumeClaimTemplate *v1.PersistentVolumeClaim + ) defaultResources := c.makeDefaultResources() resourceRequirements, err := generateResourceRequirements(spec.Resources, defaultResources) @@ -633,21 +639,19 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu return nil, fmt.Errorf("could not generate resource requirements: %v", err) } - if err != nil { - return nil, fmt.Errorf("could not generate Scalyr sidecar resource requirements: %v", err) - } customPodEnvVarsList := make([]v1.EnvVar, 0) if c.OpConfig.PodEnvironmentConfigMap != "" { - if cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(c.OpConfig.PodEnvironmentConfigMap, metav1.GetOptions{}); err != nil { + var cm *v1.ConfigMap + cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(c.OpConfig.PodEnvironmentConfigMap, metav1.GetOptions{}) + if err != nil { return nil, fmt.Errorf("could not read PodEnvironmentConfigMap: %v", err) - } else { - for k, v := range cm.Data { - customPodEnvVarsList = append(customPodEnvVarsList, v1.EnvVar{Name: k, Value: v}) - } - sort.Slice(customPodEnvVarsList, - func(i, j int) bool { return customPodEnvVarsList[i].Name < customPodEnvVarsList[j].Name }) } + for k, v := range cm.Data { + customPodEnvVarsList = append(customPodEnvVarsList, v1.EnvVar{Name: k, Value: v}) + } + sort.Slice(customPodEnvVarsList, + func(i, j int) bool { return customPodEnvVarsList[i].Name < customPodEnvVarsList[j].Name }) } spiloConfiguration := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, c.OpConfig.PamRoleName, c.logger) @@ -686,16 +690,15 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu } // generate sidecar containers - sidecarContainers, err := generateSidecarContainers(sideCars, volumeMounts, defaultResources, - c.OpConfig.SuperUsername, c.credentialSecretName(c.OpConfig.SuperUsername), c.logger) - if err != nil { - return nil, fmt.Errorf("could not generate sidecar containers: %v", err) + if sidecarContainers, err = generateSidecarContainers(sideCars, volumeMounts, defaultResources, + c.OpConfig.SuperUsername, c.credentialSecretName(c.OpConfig.SuperUsername), c.logger); err != nil { + return nil, fmt.Errorf("could not generate sidecar containers: %v", err) } tolerationSpec := tolerations(&spec.Tolerations, c.OpConfig.PodToleration) // generate pod template for the statefulset, based on the spilo container and sidecards - podTemplate, err := generatePodTemplate( + if podTemplate, err = generatePodTemplate( c.Namespace, c.labelsSet(true), spiloContainer, @@ -704,14 +707,13 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu nodeAffinity(c.OpConfig.NodeReadinessLabel), int64(c.OpConfig.PodTerminateGracePeriod.Seconds()), c.OpConfig.PodServiceAccountName, - c.OpConfig.KubeIAMRole) - - if err != nil { - return nil, fmt.Errorf("could not generate pod template: %v", err) + c.OpConfig.KubeIAMRole); err != nil { + return nil, fmt.Errorf("could not generate pod template: %v", err) } - volumeClaimTemplate, err := generatePersistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass) - if err != nil { - return nil, fmt.Errorf("could not generate volume claim template: %v", err) + + if volumeClaimTemplate, err = generatePersistentVolumeClaimTemplate(spec.Volume.Size, + spec.Volume.StorageClass); err != nil { + return nil, fmt.Errorf("could not generate volume claim template: %v", err) } numberOfInstances := c.getNumberOfInstances(spec) @@ -1033,7 +1035,7 @@ func (c *Cluster) generateCloneEnvironment(description *spec.CloneDescription) [ result = append(result, v1.EnvVar{Name: "CLONE_METHOD", Value: "CLONE_WITH_WALE"}) result = append(result, v1.EnvVar{Name: "CLONE_WAL_S3_BUCKET", Value: c.OpConfig.WALES3Bucket}) result = append(result, v1.EnvVar{Name: "CLONE_TARGET_TIME", Value: description.EndTimestamp}) - result = append(result, v1.EnvVar{Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX", Value: getBucketScopeSuffix(description.Uid)}) + result = append(result, v1.EnvVar{Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX", Value: getBucketScopeSuffix(description.UID)}) result = append(result, v1.EnvVar{Name: "CLONE_WAL_BUCKET_SCOPE_PREFIX", Value: ""}) } diff --git a/pkg/cluster/pg.go b/pkg/cluster/pg.go index 09c2e16c1..f570ac81c 100644 --- a/pkg/cluster/pg.go +++ b/pkg/cluster/pg.go @@ -153,12 +153,10 @@ func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUser // getDatabases returns the map of current databases with owners // The caller is responsible for opening and closing the database connection -func (c *Cluster) getDatabases() (map[string]string, error) { +func (c *Cluster) getDatabases() (dbs map[string]string, err error) { var ( rows *sql.Rows - err error ) - dbs := make(map[string]string) if rows, err = c.pgDb.Query(getDatabasesSQL); err != nil { return nil, fmt.Errorf("could not query database: %v", err) @@ -166,21 +164,26 @@ func (c *Cluster) getDatabases() (map[string]string, error) { defer func() { if err2 := rows.Close(); err2 != nil { - err = fmt.Errorf("error when closing query cursor: %v", err2) + if err != nil { + err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err) + } else { + err = fmt.Errorf("error when closing query cursor: %v", err2) + } } }() + dbs = make(map[string]string) + for rows.Next() { var datname, owner string - err := rows.Scan(&datname, &owner) - if err != nil { + if err = rows.Scan(&datname, &owner); err != nil { return nil, fmt.Errorf("error when processing row: %v", err) } dbs[datname] = owner } - return dbs, err + return } // executeCreateDatabase creates new database with the given owner. diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go index b0b73315d..8d15e6a9a 100644 --- a/pkg/cluster/pod.go +++ b/pkg/cluster/pod.go @@ -9,6 +9,7 @@ import ( "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" + "k8s.io/api/apps/v1beta1" ) func (c *Cluster) listPods() ([]v1.Pod, error) { @@ -182,6 +183,8 @@ func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) { func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { var ( masterCandidatePod *v1.Pod + err error + eol bool ) oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{}) @@ -192,9 +195,10 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { c.logger.Infof("migrating master pod %q", podName) - if eol, err := c.podIsEndOfLife(oldMaster); err != nil { + if eol, err = c.podIsEndOfLife(oldMaster); err != nil { return fmt.Errorf("could not get node %q: %v", oldMaster.Spec.NodeName, err) - } else if !eol { + } + if !eol { c.logger.Debugf("pod is already on a live node") return nil } @@ -205,41 +209,44 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error { } // we must have a statefulset in the cluster for the migration to work if c.Statefulset == nil { - sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("could not retrieve cluster statefulset: %v", err) + var sset *v1beta1.StatefulSet + if sset, err = c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), + metav1.GetOptions{}); err != nil { + return fmt.Errorf("could not retrieve cluster statefulset: %v", err) } c.Statefulset = sset } // We may not have a cached statefulset if the initial cluster sync has aborted, revert to the spec in that case. - if *c.Statefulset.Spec.Replicas == 1 { - c.logger.Warningf("single master pod for cluster %q, migration will cause longer downtime of the master instance", c.clusterName()) - } else { - masterCandidatePod, err = c.masterCandidate(oldMaster.Spec.NodeName) - if err != nil { + if *c.Statefulset.Spec.Replicas > 1 { + if masterCandidatePod, err = c.masterCandidate(oldMaster.Spec.NodeName); err != nil { return fmt.Errorf("could not get new master candidate: %v", err) } + } else { + c.logger.Warningf("single master pod for cluster %q, migration will cause longer downtime of the master instance", c.clusterName()) } + // there are two cases for each postgres cluster that has its master pod on the node to migrate from: // - the cluster has some replicas - migrate one of those if necessary and failover to it // - there are no replicas - just terminate the master and wait until it respawns // in both cases the result is the new master up and running on a new node. - if masterCandidatePod != nil { - pod, err := c.movePodFromEndOfLifeNode(masterCandidatePod) - if err != nil { - return fmt.Errorf("could not move pod: %v", err) - } - masterCandidateName := util.NameFromMeta(pod.ObjectMeta) - if err := c.Switchover(oldMaster, masterCandidateName); err != nil { - return fmt.Errorf("could not failover to pod %q: %v", masterCandidateName, err) - } - } else { + if masterCandidatePod == nil { if _, err = c.movePodFromEndOfLifeNode(oldMaster); err != nil { return fmt.Errorf("could not move pod: %v", err) } + return nil } + + if masterCandidatePod, err = c.movePodFromEndOfLifeNode(masterCandidatePod); err != nil { + return fmt.Errorf("could not move pod: %v", err) + } + + masterCandidateName := util.NameFromMeta(masterCandidatePod.ObjectMeta) + if err := c.Switchover(oldMaster, masterCandidateName); err != nil { + return fmt.Errorf("could not failover to pod %q: %v", masterCandidateName, err) + } + return nil } @@ -281,12 +288,12 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) { if err := c.waitForPodDeletion(ch); err != nil { return nil, err } - if pod, err := c.waitForPodLabel(ch, stopChan, nil); err != nil { + pod, err := c.waitForPodLabel(ch, stopChan, nil) + if err != nil { return nil, err - } else { - c.logger.Infof("pod %q has been recreated", podName) - return pod, nil } + c.logger.Infof("pod %q has been recreated", podName) + return pod, nil } func (c *Cluster) recreatePods() error { diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index c965d65d5..6aab9d3fc 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -168,7 +168,7 @@ func (c *Cluster) getRollingUpdateFlagFromStatefulSet(sset *v1beta1.StatefulSet, if flag, err = strconv.ParseBool(stringFlag); err != nil { c.logger.Warnf("error when parsing %q annotation for the statefulset %q: expected boolean value, got %q\n", RollingUpdateStatefulsetAnnotationKey, - types.NamespacedName{sset.Namespace, sset.Name}, + types.NamespacedName{Namespace: sset.Namespace, Name: sset.Name}, stringFlag) flag = defaultValue } @@ -491,7 +491,7 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset if len(endPointAddresses) > 0 { result = append(result, v1.EndpointSubset{ Addresses: endPointAddresses, - Ports: []v1.EndpointPort{{"postgresql", 5432, "TCP"}}, + Ports: []v1.EndpointPort{{Name: "postgresql", Port: 5432, Protocol: "TCP"}}, }) } else if role == Master { c.logger.Warningf("master is not running, generated master endpoint does not contain any addresses") diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index d56dd9a9a..d487695a5 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -6,6 +6,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" policybeta1 "k8s.io/api/policy/v1beta1" + "k8s.io/api/policy/v1beta1" + "k8s.io/api/core/v1" "github.com/zalando-incubator/postgres-operator/pkg/spec" "github.com/zalando-incubator/postgres-operator/pkg/util" @@ -70,7 +72,7 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) { } } - // create database objects unless we are running without pods or disabled that feature explicitely + // create database objects unless we are running without pods or disabled that feature explicitly if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0) { c.logger.Debugf("syncing roles") if err = c.syncRoles(); err != nil { @@ -110,118 +112,119 @@ func (c *Cluster) syncServices() error { } func (c *Cluster) syncService(role PostgresRole) error { + var ( + svc *v1.Service + err error + ) c.setProcessName("syncing %s service", role) - svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}) - if err == nil { + if svc, err = c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}); err == nil { c.Services[role] = svc desiredSvc := c.generateService(role, &c.Spec) - match, reason := k8sutil.SameService(svc, desiredSvc) - if match { - return nil + if match, reason := k8sutil.SameService(svc, desiredSvc); !match { + c.logServiceChanges(role, svc, desiredSvc, false, reason) + if err = c.updateService(role, desiredSvc); err != nil { + return fmt.Errorf("could not update %s service to match desired state: %v", role, err) + } + c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) } - c.logServiceChanges(role, svc, desiredSvc, false, reason) - - if err := c.updateService(role, desiredSvc); err != nil { - return fmt.Errorf("could not update %s service to match desired state: %v", role, err) - } - c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta)) - return nil - } else if !k8sutil.ResourceNotFound(err) { + } + if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s service: %v", role, err) } + // no existing service, create new one c.Services[role] = nil - c.logger.Infof("could not find the cluster's %s service", role) - if svc, err := c.createService(role); err != nil { - if k8sutil.ResourceAlreadyExists(err) { - c.logger.Infof("%s service %q already exists", role, util.NameFromMeta(svc.ObjectMeta)) - svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}) - if err == nil { - c.Services[role] = svc - } else { - c.logger.Infof("could not fetch existing %s service: %v", role, err) - } - } else { + if svc, err = c.createService(role); err == nil { + c.logger.Infof("created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta)) + } else { + if !k8sutil.ResourceAlreadyExists(err) { return fmt.Errorf("could not create missing %s service: %v", role, err) } - } else { - c.logger.Infof("created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta)) - c.Services[role] = svc + c.logger.Infof("%s service %q already exists", role, util.NameFromMeta(svc.ObjectMeta)) + if svc, err = c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}); err != nil { + return fmt.Errorf("could not fetch existing %s service: %v", role, err) + } } - + c.Services[role] = svc return nil } func (c *Cluster) syncEndpoint(role PostgresRole) error { + var ( + ep *v1.Endpoints + err error + ) c.setProcessName("syncing %s endpoint", role) - ep, err := c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}) - if err == nil { - + if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}); err == nil { + // TODO: No syncing of endpoints here, is this covered completely by updateService? c.Endpoints[role] = ep return nil - } else if !k8sutil.ResourceNotFound(err) { + } + if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s endpoint: %v", role, err) } + // no existing endpoint, create new one c.Endpoints[role] = nil - c.logger.Infof("could not find the cluster's %s endpoint", role) - if ep, err := c.createEndpoint(role); err != nil { - if k8sutil.ResourceAlreadyExists(err) { - c.logger.Infof("%s endpoint %q already exists", role, util.NameFromMeta(ep.ObjectMeta)) - ep, err := c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}) - if err == nil { - c.Endpoints[role] = ep - } else { - c.logger.Infof("could not fetch existing %s endpoint: %v", role, err) - } - } else { + if ep, err = c.createEndpoint(role); err == nil { + c.logger.Infof("created missing %s endpoint %q", role, util.NameFromMeta(ep.ObjectMeta)) + } else { + if !k8sutil.ResourceAlreadyExists(err) { return fmt.Errorf("could not create missing %s endpoint: %v", role, err) } - } else { - c.logger.Infof("created missing %s endpoint %q", role, util.NameFromMeta(ep.ObjectMeta)) - c.Endpoints[role] = ep + c.logger.Infof("%s endpoint %q already exists", role, util.NameFromMeta(ep.ObjectMeta)) + if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}); err != nil { + return fmt.Errorf("could not fetch existing %s endpoint: %v", role, err) + } } - + c.Endpoints[role] = ep return nil } func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { - pdb, err := c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}) - if err == nil { + var ( + pdb *v1beta1.PodDisruptionBudget + err error + ) + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { c.PodDisruptionBudget = pdb newPDB := c.generatePodDisruptionBudget() if match, reason := k8sutil.SamePDB(pdb, newPDB); !match { c.logPDBChanges(pdb, newPDB, isUpdate, reason) - if err := c.updatePodDisruptionBudget(newPDB); err != nil { + if err = c.updatePodDisruptionBudget(newPDB); err != nil { return err } } else { c.PodDisruptionBudget = pdb } - return nil - } else if !k8sutil.ResourceNotFound(err) { + + } + if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get pod disruption budget: %v", err) } + // no existing pod disruption budget, create new one c.PodDisruptionBudget = nil - c.logger.Infof("could not find the cluster's pod disruption budget") + if pdb, err = c.createPodDisruptionBudget(); err != nil { - if k8sutil.ResourceAlreadyExists(err) { - c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) - } else { + if !k8sutil.ResourceAlreadyExists(err) { return fmt.Errorf("could not create pod disruption budget: %v", err) } - } else { - c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) - c.PodDisruptionBudget = pdb + c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil { + return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) + } } + c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) + c.PodDisruptionBudget = pdb + return nil } @@ -315,6 +318,11 @@ func (c *Cluster) syncStatefulSet() error { // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters // (like max_connections) has changed and if necessary sets it via the Patroni API func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error { + var ( + err error + pods []v1.Pod + ) + // we need to extract those options from the cluster manifest. optionsToSet := make(map[string]string) pgOptions := c.Spec.Parameters @@ -325,47 +333,55 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error { } } - if len(optionsToSet) > 0 { - pods, err := c.listPods() - if err != nil { - return err - } - if len(pods) == 0 { - return fmt.Errorf("could not call Patroni API: cluster has no pods") - } - for _, pod := range pods { - podName := util.NameFromMeta(pod.ObjectMeta) - c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v", - podName, optionsToSet) - if err := c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil { - return nil - } else { - c.logger.Warningf("could not patch postgres parameters with a pod %s: %v", podName, err) - } - } - return fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)", - len(pods)) + if len(optionsToSet) == 0 { + return nil } - return nil + + if pods, err = c.listPods(); err != nil { + return err + } + if len(pods) == 0 { + return fmt.Errorf("could not call Patroni API: cluster has no pods") + } + // try all pods until the first one that is successful, as it doesn't matter which pod + // carries the request to change configuration through + for _, pod := range pods { + podName := util.NameFromMeta(pod.ObjectMeta) + c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v", + podName, optionsToSet) + if err = c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil { + return nil + } + c.logger.Warningf("could not patch postgres parameters with a pod %s: %v", podName, err) + } + return fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)", + len(pods)) } func (c *Cluster) syncSecrets() error { + var ( + err error + secret *v1.Secret + ) c.setProcessName("syncing secrets") secrets := c.generateUserSecrets() for secretUsername, secretSpec := range secrets { - secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec) + if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec); err == nil { + c.Secrets[secret.UID] = secret + c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID) + continue + } if k8sutil.ResourceAlreadyExists(err) { var userMap map[string]spec.PgUser - curSecret, err2 := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{}) - if err2 != nil { - return fmt.Errorf("could not get current secret: %v", err2) + if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{}); err != nil { + return fmt.Errorf("could not get current secret: %v", err) } - if secretUsername != string(curSecret.Data["username"]) { + if secretUsername != string(secret.Data["username"]) { c.logger.Warningf("secret %q does not contain the role %q", secretSpec.Name, secretUsername) continue } - c.logger.Debugf("secret %q already exists, fetching its password", util.NameFromMeta(curSecret.ObjectMeta)) + c.logger.Debugf("secret %q already exists, fetching its password", util.NameFromMeta(secret.ObjectMeta)) if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name { secretUsername = constants.SuperuserKeyName userMap = c.systemUsers @@ -377,35 +393,28 @@ func (c *Cluster) syncSecrets() error { } pwdUser := userMap[secretUsername] // if this secret belongs to the infrastructure role and the password has changed - replace it in the secret - if pwdUser.Password != string(curSecret.Data["password"]) && pwdUser.Origin == spec.RoleOriginInfrastructure { + if pwdUser.Password != string(secret.Data["password"]) && pwdUser.Origin == spec.RoleOriginInfrastructure { c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name) - if _, err := c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil { + if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil { return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err) } } else { // for non-infrastructure role - update the role with the password from the secret - pwdUser.Password = string(curSecret.Data["password"]) + pwdUser.Password = string(secret.Data["password"]) userMap[secretUsername] = pwdUser } - - continue } else { - if err != nil { - return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err) - } - c.Secrets[secret.UID] = secret - c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID) + return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err) } } return nil } -func (c *Cluster) syncRoles() error { +func (c *Cluster) syncRoles() (err error) { c.setProcessName("syncing roles") var ( - err error dbUsers spec.PgUserMap userNames []string ) @@ -414,9 +423,14 @@ func (c *Cluster) syncRoles() error { if err != nil { return fmt.Errorf("could not init db connection: %v", err) } + defer func() { - if err := c.closeDbConn(); err != nil { - c.logger.Errorf("could not close db connection: %v", err) + if err2 := c.closeDbConn(); err2 != nil { + if err == nil { + err = fmt.Errorf("could not close database connection: %v", err2) + } else { + err = fmt.Errorf("could not close database connection: %v (prior error: %v)", err2, err) + } } }() diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index 8f2c6c389..3d474e01c 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -372,7 +372,7 @@ func (c *Cluster) waitStatefulsetPodsReady() error { } // Returns labels used to create or list k8s objects such as pods -// For backward compatability, shouldAddExtraLabels must be false +// For backward compatibility, shouldAddExtraLabels must be false // when listing k8s objects. See operator PR #252 func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set { lbls := make(map[string]string) @@ -390,7 +390,7 @@ func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set { } func (c *Cluster) labelsSelector() *metav1.LabelSelector { - return &metav1.LabelSelector{c.labelsSet(false), nil} + return &metav1.LabelSelector{MatchLabels: c.labelsSet(false), MatchExpressions: nil} } func (c *Cluster) roleLabelsSet(role PostgresRole) labels.Set { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index df2fa9d04..01a8e60da 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -151,9 +151,9 @@ func (c *Controller) initPodServiceAccount() { switch { case err != nil: - panic(fmt.Errorf("Unable to parse pod service account definiton from the operator config map: %v", err)) + panic(fmt.Errorf("Unable to parse pod service account definition from the operator config map: %v", err)) case groupVersionKind.Kind != "ServiceAccount": - panic(fmt.Errorf("pod service account definiton in the operator config map defines another type of resource: %v", groupVersionKind.Kind)) + panic(fmt.Errorf("pod service account definition in the operator config map defines another type of resource: %v", groupVersionKind.Kind)) default: c.PodServiceAccount = obj.(*v1.ServiceAccount) if c.PodServiceAccount.Name != c.opConfig.PodServiceAccountName { diff --git a/pkg/controller/postgresql_test.go b/pkg/controller/postgresql_test.go index 7fa7d842f..d5d5669af 100644 --- a/pkg/controller/postgresql_test.go +++ b/pkg/controller/postgresql_test.go @@ -7,8 +7,8 @@ import ( ) var ( - True bool = true - False bool = false + True = true + False = false ) func TestMergeDeprecatedPostgreSQLSpecParameters(t *testing.T) { diff --git a/pkg/controller/util.go b/pkg/controller/util.go index fceea3ba8..4479fe718 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -114,7 +114,7 @@ func readDecodedRole(s string) (*spec.PgUser, error) { return &result, nil } -func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (result map[string]spec.PgUser, err error) { +func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (map[string]spec.PgUser, error) { if *rolesSecret == (spec.NamespacedName{}) { // we don't have infrastructure roles defined, bail out return nil, nil @@ -129,7 +129,7 @@ func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (r } secretData := infraRolesSecret.Data - result = make(map[string]spec.PgUser) + result := make(map[string]spec.PgUser) Users: // in worst case we would have one line per user for i := 1; i <= len(secretData); i++ { @@ -171,22 +171,22 @@ Users: if infraRolesMap, err := c.KubeClient.ConfigMaps(rolesSecret.Namespace).Get(rolesSecret.Name, metav1.GetOptions{}); err == nil { // we have a configmap with username - json description, let's read and decode it for role, s := range infraRolesMap.Data { - if roleDescr, err := readDecodedRole(s); err != nil { + roleDescr, err := readDecodedRole(s) + if err != nil { return nil, fmt.Errorf("could not decode role description: %v", err) - } else { - // check if we have a a password in a configmap - c.logger.Debugf("found role description for role %q: %+v", role, roleDescr) - if passwd, ok := secretData[role]; ok { - roleDescr.Password = string(passwd) - delete(secretData, role) - } else { - c.logger.Warningf("infrastructure role %q has no password defined and is ignored", role) - continue - } - roleDescr.Name = role - roleDescr.Origin = spec.RoleOriginInfrastructure - result[role] = *roleDescr } + // check if we have a a password in a configmap + c.logger.Debugf("found role description for role %q: %+v", role, roleDescr) + if passwd, ok := secretData[role]; ok { + roleDescr.Password = string(passwd) + delete(secretData, role) + } else { + c.logger.Warningf("infrastructure role %q has no password defined and is ignored", role) + continue + } + roleDescr.Name = role + roleDescr.Origin = spec.RoleOriginInfrastructure + result[role] = *roleDescr } } diff --git a/pkg/spec/postgresql.go b/pkg/spec/postgresql.go index a164da75a..61dbf6f8c 100644 --- a/pkg/spec/postgresql.go +++ b/pkg/spec/postgresql.go @@ -58,7 +58,7 @@ type Patroni struct { // CloneDescription describes which cluster the new should clone and up to which point in time type CloneDescription struct { ClusterName string `json:"cluster,omitempty"` - Uid string `json:"uid,omitempty"` + UID string `json:"uid,omitempty"` EndTimestamp string `json:"timestamp,omitempty"` } @@ -119,7 +119,7 @@ type PostgresSpec struct { EnableMasterLoadBalancer *bool `json:"enableMasterLoadBalancer,omitempty"` EnableReplicaLoadBalancer *bool `json:"enableReplicaLoadBalancer,omitempty"` - // deprecated load balancer settings mantained for backward compatibility + // deprecated load balancer settings maintained for backward compatibility // see "Load balancers" operator docs UseLoadBalancer *bool `json:"useLoadBalancer,omitempty"` ReplicaLoadBalancer *bool `json:"replicaLoadBalancer,omitempty"` @@ -161,22 +161,22 @@ func (p *Postgresql) Clone() *Postgresql { return c } -func (in *Postgresql) DeepCopyInto(out *Postgresql) { - if in != nil { - out = deepcopy.Copy(in).(*Postgresql) +func (p *Postgresql) DeepCopyInto(out *Postgresql) { + if p != nil { + *out = deepcopy.Copy(*p).(Postgresql) } return } -func (in *Postgresql) DeepCopy() *Postgresql { - if in == nil { return nil } +func (p *Postgresql) DeepCopy() *Postgresql { + if p == nil { return nil } out := new(Postgresql) - in.DeepCopyInto(out) + p.DeepCopyInto(out) return out } -func (in *Postgresql) DeepCopyObject() runtime.Object { - if c := in.DeepCopy(); c != nil { +func (p *Postgresql) DeepCopyObject() runtime.Object { + if c := p.DeepCopy(); c != nil { return c } return nil @@ -309,28 +309,6 @@ type postgresqlListCopy PostgresqlList type postgresqlCopy Postgresql -func (in *PostgresqlList) DeepCopy() *PostgresqlList { - if in == nil { return nil } - out := new(PostgresqlList) - in.DeepCopyInto(out) - return out -} - -func (in *PostgresqlList) DeepCopyInto(out *PostgresqlList) { - if in != nil { - out = deepcopy.Copy(in).(*PostgresqlList) - } - return -} - -func (in *PostgresqlList) DeepCopyObject() runtime.Object { - if c := in.DeepCopy(); c != nil { - return c - } - return nil -} - - // UnmarshalJSON converts a JSON into the PostgreSQL object. func (p *Postgresql) UnmarshalJSON(data []byte) error { var tmp postgresqlCopy @@ -380,6 +358,28 @@ func (pl *PostgresqlList) UnmarshalJSON(data []byte) error { return nil } +func (pl *PostgresqlList) DeepCopy() *PostgresqlList { + if pl == nil { return nil } + out := new(PostgresqlList) + pl.DeepCopyInto(out) + return out +} + +func (pl *PostgresqlList) DeepCopyInto(out *PostgresqlList) { + if pl != nil { + *out = deepcopy.Copy(*pl).(PostgresqlList) + } + return +} + +func (pl *PostgresqlList) DeepCopyObject() runtime.Object { + if c := pl.DeepCopy(); c != nil { + return c + } + return nil +} + + func (status PostgresStatus) Success() bool { return status != ClusterStatusAddFailed && status != ClusterStatusUpdateFailed && diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 37d01fc9a..fc0dfe237 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -278,5 +278,4 @@ func (d *Duration) UnmarshalJSON(b []byte) error { default: return fmt.Errorf("could not recognize type %T as a valid type to unmarshal to Duration", val) } - return nil } diff --git a/pkg/util/config/crd_config.go b/pkg/util/config/crd_config.go index 465c27637..b8b2e13c3 100644 --- a/pkg/util/config/crd_config.go +++ b/pkg/util/config/crd_config.go @@ -154,6 +154,27 @@ func (opc *OperatorConfiguration) UnmarshalJSON(data []byte) error { return nil } +func (opc *OperatorConfiguration) DeepCopyInto(out *OperatorConfiguration) { + if opc != nil { + *out = deepcopy.Copy(*opc).(OperatorConfiguration) + } + return +} + +func (opc *OperatorConfiguration) DeepCopy() *OperatorConfiguration { + if opc == nil { return nil } + out := new(OperatorConfiguration) + opc.DeepCopyInto(out) + return out +} + +func (opc *OperatorConfiguration) DeepCopyObject() runtime.Object { + if c := opc.DeepCopy(); c != nil { + return c + } + return nil +} + func (opcl *OperatorConfigurationList) UnmarshalJSON(data []byte) error { var ref OperatorConfigurationListCopy if err := json.Unmarshal(data, &ref); err != nil { @@ -163,43 +184,22 @@ func (opcl *OperatorConfigurationList) UnmarshalJSON(data []byte) error { return nil } -func (in *OperatorConfiguration) DeepCopyInto(out *OperatorConfiguration) { - if in != nil { - out = deepcopy.Copy(in).(*OperatorConfiguration) +func (opcl *OperatorConfigurationList) DeepCopyInto(out *OperatorConfigurationList) { + if opcl != nil { + *out = deepcopy.Copy(*opcl).(OperatorConfigurationList) } return } -func (in *OperatorConfiguration) DeepCopy() *OperatorConfiguration { - if in == nil { return nil } - out := new(OperatorConfiguration) - in.DeepCopyInto(out) - return out -} - -func (in *OperatorConfiguration) DeepCopyObject() runtime.Object { - if c := in.DeepCopy(); c != nil { - return c - } - return nil -} - -func (in *OperatorConfigurationList) DeepCopyInto(out *OperatorConfigurationList) { - if in != nil { - out = deepcopy.Copy(in).(*OperatorConfigurationList) - } - return -} - -func (in *OperatorConfigurationList) DeepCopy() *OperatorConfigurationList { - if in == nil { return nil } +func (opcl *OperatorConfigurationList) DeepCopy() *OperatorConfigurationList { + if opcl == nil { return nil } out := new(OperatorConfigurationList) - in.DeepCopyInto(out) + opcl.DeepCopyInto(out) return out } -func (in *OperatorConfigurationList) DeepCopyObject() runtime.Object { - if c := in.DeepCopy(); c != nil { +func (opcl *OperatorConfigurationList) DeepCopyObject() runtime.Object { + if c := opcl.DeepCopy(); c != nil { return c } return nil diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go index 28011c858..f2de75ed3 100644 --- a/pkg/util/patroni/patroni.go +++ b/pkg/util/patroni/patroni.go @@ -80,12 +80,10 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error { return fmt.Errorf("could not encode json: %v", err) } return p.httpPostOrPatch(http.MethodPost, apiURL(master)+failoverPath, buf) - - return nil } //TODO: add an option call /patroni to check if it is necessary to restart the server -// SetPostgresParameters sets Postgres options via Patroni patch API call. +//SetPostgresParameters sets Postgres options via Patroni patch API call. func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error { buf := &bytes.Buffer{} err := json.NewEncoder(buf).Encode(map[string]map[string]interface{}{"postgresql": {"parameters": parameters}})