diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index 6dcf92eed..36837d440 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -66,7 +66,6 @@ type Cluster struct {
     podSubscribersMu sync.RWMutex
     pgDb             *sql.DB
     mu               sync.Mutex
-    masterLess       bool
     userSyncStrategy spec.UserSyncer
     deleteOptions    *metav1.DeleteOptions
     podEventsQueue   *cache.FIFO
@@ -109,7 +108,6 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
             Secrets:   make(map[types.UID]*v1.Secret),
             Services:  make(map[PostgresRole]*v1.Service),
             Endpoints: make(map[PostgresRole]*v1.Endpoints)},
-        masterLess:       false,
         userSyncStrategy: users.DefaultUserSyncStrategy{},
         deleteOptions:    &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
         podEventsQueue:   podEventsQueue,
@@ -276,7 +274,8 @@ func (c *Cluster) Create() error {
     }
     c.logger.Infof("pods are ready")
 
-    if !(c.masterLess || c.databaseAccessDisabled()) {
+    // create database objects unless we are running without pods or have explicitly disabled that feature
+    if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0) {
         if err = c.createRoles(); err != nil {
             return fmt.Errorf("could not create users: %v", err)
         }
@@ -286,10 +285,6 @@ func (c *Cluster) Create() error {
             return fmt.Errorf("could not sync databases: %v", err)
         }
         c.logger.Infof("databases have been successfully created")
-    } else {
-        if c.masterLess {
-            c.logger.Warnln("cluster is masterless")
-        }
     }
 
     if err := c.listResources(); err != nil {
@@ -487,14 +482,6 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
             c.logger.Errorf("could not sync secrets: %v", err)
             updateFailed = true
         }
-
-        if !c.databaseAccessDisabled() {
-            c.logger.Debugf("syncing roles")
-            if err := c.syncRoles(); err != nil {
-                c.logger.Errorf("could not sync roles: %v", err)
-                updateFailed = true
-            }
-        }
     }
 
     // Volume
@@ -534,13 +521,20 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
         }
     }()
 
-    // Databases
-    if !reflect.DeepEqual(oldSpec.Spec.Databases, newSpec.Spec.Databases) {
-        c.logger.Infof("syncing databases")
-        if err := c.syncDatabases(); err != nil {
-            c.logger.Errorf("could not sync databases: %v", err)
+    // Roles and Databases
+    if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0) {
+        c.logger.Debugf("syncing roles")
+        if err := c.syncRoles(); err != nil {
+            c.logger.Errorf("could not sync roles: %v", err)
             updateFailed = true
         }
+        if !reflect.DeepEqual(oldSpec.Spec.Databases, newSpec.Spec.Databases) {
+            c.logger.Infof("syncing databases")
+            if err := c.syncDatabases(); err != nil {
+                c.logger.Errorf("could not sync databases: %v", err)
+                updateFailed = true
+            }
+        }
     }
 
     return nil
@@ -786,9 +780,11 @@ func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedNam
 
         role := Master
 
+        _, err := c.waitForPodLabel(ch, &role)
+
         select {
         case <-stopCh:
-        case podLabelErr <- c.waitForPodLabel(ch, &role):
+        case podLabelErr <- err:
         }
     }()
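
Note: with masterLess gone, Create(), Update(), and Sync() now each repeat the same guard. Below is a minimal, self-contained sketch of how that condition could be hoisted into one place; needsDatabaseObjects is a hypothetical name and is not part of this patch.

package main

import "fmt"

// needsDatabaseObjects is a hypothetical, free-standing version of the guard
// this patch repeats in Create(), Update(), and Sync(): skip creating roles
// and databases when database access is disabled or the cluster is scaled
// down to zero instances.
func needsDatabaseObjects(databaseAccessDisabled bool, numberOfInstances int32) bool {
    return !databaseAccessDisabled && numberOfInstances > 0
}

func main() {
    fmt.Println(needsDatabaseObjects(false, 2)) // true: sync roles and databases
    fmt.Println(needsDatabaseObjects(true, 2))  // false: database access disabled
    fmt.Println(needsDatabaseObjects(false, 0)) // false: running without pods
}
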
diff --git a/pkg/cluster/pod.go b/pkg/cluster/pod.go
index 79f33f188..68fdaa420 100644
--- a/pkg/cluster/pod.go
+++ b/pkg/cluster/pod.go
@@ -111,9 +111,14 @@ func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan spec.P
 }
 
 func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) {
+    var (
+        eol    bool
+        err    error
+        newPod *v1.Pod
+    )
     podName := util.NameFromMeta(pod.ObjectMeta)
 
-    if eol, err := c.podIsEndOfLife(pod); err != nil {
+    if eol, err = c.podIsEndOfLife(pod); err != nil {
         return nil, fmt.Errorf("could not get node %q: %v", pod.Spec.NodeName, err)
     } else if !eol {
         c.logger.Infof("pod %q is already on a live node", podName)
@@ -123,20 +128,15 @@ func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) {
     c.setProcessName("moving pod %q out of end-of-life node %q", podName, pod.Spec.NodeName)
     c.logger.Infof("moving pod %q out of the end-of-life node %q", podName, pod.Spec.NodeName)
 
-    if err := c.recreatePod(podName); err != nil {
+    if newPod, err = c.recreatePod(podName); err != nil {
         return nil, fmt.Errorf("could not move pod: %v", err)
     }
 
-    newPod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
-    if err != nil {
-        return nil, fmt.Errorf("could not get pod: %v", err)
-    }
-
     if newPod.Spec.NodeName == pod.Spec.NodeName {
         return nil, fmt.Errorf("pod %q remained on the same node", podName)
     }
 
-    if eol, err := c.podIsEndOfLife(newPod); err != nil {
+    if eol, err = c.podIsEndOfLife(newPod); err != nil {
         return nil, fmt.Errorf("could not get node %q: %v", pod.Spec.NodeName, err)
     } else if eol {
         c.logger.Warningf("pod %q moved to end-of-life node %q", podName, newPod.Spec.NodeName)
@@ -247,23 +247,23 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st
     return nil
 }
 
-func (c *Cluster) recreatePod(podName spec.NamespacedName) error {
+func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
     ch := c.registerPodSubscriber(podName)
     defer c.unregisterPodSubscriber(podName)
 
     if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, c.deleteOptions); err != nil {
-        return fmt.Errorf("could not delete pod: %v", err)
+        return nil, fmt.Errorf("could not delete pod: %v", err)
     }
 
     if err := c.waitForPodDeletion(ch); err != nil {
-        return err
+        return nil, err
     }
-    if err := c.waitForPodLabel(ch, nil); err != nil {
-        return err
+    if pod, err := c.waitForPodLabel(ch, nil); err != nil {
+        return nil, err
+    } else {
+        c.logger.Infof("pod %q has been recreated", podName)
+        return pod, nil
     }
-    c.logger.Infof("pod %q has been recreated", podName)
-
-    return nil
 }
 
 func (c *Cluster) recreatePods() error {
@@ -281,7 +281,9 @@ func (c *Cluster) recreatePods() error {
     }
     c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))
 
-    var masterPod *v1.Pod
+    var (
+        masterPod, newMasterPod, newPod *v1.Pod
+    )
     replicas := make([]spec.NamespacedName, 0)
     for i, pod := range pods.Items {
         role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
@@ -292,27 +294,29 @@ func (c *Cluster) recreatePods() error {
         }
 
         podName := util.NameFromMeta(pods.Items[i].ObjectMeta)
-        if err := c.recreatePod(podName); err != nil {
+        if newPod, err = c.recreatePod(podName); err != nil {
             return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
         }
-
-        replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta))
+        if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica {
+            replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta))
+        } else if newRole == Master {
+            newMasterPod = newPod
+        }
     }
 
-    if masterPod == nil {
-        c.logger.Warningln("no master pod in the cluster")
-    } else {
-        if len(replicas) > 0 {
-            err := c.ManualFailover(masterPod, masterCandidate(replicas))
-            if err != nil {
-                return fmt.Errorf("could not perform manual failover: %v", err)
+    if masterPod != nil {
+        // fail over if we have not observed a master pod while re-creating the former replicas
+        if newMasterPod == nil && len(replicas) > 0 {
+            if err := c.ManualFailover(masterPod, masterCandidate(replicas)); err != nil {
+                c.logger.Warningf("could not perform failover: %v", err)
             }
+        } else if newMasterPod == nil && len(replicas) == 0 {
+            c.logger.Warningf("cannot switch master role before re-creating the pod: no replicas")
         }
-        //TODO: specify master, leave new master empty
-        c.logger.Infof("recreating master pod %q", util.NameFromMeta(masterPod.ObjectMeta))
+        c.logger.Infof("recreating old master pod %q", util.NameFromMeta(masterPod.ObjectMeta))
 
-        if err := c.recreatePod(util.NameFromMeta(masterPod.ObjectMeta)); err != nil {
-            return fmt.Errorf("could not recreate master pod %q: %v", util.NameFromMeta(masterPod.ObjectMeta), err)
+        if _, err := c.recreatePod(util.NameFromMeta(masterPod.ObjectMeta)); err != nil {
+            return fmt.Errorf("could not recreate old master pod %q: %v", util.NameFromMeta(masterPod.ObjectMeta), err)
         }
     }
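
Note: the pod.go rework threads the re-created pod back to its callers: recreatePod now returns the pod it observed on the subscriber channel, so movePodFromEndOfLifeNode no longer needs the follow-up Get against the API server. A self-contained sketch of the underlying wait-with-timeout pattern, using stand-in types and names (podEvent, waitForRole) instead of the operator's spec.PodEvent and waitForPodLabel:

package main

import (
    "errors"
    "fmt"
    "time"
)

// podEvent is a stand-in for the operator's spec.PodEvent; it carries only
// the fields this sketch needs.
type podEvent struct {
    role string
    name string
}

// waitForRole mirrors the shape of the patched waitForPodLabel: it returns
// the observed pod (here just its name) instead of only an error, so a
// caller like recreatePod can hand the fresh pod straight back.
func waitForRole(events <-chan podEvent, role string, timeout time.Duration) (string, error) {
    deadline := time.After(timeout)
    for {
        select {
        case ev := <-events:
            if ev.role == role {
                return ev.name, nil
            }
        case <-deadline:
            return "", errors.New("pod label wait timeout")
        }
    }
}

func main() {
    events := make(chan podEvent, 1)
    events <- podEvent{role: "master", name: "demo-cluster-0"}

    pod, err := waitForRole(events, "master", time.Second)
    fmt.Println(pod, err) // demo-cluster-0 <nil>
}
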
diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go
index 26388eede..b6865b7b6 100644
--- a/pkg/cluster/sync.go
+++ b/pkg/cluster/sync.go
@@ -58,7 +58,8 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
         }
     }
 
-    if !c.databaseAccessDisabled() {
+    // create database objects unless we are running without pods or have explicitly disabled that feature
+    if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0) {
         c.logger.Debugf("syncing roles")
         if err = c.syncRoles(); err != nil {
             err = fmt.Errorf("could not sync roles: %v", err)
diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go
index 9bd292271..372e14b52 100644
--- a/pkg/cluster/util.go
+++ b/pkg/cluster/util.go
@@ -223,7 +223,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) {
     return teamInfo.Members, nil
 }
 
-func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRole) error {
+func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRole) (*v1.Pod, error) {
     timeout := time.After(c.OpConfig.PodLabelWaitTimeout)
     for {
         select {
@@ -232,13 +232,13 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRo
 
             if role == nil {
                 if podRole == Master || podRole == Replica {
-                    return nil
+                    return podEvent.CurPod, nil
                 }
             } else if *role == podRole {
-                return nil
+                return podEvent.CurPod, nil
             }
         case <-timeout:
-            return fmt.Errorf("pod label wait timeout")
+            return nil, fmt.Errorf("pod label wait timeout")
         }
     }
 }
@@ -313,15 +313,12 @@ func (c *Cluster) waitPodLabelsReady() error {
             return false, fmt.Errorf("too many masters")
         }
         if len(replicaPods.Items) == podsNumber {
-            c.masterLess = true
             return true, nil
         }
 
         return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil
     })
 
-    //TODO: wait for master for a while and then set masterLess flag
-
     return err
 }
diff --git a/pkg/util/patroni/patroni.go b/pkg/util/patroni/patroni.go
index b27a20766..f5cbbf678 100644
--- a/pkg/util/patroni/patroni.go
+++ b/pkg/util/patroni/patroni.go
@@ -53,7 +53,6 @@ func (p *Patroni) Failover(master *v1.Pod, candidate string) error {
     if err != nil {
         return fmt.Errorf("could not encode json: %v", err)
     }
-
     request, err := http.NewRequest(http.MethodPost, apiURL(master)+failoverPath, buf)
     if err != nil {
         return fmt.Errorf("could not create request: %v", err)
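
Note: taken together with the pod.go changes, recreatePods now decides whether to fail over based on the roles it observed while re-creating the replicas. A self-contained sketch of that decision as a pure function; failoverAction is an illustrative name, not code from the patch:

package main

import "fmt"

// failoverAction models the branch structure of the patched recreatePods
// after all former replicas have been re-created. Illustrative only.
func failoverAction(newMasterSeen bool, replicaCount int) string {
    switch {
    case newMasterSeen:
        // a re-created pod already carries the master label: no failover needed
        return "skip failover, recreate old master pod"
    case replicaCount > 0:
        return "fail over to a replica candidate, then recreate old master pod"
    default:
        return "warn: cannot switch master role, no replicas"
    }
}

func main() {
    fmt.Println(failoverAction(true, 2))  // promotion already happened
    fmt.Println(failoverAction(false, 2)) // explicit failover
    fmt.Println(failoverAction(false, 0)) // nothing to switch to
}
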