Refactoring inspired by gometalinter. (#357)

Among other things, fix a few issues with the deepcopy implementation.
Oleksii Kliukin 2018-08-03 11:09:45 +02:00 committed by GitHub
parent d0f4148cd3
commit ac7b132314
15 changed files with 312 additions and 273 deletions
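The deepcopy problems called out in the commit message are visible in the type changes further down: the old DeepCopyInto methods assigned a fresh copy to their own out parameter, which only rebinds the local variable and never reaches the caller's object. A minimal sketch of the bug and of the fixed pattern, assuming the deepcopy import is github.com/mohae/deepcopy (its Copy(interface{}) interface{} signature matches the calls in this diff) and using a hypothetical Widget type:

package main

import (
	"fmt"

	"github.com/mohae/deepcopy"
)

// Widget is a hypothetical stand-in for Postgresql or OperatorConfiguration.
type Widget struct{ Name string }

// brokenDeepCopyInto mirrors the old code: assigning to the parameter
// only rebinds the local variable out, so the caller sees no copy.
func (w *Widget) brokenDeepCopyInto(out *Widget) {
	if w != nil {
		out = deepcopy.Copy(w).(*Widget)
	}
}

// DeepCopyInto mirrors the fix: copy the dereferenced value and write
// it through the pointer, filling in the caller's object.
func (w *Widget) DeepCopyInto(out *Widget) {
	if w != nil {
		*out = deepcopy.Copy(*w).(Widget)
	}
}

func main() {
	src := &Widget{Name: "acid-minimal-cluster"}
	var a, b Widget
	src.brokenDeepCopyInto(&a)
	src.DeepCopyInto(&b)
	fmt.Println(a.Name) // "" - the copy was lost
	fmt.Println(b.Name) // "acid-minimal-cluster"
}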

View File

@@ -92,12 +92,14 @@ func New(controller controllerInformer, port int, logger *logrus.Logger) *Server
 // Run starts the HTTP server
 func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
 	var err error
 	defer wg.Done()

 	go func() {
-		err := s.http.ListenAndServe()
-		if err != http.ErrServerClosed {
-			s.logger.Fatalf("Could not start http server: %v", err)
+		if err2 := s.http.ListenAndServe(); err2 != http.ErrServerClosed {
+			s.logger.Fatalf("Could not start http server: %v", err2)
 		}
 	}()
 	s.logger.Infof("listening on %s", s.http.Addr)
@@ -106,21 +108,27 @@ func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
 	ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
 	defer cancel()

-	err := s.http.Shutdown(ctx)
+	if err = s.http.Shutdown(ctx); err == nil {
+		s.logger.Infoln("Http server shut down")
+		return
+	}
 	if err == context.DeadlineExceeded {
 		s.logger.Warningf("Shutdown timeout exceeded. closing http server")
-		s.http.Close()
-	} else if err != nil {
-		s.logger.Errorf("Could not shutdown http server: %v", err)
+		if err = s.http.Close(); err != nil {
+			s.logger.Errorf("could not close http connection: %v", err)
+		}
+		s.logger.Infoln("Http server shut down")
+		return
 	}
+	s.logger.Errorf("Could not shutdown http server: %v", err)
 }

 func (s *Server) respond(obj interface{}, err error, w http.ResponseWriter) {
 	w.Header().Set("Content-Type", "application/json")
 	if err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
-		json.NewEncoder(w).Encode(map[string]interface{}{"error": err.Error()})
+		if err2 := json.NewEncoder(w).Encode(map[string]interface{}{"error": err.Error()}); err2 != nil {
+			s.logger.Errorf("could not encode error response %q: %v", err, err2)
+		}
 		return
 	}
@@ -186,6 +194,14 @@ func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
 	s.respond(resp, err, w)
 }

+func mustConvertToUint32(s string) uint32 {
+	result, err := strconv.Atoi(s)
+	if err != nil {
+		panic(fmt.Errorf("mustConvertToUint32 called for %s: %v", s, err))
+	}
+	return uint32(result)
+}
+
 func (s *Server) workers(w http.ResponseWriter, req *http.Request) {
 	var (
 		resp interface{}
@@ -195,30 +211,30 @@ func (s *Server) workers(w http.ResponseWriter, req *http.Request) {
 	if workerAllQueue.MatchString(req.URL.Path) {
 		s.allQueues(w, req)
 		return
-	} else if matches := util.FindNamedStringSubmatch(workerLogsURL, req.URL.Path); matches != nil {
-		workerID, _ := strconv.Atoi(matches["id"])
-		resp, err = s.controller.WorkerLogs(uint32(workerID))
+	}
+	if workerAllStatus.MatchString(req.URL.Path) {
+		s.allWorkers(w, req)
+		return
+	}
+
+	err = fmt.Errorf("page not found")
+
+	if matches := util.FindNamedStringSubmatch(workerLogsURL, req.URL.Path); matches != nil {
+		workerID := mustConvertToUint32(matches["id"])
+		resp, err = s.controller.WorkerLogs(workerID)
 	} else if matches := util.FindNamedStringSubmatch(workerEventsQueueURL, req.URL.Path); matches != nil {
-		workerID, _ := strconv.Atoi(matches["id"])
-		resp, err = s.controller.ListQueue(uint32(workerID))
+		workerID := mustConvertToUint32(matches["id"])
+		resp, err = s.controller.ListQueue(workerID)
 	} else if matches := util.FindNamedStringSubmatch(workerStatusURL, req.URL.Path); matches != nil {
 		var workerStatus *spec.WorkerStatus
-		workerID, _ := strconv.Atoi(matches["id"])
-		workerStatus, err = s.controller.WorkerStatus(uint32(workerID))
-		if workerStatus == nil {
-			resp = "idle"
-		} else {
+		workerID := mustConvertToUint32(matches["id"])
+		resp = "idle"
+		if workerStatus, err = s.controller.WorkerStatus(workerID); workerStatus != nil {
 			resp = workerStatus
 		}
-	} else if workerAllStatus.MatchString(req.URL.Path) {
-		s.allWorkers(w, req)
-		return
-	} else {
-		s.respond(nil, fmt.Errorf("page not found"), w)
-		return
 	}
 	s.respond(resp, err, w)
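The err-to-err2 renames in this file address variable shadowing, one of the checks gometalinter runs: a := inside a nested block declares a new err that hides the function-level one, so a failure can be recorded into the wrong variable and silently dropped. A self-contained illustration with a hypothetical fetch helper:

package main

import (
	"errors"
	"fmt"
)

func fetch() error { return errors.New("boom") }

// shadowed loses the error: := inside the closure declares a brand new
// err scoped to that block, and the outer err stays nil.
func shadowed() error {
	var err error
	func() {
		err := fetch() // shadows the outer err
		_ = err
	}()
	return err // always nil
}

// distinct avoids the trap the same way this diff does: give the inner
// error its own name and assign the outer one explicitly if needed.
func distinct() error {
	var err error
	func() {
		if err2 := fetch(); err2 != nil {
			err = err2
		}
	}()
	return err
}

func main() {
	fmt.Println(shadowed()) // <nil>
	fmt.Println(distinct()) // boom
}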

View File

@@ -284,7 +284,7 @@ func (c *Cluster) Create() error {
 	}
 	c.logger.Infof("pods are ready")

-	// create database objects unless we are running without pods or disabled that feature explicitely
+	// create database objects unless we are running without pods or disabled that feature explicitly
 	if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0) {
 		if err = c.createRoles(); err != nil {
 			return fmt.Errorf("could not create users: %v", err)

View File

@@ -88,7 +88,7 @@ func (c *Cluster) makeDefaultResources() spec.Resources {
 	defaultRequests := spec.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest}
 	defaultLimits := spec.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit}

-	return spec.Resources{defaultRequests, defaultLimits}
+	return spec.Resources{ResourceRequest: defaultRequests, ResourceLimits: defaultLimits}
 }

 func generateResourceRequirements(resources spec.Resources, defaultResources spec.Resources) (*v1.ResourceRequirements, error) {
@@ -537,10 +537,10 @@ func deduplicateEnvVars(input []v1.EnvVar, containerName string, logger *logrus.
 	for i, va := range input {
 		if names[va.Name] == 0 {
-			names[va.Name] += 1
+			names[va.Name]++
 			result = append(result, input[i])
 		} else if names[va.Name] == 1 {
-			names[va.Name] += 1
+			names[va.Name]++
 			logger.Warningf("variable %q is defined in %q more than once, the subsequent definitions are ignored",
 				va.Name, containerName)
 		}
@@ -626,6 +626,12 @@ func makeResources(cpuRequest, memoryRequest, cpuLimit, memoryLimit string) spec
 func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.StatefulSet, error) {
 	var (
 		err                 error
+		sidecarContainers   []v1.Container
+		podTemplate         *v1.PodTemplateSpec
+		volumeClaimTemplate *v1.PersistentVolumeClaim
 	)

 	defaultResources := c.makeDefaultResources()
 	resourceRequirements, err := generateResourceRequirements(spec.Resources, defaultResources)
@@ -633,22 +639,20 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu
 		return nil, fmt.Errorf("could not generate resource requirements: %v", err)
 	}

-	if err != nil {
-		return nil, fmt.Errorf("could not generate Scalyr sidecar resource requirements: %v", err)
-	}

 	customPodEnvVarsList := make([]v1.EnvVar, 0)

 	if c.OpConfig.PodEnvironmentConfigMap != "" {
-		if cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(c.OpConfig.PodEnvironmentConfigMap, metav1.GetOptions{}); err != nil {
+		var cm *v1.ConfigMap
+		cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(c.OpConfig.PodEnvironmentConfigMap, metav1.GetOptions{})
+		if err != nil {
 			return nil, fmt.Errorf("could not read PodEnvironmentConfigMap: %v", err)
-		} else {
+		}
 		for k, v := range cm.Data {
 			customPodEnvVarsList = append(customPodEnvVarsList, v1.EnvVar{Name: k, Value: v})
 		}
 		sort.Slice(customPodEnvVarsList,
 			func(i, j int) bool { return customPodEnvVarsList[i].Name < customPodEnvVarsList[j].Name })
 	}
-	}

 	spiloConfiguration := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, c.OpConfig.PamRoleName, c.logger)
@@ -686,16 +690,15 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu
 	}

 	// generate sidecar containers
-	sidecarContainers, err := generateSidecarContainers(sideCars, volumeMounts, defaultResources,
-		c.OpConfig.SuperUsername, c.credentialSecretName(c.OpConfig.SuperUsername), c.logger)
-	if err != nil {
+	if sidecarContainers, err = generateSidecarContainers(sideCars, volumeMounts, defaultResources,
+		c.OpConfig.SuperUsername, c.credentialSecretName(c.OpConfig.SuperUsername), c.logger); err != nil {
 		return nil, fmt.Errorf("could not generate sidecar containers: %v", err)
 	}

 	tolerationSpec := tolerations(&spec.Tolerations, c.OpConfig.PodToleration)

 	// generate pod template for the statefulset, based on the spilo container and sidecards
-	podTemplate, err := generatePodTemplate(
+	if podTemplate, err = generatePodTemplate(
 		c.Namespace,
 		c.labelsSet(true),
 		spiloContainer,
@@ -704,13 +707,12 @@ func (c *Cluster) generateStatefulSet(spec *spec.PostgresSpec) (*v1beta1.Statefu
 		nodeAffinity(c.OpConfig.NodeReadinessLabel),
 		int64(c.OpConfig.PodTerminateGracePeriod.Seconds()),
 		c.OpConfig.PodServiceAccountName,
-		c.OpConfig.KubeIAMRole)
-	if err != nil {
+		c.OpConfig.KubeIAMRole); err != nil {
 		return nil, fmt.Errorf("could not generate pod template: %v", err)
 	}

-	volumeClaimTemplate, err := generatePersistentVolumeClaimTemplate(spec.Volume.Size, spec.Volume.StorageClass)
-	if err != nil {
+	if volumeClaimTemplate, err = generatePersistentVolumeClaimTemplate(spec.Volume.Size,
+		spec.Volume.StorageClass); err != nil {
 		return nil, fmt.Errorf("could not generate volume claim template: %v", err)
 	}
@@ -1033,7 +1035,7 @@ func (c *Cluster) generateCloneEnvironment(description *spec.CloneDescription) [
 		result = append(result, v1.EnvVar{Name: "CLONE_METHOD", Value: "CLONE_WITH_WALE"})
 		result = append(result, v1.EnvVar{Name: "CLONE_WAL_S3_BUCKET", Value: c.OpConfig.WALES3Bucket})
 		result = append(result, v1.EnvVar{Name: "CLONE_TARGET_TIME", Value: description.EndTimestamp})
-		result = append(result, v1.EnvVar{Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX", Value: getBucketScopeSuffix(description.Uid)})
+		result = append(result, v1.EnvVar{Name: "CLONE_WAL_BUCKET_SCOPE_SUFFIX", Value: getBucketScopeSuffix(description.UID)})
 		result = append(result, v1.EnvVar{Name: "CLONE_WAL_BUCKET_SCOPE_PREFIX", Value: ""})
 	}

View File

@@ -153,12 +153,10 @@ func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUser
 // getDatabases returns the map of current databases with owners
 // The caller is responsible for opening and closing the database connection
-func (c *Cluster) getDatabases() (map[string]string, error) {
+func (c *Cluster) getDatabases() (dbs map[string]string, err error) {
 	var (
 		rows *sql.Rows
-		err  error
 	)
-	dbs := make(map[string]string)

 	if rows, err = c.pgDb.Query(getDatabasesSQL); err != nil {
 		return nil, fmt.Errorf("could not query database: %v", err)
@@ -166,21 +164,26 @@ func (c *Cluster) getDatabases() (map[string]string, error) {
 	defer func() {
 		if err2 := rows.Close(); err2 != nil {
+			if err != nil {
+				err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err)
+			} else {
 				err = fmt.Errorf("error when closing query cursor: %v", err2)
+			}
 		}
 	}()

+	dbs = make(map[string]string)
 	for rows.Next() {
 		var datname, owner string

-		err := rows.Scan(&datname, &owner)
-		if err != nil {
+		if err = rows.Scan(&datname, &owner); err != nil {
 			return nil, fmt.Errorf("error when processing row: %v", err)
 		}
 		dbs[datname] = owner
 	}

-	return dbs, err
+	return
 }

 // executeCreateDatabase creates new database with the given owner.
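The getDatabases rewrite above relies on a Go detail worth spelling out: a deferred function can modify named result parameters after a return statement has executed, so with the new (dbs map[string]string, err error) signature a failure from rows.Close() actually reaches the caller. With the old unnamed results, the assignment inside the defer changed a dead local and was discarded. A standalone sketch of the idiom, with a hypothetical query standing in for getDatabasesSQL:

package dbutil

import (
	"database/sql"
	"fmt"
)

// listOwners maps database names to their owners. err is a named
// result so the deferred close handler below can overwrite it.
func listOwners(db *sql.DB) (dbs map[string]string, err error) {
	var rows *sql.Rows
	if rows, err = db.Query("SELECT datname, rolname FROM pg_database d JOIN pg_roles r ON d.datdba = r.oid"); err != nil {
		return nil, fmt.Errorf("could not query database: %v", err)
	}
	defer func() {
		// runs after any return; because err is named, whatever is
		// assigned here is what the caller actually receives
		if err2 := rows.Close(); err2 != nil {
			if err != nil {
				err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err)
			} else {
				err = fmt.Errorf("error when closing query cursor: %v", err2)
			}
		}
	}()

	dbs = make(map[string]string)
	for rows.Next() {
		var datname, owner string
		if err = rows.Scan(&datname, &owner); err != nil {
			return nil, fmt.Errorf("error when processing row: %v", err)
		}
		dbs[datname] = owner
	}
	return dbs, rows.Err()
}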

View File

@@ -9,6 +9,7 @@ import (
 	"github.com/zalando-incubator/postgres-operator/pkg/spec"
 	"github.com/zalando-incubator/postgres-operator/pkg/util"
+	"k8s.io/api/apps/v1beta1"
 )

 func (c *Cluster) listPods() ([]v1.Pod, error) {
@@ -182,6 +183,8 @@ func (c *Cluster) masterCandidate(oldNodeName string) (*v1.Pod, error) {
 func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
 	var (
 		masterCandidatePod *v1.Pod
+		err                error
+		eol                bool
 	)

 	oldMaster, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
@@ -192,9 +195,10 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
 	c.logger.Infof("migrating master pod %q", podName)

-	if eol, err := c.podIsEndOfLife(oldMaster); err != nil {
+	if eol, err = c.podIsEndOfLife(oldMaster); err != nil {
 		return fmt.Errorf("could not get node %q: %v", oldMaster.Spec.NodeName, err)
-	} else if !eol {
+	}
+	if !eol {
 		c.logger.Debugf("pod is already on a live node")
 		return nil
 	}
@@ -205,41 +209,44 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
 	}

 	// we must have a statefulset in the cluster for the migration to work
 	if c.Statefulset == nil {
-		sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{})
-		if err != nil {
+		var sset *v1beta1.StatefulSet
+		if sset, err = c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(),
+			metav1.GetOptions{}); err != nil {
 			return fmt.Errorf("could not retrieve cluster statefulset: %v", err)
 		}
 		c.Statefulset = sset
 	}
 	// We may not have a cached statefulset if the initial cluster sync has aborted, revert to the spec in that case.
-	if *c.Statefulset.Spec.Replicas == 1 {
-		c.logger.Warningf("single master pod for cluster %q, migration will cause longer downtime of the master instance", c.clusterName())
-	} else {
-		masterCandidatePod, err = c.masterCandidate(oldMaster.Spec.NodeName)
-		if err != nil {
+	if *c.Statefulset.Spec.Replicas > 1 {
+		if masterCandidatePod, err = c.masterCandidate(oldMaster.Spec.NodeName); err != nil {
 			return fmt.Errorf("could not get new master candidate: %v", err)
 		}
+	} else {
+		c.logger.Warningf("single master pod for cluster %q, migration will cause longer downtime of the master instance", c.clusterName())
 	}

 	// there are two cases for each postgres cluster that has its master pod on the node to migrate from:
 	// - the cluster has some replicas - migrate one of those if necessary and failover to it
 	// - there are no replicas - just terminate the master and wait until it respawns
 	// in both cases the result is the new master up and running on a new node.
-	if masterCandidatePod != nil {
-		pod, err := c.movePodFromEndOfLifeNode(masterCandidatePod)
-		if err != nil {
-			return fmt.Errorf("could not move pod: %v", err)
-		}
-		masterCandidateName := util.NameFromMeta(pod.ObjectMeta)
-		if err := c.Switchover(oldMaster, masterCandidateName); err != nil {
-			return fmt.Errorf("could not failover to pod %q: %v", masterCandidateName, err)
-		}
-	} else {
+	if masterCandidatePod == nil {
 		if _, err = c.movePodFromEndOfLifeNode(oldMaster); err != nil {
 			return fmt.Errorf("could not move pod: %v", err)
 		}
+		return nil
 	}

+	if masterCandidatePod, err = c.movePodFromEndOfLifeNode(masterCandidatePod); err != nil {
+		return fmt.Errorf("could not move pod: %v", err)
+	}
+
+	masterCandidateName := util.NameFromMeta(masterCandidatePod.ObjectMeta)
+	if err := c.Switchover(oldMaster, masterCandidateName); err != nil {
+		return fmt.Errorf("could not failover to pod %q: %v", masterCandidateName, err)
+	}

 	return nil
 }
@@ -281,12 +288,12 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
 	if err := c.waitForPodDeletion(ch); err != nil {
 		return nil, err
 	}
-	if pod, err := c.waitForPodLabel(ch, stopChan, nil); err != nil {
+	pod, err := c.waitForPodLabel(ch, stopChan, nil)
+	if err != nil {
 		return nil, err
-	} else {
+	}
 	c.logger.Infof("pod %q has been recreated", podName)
 	return pod, nil
-	}
 }

 func (c *Cluster) recreatePods() error {

View File

@@ -168,7 +168,7 @@ func (c *Cluster) getRollingUpdateFlagFromStatefulSet(sset *v1beta1.StatefulSet,
 		if flag, err = strconv.ParseBool(stringFlag); err != nil {
 			c.logger.Warnf("error when parsing %q annotation for the statefulset %q: expected boolean value, got %q\n",
 				RollingUpdateStatefulsetAnnotationKey,
-				types.NamespacedName{sset.Namespace, sset.Name},
+				types.NamespacedName{Namespace: sset.Namespace, Name: sset.Name},
 				stringFlag)
 			flag = defaultValue
 		}
@@ -491,7 +491,7 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset
 	if len(endPointAddresses) > 0 {
 		result = append(result, v1.EndpointSubset{
 			Addresses: endPointAddresses,
-			Ports:     []v1.EndpointPort{{"postgresql", 5432, "TCP"}},
+			Ports:     []v1.EndpointPort{{Name: "postgresql", Port: 5432, Protocol: "TCP"}},
 		})
 	} else if role == Master {
 		c.logger.Warningf("master is not running, generated master endpoint does not contain any addresses")

View File

@@ -6,6 +6,8 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	policybeta1 "k8s.io/api/policy/v1beta1"
+	"k8s.io/api/policy/v1beta1"
+	"k8s.io/api/core/v1"

 	"github.com/zalando-incubator/postgres-operator/pkg/spec"
 	"github.com/zalando-incubator/postgres-operator/pkg/util"
@@ -70,7 +72,7 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
 		}
 	}

-	// create database objects unless we are running without pods or disabled that feature explicitely
+	// create database objects unless we are running without pods or disabled that feature explicitly
 	if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0) {
 		c.logger.Debugf("syncing roles")
 		if err = c.syncRoles(); err != nil {
@@ -110,117 +112,118 @@ func (c *Cluster) syncServices() error {
 }

 func (c *Cluster) syncService(role PostgresRole) error {
+	var (
+		svc *v1.Service
+		err error
+	)
 	c.setProcessName("syncing %s service", role)

-	svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{})
-	if err == nil {
+	if svc, err = c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}); err == nil {
 		c.Services[role] = svc
 		desiredSvc := c.generateService(role, &c.Spec)
-		match, reason := k8sutil.SameService(svc, desiredSvc)
-		if match {
-			return nil
-		}
-		c.logServiceChanges(role, svc, desiredSvc, false, reason)
-		if err := c.updateService(role, desiredSvc); err != nil {
-			return fmt.Errorf("could not update %s service to match desired state: %v", role, err)
+		if match, reason := k8sutil.SameService(svc, desiredSvc); !match {
+			c.logServiceChanges(role, svc, desiredSvc, false, reason)
+			if err = c.updateService(role, desiredSvc); err != nil {
+				return fmt.Errorf("could not update %s service to match desired state: %v", role, err)
+			}
+			c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
 		}
-		c.logger.Infof("%s service %q is in the desired state now", role, util.NameFromMeta(desiredSvc.ObjectMeta))
 		return nil
-	} else if !k8sutil.ResourceNotFound(err) {
+	}
+	if !k8sutil.ResourceNotFound(err) {
 		return fmt.Errorf("could not get %s service: %v", role, err)
 	}
 	// no existing service, create new one
 	c.Services[role] = nil
 	c.logger.Infof("could not find the cluster's %s service", role)

-	if svc, err := c.createService(role); err != nil {
-		if k8sutil.ResourceAlreadyExists(err) {
-			c.logger.Infof("%s service %q already exists", role, util.NameFromMeta(svc.ObjectMeta))
-			svc, err := c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{})
-			if err == nil {
-				c.Services[role] = svc
-			} else {
-				c.logger.Infof("could not fetch existing %s service: %v", role, err)
-			}
-		} else {
+	if svc, err = c.createService(role); err == nil {
+		c.logger.Infof("created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta))
+	} else {
+		if !k8sutil.ResourceAlreadyExists(err) {
 			return fmt.Errorf("could not create missing %s service: %v", role, err)
 		}
-	} else {
-		c.logger.Infof("created missing %s service %q", role, util.NameFromMeta(svc.ObjectMeta))
-		c.Services[role] = svc
+		c.logger.Infof("%s service %q already exists", role, util.NameFromMeta(svc.ObjectMeta))
+		if svc, err = c.KubeClient.Services(c.Namespace).Get(c.serviceName(role), metav1.GetOptions{}); err != nil {
+			return fmt.Errorf("could not fetch existing %s service: %v", role, err)
+		}
 	}
+	c.Services[role] = svc

 	return nil
 }
 func (c *Cluster) syncEndpoint(role PostgresRole) error {
+	var (
+		ep  *v1.Endpoints
+		err error
+	)
 	c.setProcessName("syncing %s endpoint", role)

-	ep, err := c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{})
-	if err == nil {
+	if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}); err == nil {
 		// TODO: No syncing of endpoints here, is this covered completely by updateService?
 		c.Endpoints[role] = ep
 		return nil
-	} else if !k8sutil.ResourceNotFound(err) {
+	}
+	if !k8sutil.ResourceNotFound(err) {
 		return fmt.Errorf("could not get %s endpoint: %v", role, err)
 	}
 	// no existing endpoint, create new one
 	c.Endpoints[role] = nil
 	c.logger.Infof("could not find the cluster's %s endpoint", role)

-	if ep, err := c.createEndpoint(role); err != nil {
-		if k8sutil.ResourceAlreadyExists(err) {
-			c.logger.Infof("%s endpoint %q already exists", role, util.NameFromMeta(ep.ObjectMeta))
-			ep, err := c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{})
-			if err == nil {
-				c.Endpoints[role] = ep
-			} else {
-				c.logger.Infof("could not fetch existing %s endpoint: %v", role, err)
-			}
-		} else {
+	if ep, err = c.createEndpoint(role); err == nil {
+		c.logger.Infof("created missing %s endpoint %q", role, util.NameFromMeta(ep.ObjectMeta))
+	} else {
+		if !k8sutil.ResourceAlreadyExists(err) {
 			return fmt.Errorf("could not create missing %s endpoint: %v", role, err)
 		}
-	} else {
-		c.logger.Infof("created missing %s endpoint %q", role, util.NameFromMeta(ep.ObjectMeta))
-		c.Endpoints[role] = ep
+		c.logger.Infof("%s endpoint %q already exists", role, util.NameFromMeta(ep.ObjectMeta))
+		if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(c.endpointName(role), metav1.GetOptions{}); err != nil {
+			return fmt.Errorf("could not fetch existing %s endpoint: %v", role, err)
+		}
 	}
+	c.Endpoints[role] = ep

 	return nil
 }
 func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
-	pdb, err := c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{})
-	if err == nil {
+	var (
+		pdb *v1beta1.PodDisruptionBudget
+		err error
+	)
+	if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil {
 		c.PodDisruptionBudget = pdb
 		newPDB := c.generatePodDisruptionBudget()
 		if match, reason := k8sutil.SamePDB(pdb, newPDB); !match {
 			c.logPDBChanges(pdb, newPDB, isUpdate, reason)
-			if err := c.updatePodDisruptionBudget(newPDB); err != nil {
+			if err = c.updatePodDisruptionBudget(newPDB); err != nil {
 				return err
 			}
 		} else {
 			c.PodDisruptionBudget = pdb
 		}
 		return nil
-	} else if !k8sutil.ResourceNotFound(err) {
+	}
+	if !k8sutil.ResourceNotFound(err) {
 		return fmt.Errorf("could not get pod disruption budget: %v", err)
 	}
 	// no existing pod disruption budget, create new one
 	c.PodDisruptionBudget = nil
 	c.logger.Infof("could not find the cluster's pod disruption budget")

 	if pdb, err = c.createPodDisruptionBudget(); err != nil {
-		if k8sutil.ResourceAlreadyExists(err) {
-			c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
-		} else {
+		if !k8sutil.ResourceAlreadyExists(err) {
 			return fmt.Errorf("could not create pod disruption budget: %v", err)
 		}
+		c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta))
+		if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil {
+			return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta))
+		}
 	}

+	c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta))
 	c.PodDisruptionBudget = pdb

 	return nil
 }
@@ -315,6 +318,11 @@ func (c *Cluster) syncStatefulSet() error {
 // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
 // (like max_connections) has changed and if necessary sets it via the Patroni API
 func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error {
+	var (
+		err  error
+		pods []v1.Pod
+	)
+
 	// we need to extract those options from the cluster manifest.
 	optionsToSet := make(map[string]string)
 	pgOptions := c.Spec.Parameters
@@ -325,47 +333,55 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error {
 		}
 	}

-	if len(optionsToSet) > 0 {
-		pods, err := c.listPods()
-		if err != nil {
+	if len(optionsToSet) == 0 {
+		return nil
+	}
+
+	if pods, err = c.listPods(); err != nil {
 		return err
 	}
 	if len(pods) == 0 {
 		return fmt.Errorf("could not call Patroni API: cluster has no pods")
 	}
 	// try all pods until the first one that is successful, as it doesn't matter which pod
 	// carries the request to change configuration through
 	for _, pod := range pods {
 		podName := util.NameFromMeta(pod.ObjectMeta)
 		c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v",
 			podName, optionsToSet)
-		if err := c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil {
+		if err = c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil {
 			return nil
-		} else {
-			c.logger.Warningf("could not patch postgres parameters with a pod %s: %v", podName, err)
 		}
+		c.logger.Warningf("could not patch postgres parameters with a pod %s: %v", podName, err)
 	}
+	return fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)",
+		len(pods))
-	}
-	return nil
 }
 func (c *Cluster) syncSecrets() error {
+	var (
+		err    error
+		secret *v1.Secret
+	)
 	c.setProcessName("syncing secrets")
 	secrets := c.generateUserSecrets()

 	for secretUsername, secretSpec := range secrets {
-		secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec)
+		if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec); err == nil {
+			c.Secrets[secret.UID] = secret
+			c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID)
+			continue
+		}
 		if k8sutil.ResourceAlreadyExists(err) {
 			var userMap map[string]spec.PgUser
-			curSecret, err2 := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{})
-			if err2 != nil {
-				return fmt.Errorf("could not get current secret: %v", err2)
+			if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{}); err != nil {
+				return fmt.Errorf("could not get current secret: %v", err)
 			}
-			if secretUsername != string(curSecret.Data["username"]) {
+			if secretUsername != string(secret.Data["username"]) {
 				c.logger.Warningf("secret %q does not contain the role %q", secretSpec.Name, secretUsername)
 				continue
 			}
-			c.logger.Debugf("secret %q already exists, fetching its password", util.NameFromMeta(curSecret.ObjectMeta))
+			c.logger.Debugf("secret %q already exists, fetching its password", util.NameFromMeta(secret.ObjectMeta))
 			if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name {
 				secretUsername = constants.SuperuserKeyName
 				userMap = c.systemUsers
@@ -377,35 +393,28 @@ func (c *Cluster) syncSecrets() error {
 			}
 			pwdUser := userMap[secretUsername]
 			// if this secret belongs to the infrastructure role and the password has changed - replace it in the secret
-			if pwdUser.Password != string(curSecret.Data["password"]) && pwdUser.Origin == spec.RoleOriginInfrastructure {
+			if pwdUser.Password != string(secret.Data["password"]) && pwdUser.Origin == spec.RoleOriginInfrastructure {
 				c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name)
-				if _, err := c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil {
+				if secret, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil {
 					return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err)
 				}
 			} else {
 				// for non-infrastructure role - update the role with the password from the secret
-				pwdUser.Password = string(curSecret.Data["password"])
+				pwdUser.Password = string(secret.Data["password"])
 				userMap[secretUsername] = pwdUser
 			}
-			continue
 		} else {
-			if err != nil {
 				return fmt.Errorf("could not create secret for user %q: %v", secretUsername, err)
-			}
-			c.Secrets[secret.UID] = secret
-			c.logger.Debugf("created new secret %q, uid: %q", util.NameFromMeta(secret.ObjectMeta), secret.UID)
 		}
 	}

 	return nil
 }
-func (c *Cluster) syncRoles() error {
+func (c *Cluster) syncRoles() (err error) {
 	c.setProcessName("syncing roles")

 	var (
-		err       error
 		dbUsers   spec.PgUserMap
 		userNames []string
 	)
@@ -414,9 +423,14 @@
 	if err != nil {
 		return fmt.Errorf("could not init db connection: %v", err)
 	}
+
 	defer func() {
-		if err := c.closeDbConn(); err != nil {
-			c.logger.Errorf("could not close db connection: %v", err)
+		if err2 := c.closeDbConn(); err2 != nil {
+			if err == nil {
+				err = fmt.Errorf("could not close database connection: %v", err2)
+			} else {
+				err = fmt.Errorf("could not close database connection: %v (prior error: %v)", err2, err)
+			}
 		}
 	}()
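After this refactoring, syncService, syncEndpoint and syncPodDisruptionBudget all share one get-or-create shape: Get the object and reconcile it if present; fail on any Get error other than not-found; otherwise Create, treating already-exists as a lost race that is resolved by re-reading the winner's object. A schematic of that flow, with hypothetical Client and Object types standing in for the typed Kubernetes clients and k8sutil error predicates:

package reconcile

import "fmt"

// Object and Client are hypothetical stand-ins for a Kubernetes
// resource and its typed client.
type Object struct{ Name string }

type Client interface {
	Get(name string) (*Object, error)
	Create(obj *Object) (*Object, error)
}

// syncObject follows the pattern of the sync functions above;
// isNotFound and alreadyExists play the roles of
// k8sutil.ResourceNotFound and k8sutil.ResourceAlreadyExists.
func syncObject(c Client, desired *Object, isNotFound, alreadyExists func(error) bool) (*Object, error) {
	obj, err := c.Get(desired.Name)
	if err == nil {
		// object exists: the real code diffs it against the desired
		// state here and issues an Update when they do not match
		return obj, nil
	}
	if !isNotFound(err) {
		return nil, fmt.Errorf("could not get object: %v", err)
	}

	// not found: create it, tolerating a concurrent creator
	if obj, err = c.Create(desired); err == nil {
		return obj, nil
	}
	if !alreadyExists(err) {
		return nil, fmt.Errorf("could not create missing object: %v", err)
	}
	// lost the race: someone created the object between Get and
	// Create, so fetch and adopt their version
	if obj, err = c.Get(desired.Name); err != nil {
		return nil, fmt.Errorf("could not fetch existing object: %v", err)
	}
	return obj, nil
}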

View File

@@ -372,7 +372,7 @@ func (c *Cluster) waitStatefulsetPodsReady() error {
 }

 // Returns labels used to create or list k8s objects such as pods
-// For backward compatability, shouldAddExtraLabels must be false
+// For backward compatibility, shouldAddExtraLabels must be false
 // when listing k8s objects. See operator PR #252
 func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set {
 	lbls := make(map[string]string)
@@ -390,7 +390,7 @@ func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set {
 }

 func (c *Cluster) labelsSelector() *metav1.LabelSelector {
-	return &metav1.LabelSelector{c.labelsSet(false), nil}
+	return &metav1.LabelSelector{MatchLabels: c.labelsSet(false), MatchExpressions: nil}
 }

 func (c *Cluster) roleLabelsSet(role PostgresRole) labels.Set {

View File

@@ -151,9 +151,9 @@ func (c *Controller) initPodServiceAccount() {
 	switch {
 	case err != nil:
-		panic(fmt.Errorf("Unable to parse pod service account definiton from the operator config map: %v", err))
+		panic(fmt.Errorf("Unable to parse pod service account definition from the operator config map: %v", err))
 	case groupVersionKind.Kind != "ServiceAccount":
-		panic(fmt.Errorf("pod service account definiton in the operator config map defines another type of resource: %v", groupVersionKind.Kind))
+		panic(fmt.Errorf("pod service account definition in the operator config map defines another type of resource: %v", groupVersionKind.Kind))
 	default:
 		c.PodServiceAccount = obj.(*v1.ServiceAccount)
 		if c.PodServiceAccount.Name != c.opConfig.PodServiceAccountName {

View File

@@ -7,8 +7,8 @@ import (
 )

 var (
-	True  bool = true
-	False bool = false
+	True  = true
+	False = false
 )

 func TestMergeDeprecatedPostgreSQLSpecParameters(t *testing.T) {

View File

@@ -114,7 +114,7 @@ func readDecodedRole(s string) (*spec.PgUser, error) {
 	return &result, nil
 }

-func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (result map[string]spec.PgUser, err error) {
+func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (map[string]spec.PgUser, error) {
 	if *rolesSecret == (spec.NamespacedName{}) {
 		// we don't have infrastructure roles defined, bail out
 		return nil, nil
@@ -129,7 +129,7 @@ func (c *Controller) getInfrastructureRoles(rolesSecret *spec.NamespacedName) (r
 	}

 	secretData := infraRolesSecret.Data
-	result = make(map[string]spec.PgUser)
+	result := make(map[string]spec.PgUser)
 Users:
 	// in worst case we would have one line per user
 	for i := 1; i <= len(secretData); i++ {
@@ -171,9 +171,10 @@ Users:
 	if infraRolesMap, err := c.KubeClient.ConfigMaps(rolesSecret.Namespace).Get(rolesSecret.Name, metav1.GetOptions{}); err == nil {
 		// we have a configmap with username - json description, let's read and decode it
 		for role, s := range infraRolesMap.Data {
-			if roleDescr, err := readDecodedRole(s); err != nil {
+			roleDescr, err := readDecodedRole(s)
+			if err != nil {
 				return nil, fmt.Errorf("could not decode role description: %v", err)
-			} else {
+			}
 			// check if we have a a password in a configmap
 			c.logger.Debugf("found role description for role %q: %+v", role, roleDescr)
 			if passwd, ok := secretData[role]; ok {
@@ -188,7 +189,6 @@ Users:
 				result[role] = *roleDescr
 			}
 		}
-	}

 	if len(secretData) > 0 {
 		c.logger.Warningf("%d unprocessed entries in the infrastructure roles secret,"+

View File

@@ -58,7 +58,7 @@ type Patroni struct {
 // CloneDescription describes which cluster the new should clone and up to which point in time
 type CloneDescription struct {
 	ClusterName  string `json:"cluster,omitempty"`
-	Uid          string `json:"uid,omitempty"`
+	UID          string `json:"uid,omitempty"`
 	EndTimestamp string `json:"timestamp,omitempty"`
 }
@@ -119,7 +119,7 @@ type PostgresSpec struct {
 	EnableMasterLoadBalancer  *bool `json:"enableMasterLoadBalancer,omitempty"`
 	EnableReplicaLoadBalancer *bool `json:"enableReplicaLoadBalancer,omitempty"`

-	// deprecated load balancer settings mantained for backward compatibility
+	// deprecated load balancer settings maintained for backward compatibility
 	// see "Load balancers" operator docs
 	UseLoadBalancer     *bool `json:"useLoadBalancer,omitempty"`
 	ReplicaLoadBalancer *bool `json:"replicaLoadBalancer,omitempty"`
@@ -161,22 +161,22 @@ func (p *Postgresql) Clone() *Postgresql {
 	return c
 }

-func (in *Postgresql) DeepCopyInto(out *Postgresql) {
-	if in != nil {
-		out = deepcopy.Copy(in).(*Postgresql)
+func (p *Postgresql) DeepCopyInto(out *Postgresql) {
+	if p != nil {
+		*out = deepcopy.Copy(*p).(Postgresql)
 	}
 	return
 }

-func (in *Postgresql) DeepCopy() *Postgresql {
-	if in == nil { return nil }
+func (p *Postgresql) DeepCopy() *Postgresql {
+	if p == nil { return nil }
 	out := new(Postgresql)
-	in.DeepCopyInto(out)
+	p.DeepCopyInto(out)
 	return out
 }

-func (in *Postgresql) DeepCopyObject() runtime.Object {
-	if c := in.DeepCopy(); c != nil {
+func (p *Postgresql) DeepCopyObject() runtime.Object {
+	if c := p.DeepCopy(); c != nil {
 		return c
 	}
 	return nil
@@ -309,28 +309,6 @@ type postgresqlListCopy PostgresqlList
 type postgresqlCopy Postgresql

-func (in *PostgresqlList) DeepCopy() *PostgresqlList {
-	if in == nil { return nil }
-	out := new(PostgresqlList)
-	in.DeepCopyInto(out)
-	return out
-}
-
-func (in *PostgresqlList) DeepCopyInto(out *PostgresqlList) {
-	if in != nil {
-		out = deepcopy.Copy(in).(*PostgresqlList)
-	}
-	return
-}
-
-func (in *PostgresqlList) DeepCopyObject() runtime.Object {
-	if c := in.DeepCopy(); c != nil {
-		return c
-	}
-	return nil
-}
-
 // UnmarshalJSON converts a JSON into the PostgreSQL object.
 func (p *Postgresql) UnmarshalJSON(data []byte) error {
 	var tmp postgresqlCopy
@@ -380,6 +358,28 @@ func (pl *PostgresqlList) UnmarshalJSON(data []byte) error {
 	return nil
 }

+func (pl *PostgresqlList) DeepCopy() *PostgresqlList {
+	if pl == nil { return nil }
+	out := new(PostgresqlList)
+	pl.DeepCopyInto(out)
+	return out
+}
+
+func (pl *PostgresqlList) DeepCopyInto(out *PostgresqlList) {
+	if pl != nil {
+		*out = deepcopy.Copy(*pl).(PostgresqlList)
+	}
+	return
+}
+
+func (pl *PostgresqlList) DeepCopyObject() runtime.Object {
+	if c := pl.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
 func (status PostgresStatus) Success() bool {
 	return status != ClusterStatusAddFailed &&
 		status != ClusterStatusUpdateFailed &&

View File

@@ -278,5 +278,4 @@ func (d *Duration) UnmarshalJSON(b []byte) error {
 	default:
 		return fmt.Errorf("could not recognize type %T as a valid type to unmarshal to Duration", val)
 	}
-	return nil
 }

View File

@@ -154,6 +154,27 @@ func (opc *OperatorConfiguration) UnmarshalJSON(data []byte) error {
 	return nil
 }

+func (opc *OperatorConfiguration) DeepCopyInto(out *OperatorConfiguration) {
+	if opc != nil {
+		*out = deepcopy.Copy(*opc).(OperatorConfiguration)
+	}
+	return
+}
+
+func (opc *OperatorConfiguration) DeepCopy() *OperatorConfiguration {
+	if opc == nil { return nil }
+	out := new(OperatorConfiguration)
+	opc.DeepCopyInto(out)
+	return out
+}
+
+func (opc *OperatorConfiguration) DeepCopyObject() runtime.Object {
+	if c := opc.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
 func (opcl *OperatorConfigurationList) UnmarshalJSON(data []byte) error {
 	var ref OperatorConfigurationListCopy
 	if err := json.Unmarshal(data, &ref); err != nil {
@@ -163,43 +184,22 @@ func (opcl *OperatorConfigurationList) UnmarshalJSON(data []byte) error {
 	return nil
 }

-func (in *OperatorConfiguration) DeepCopyInto(out *OperatorConfiguration) {
-	if in != nil {
-		out = deepcopy.Copy(in).(*OperatorConfiguration)
+func (opcl *OperatorConfigurationList) DeepCopyInto(out *OperatorConfigurationList) {
+	if opcl != nil {
+		*out = deepcopy.Copy(*opcl).(OperatorConfigurationList)
 	}
 	return
 }

-func (in *OperatorConfiguration) DeepCopy() *OperatorConfiguration {
-	if in == nil { return nil }
-	out := new(OperatorConfiguration)
-	in.DeepCopyInto(out)
-	return out
-}
-
-func (in *OperatorConfiguration) DeepCopyObject() runtime.Object {
-	if c := in.DeepCopy(); c != nil {
-		return c
-	}
-	return nil
-}
-
-func (in *OperatorConfigurationList) DeepCopyInto(out *OperatorConfigurationList) {
-	if in != nil {
-		out = deepcopy.Copy(in).(*OperatorConfigurationList)
-	}
-	return
-}
-
-func (in *OperatorConfigurationList) DeepCopy() *OperatorConfigurationList {
-	if in == nil { return nil }
+func (opcl *OperatorConfigurationList) DeepCopy() *OperatorConfigurationList {
+	if opcl == nil { return nil }
 	out := new(OperatorConfigurationList)
-	in.DeepCopyInto(out)
+	opcl.DeepCopyInto(out)
 	return out
 }

-func (in *OperatorConfigurationList) DeepCopyObject() runtime.Object {
-	if c := in.DeepCopy(); c != nil {
+func (opcl *OperatorConfigurationList) DeepCopyObject() runtime.Object {
+	if c := opcl.DeepCopy(); c != nil {
 		return c
 	}
 	return nil

View File

@@ -80,12 +80,10 @@ func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
 		return fmt.Errorf("could not encode json: %v", err)
 	}
 	return p.httpPostOrPatch(http.MethodPost, apiURL(master)+failoverPath, buf)
-	return nil
 }

 //TODO: add an option call /patroni to check if it is necessary to restart the server

-// SetPostgresParameters sets Postgres options via Patroni patch API call.
+//SetPostgresParameters sets Postgres options via Patroni patch API call.
 func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error {
 	buf := &bytes.Buffer{}
 	err := json.NewEncoder(buf).Encode(map[string]map[string]interface{}{"postgresql": {"parameters": parameters}})