Make code around recreating pods and creating objects in the database less brittle (#213)
There used to be a masterLess flag that was supposed to indicate whether the cluster it belongs to runs without an acting master by design. Since we never really supported such clusters, the flag was instead misused to indicate that there is currently no master in the cluster. That was not done consistently (a cluster without all pods running would never be marked masterless, even when the master was not among the running pods), and it rested on the wrong assumption that a masterless cluster stays masterless until the next attempt to update the flag, ignoring the possibility of the master coming up or of some node completing a successful promotion. This PR therefore removes the flag completely.

When the cluster runs with 0 instances there is obviously no master, and it makes no sense to create database objects inside a non-existing master. This PR introduces an additional check for that.

recreatePods used to assume that the pod roles recorded when the function started would not change; for instance, that terminated replica pods would come back as replicas. This PR revisits that assumption by looking at the actual role of each re-spawned pod, which avoids a failover if some replica was promoted to the master role while being re-spawned. In addition, if the failover away from the old master was unsuccessful, we used to stop and leave the old master running on an old pod, without recording that fact anywhere. Now a failed failover emits a warning but does not stop the recreation of the last master pod; in the worst case the running master is terminated, but that case is rather unlikely.

As a side effect, waitForPodLabel now returns the pod definition it waited for, avoiding extra API calls in recreatePods and movePodFromEndOfLifeNode.
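For readers who want the behavioral change in one place before reading the diff, below is a minimal, self-contained sketch of the reworked recreate flow under simplified assumptions: the pod/role types and the recreate and failover callbacks are hypothetical stand-ins for the operator's *v1.Pod handling, recreatePod and ManualFailover; the real implementation in the diff operates on the Cluster receiver and Kubernetes API objects.

package example

import "fmt"

// Simplified stand-ins for the operator's pod metadata; the real code works with
// *v1.Pod objects and the role label configured via OpConfig.PodRoleLabel.
type role string

const (
    master  role = "master"
    replica role = "replica"
)

type pod struct {
    name string
    role role
}

// recreateAll mirrors the reworked recreatePods flow: replicas are recreated first,
// the role of every re-spawned pod is re-checked instead of assumed, and a failed
// failover only produces a warning before the old master pod is recreated as well.
func recreateAll(pods []pod, recreate func(pod) (pod, error), failover func(from, to pod) error) error {
    var masterPod, newMasterPod *pod
    replicas := make([]pod, 0)

    for i := range pods {
        if pods[i].role == master {
            masterPod = &pods[i]
            continue
        }
        newPod, err := recreate(pods[i])
        if err != nil {
            return fmt.Errorf("could not recreate replica pod %q: %v", pods[i].name, err)
        }
        // Look at the actual role of the re-spawned pod: it may have been promoted
        // to master while it was being recreated.
        if newPod.role == replica {
            replicas = append(replicas, newPod)
        } else if newPod.role == master {
            newMasterPod = &newPod
        }
    }

    if masterPod == nil {
        return nil
    }
    // Fail over only if no re-created pod has already taken over the master role.
    if newMasterPod == nil && len(replicas) > 0 {
        if err := failover(*masterPod, replicas[0]); err != nil {
            // A failed failover is a warning, not a reason to stop: in the worst
            // case the running master is terminated below.
            fmt.Printf("warning: could not perform failover: %v\n", err)
        }
    } else if newMasterPod == nil && len(replicas) == 0 {
        fmt.Println("warning: cannot switch master role before re-creating the pod: no replicas")
    }
    if _, err := recreate(*masterPod); err != nil {
        return fmt.Errorf("could not recreate old master pod %q: %v", masterPod.name, err)
    }
    return nil
}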
parent 1f71c8d72f
commit cca73e30b7
@@ -66,7 +66,6 @@ type Cluster struct {
     podSubscribersMu sync.RWMutex
     pgDb *sql.DB
     mu sync.Mutex
-    masterLess bool
     userSyncStrategy spec.UserSyncer
     deleteOptions *metav1.DeleteOptions
     podEventsQueue *cache.FIFO
@@ -109,7 +108,6 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
             Secrets:   make(map[types.UID]*v1.Secret),
             Services:  make(map[PostgresRole]*v1.Service),
             Endpoints: make(map[PostgresRole]*v1.Endpoints)},
-        masterLess:       false,
         userSyncStrategy: users.DefaultUserSyncStrategy{},
         deleteOptions:    &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
         podEventsQueue:   podEventsQueue,
@@ -276,7 +274,8 @@ func (c *Cluster) Create() error {
     }
     c.logger.Infof("pods are ready")

-    if !(c.masterLess || c.databaseAccessDisabled()) {
+    // create database objects unless we are running without pods or disabled that feature explicitely
+    if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0) {
         if err = c.createRoles(); err != nil {
             return fmt.Errorf("could not create users: %v", err)
         }
@@ -286,10 +285,6 @@ func (c *Cluster) Create() error {
             return fmt.Errorf("could not sync databases: %v", err)
         }
         c.logger.Infof("databases have been successfully created")
-    } else {
-        if c.masterLess {
-            c.logger.Warnln("cluster is masterless")
-        }
     }

     if err := c.listResources(); err != nil {
@@ -487,14 +482,6 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
             c.logger.Errorf("could not sync secrets: %v", err)
             updateFailed = true
         }
-
-        if !c.databaseAccessDisabled() {
-            c.logger.Debugf("syncing roles")
-            if err := c.syncRoles(); err != nil {
-                c.logger.Errorf("could not sync roles: %v", err)
-                updateFailed = true
-            }
-        }
     }

     // Volume
@@ -534,13 +521,20 @@ func (c *Cluster) Update(oldSpec, newSpec *spec.Postgresql) error {
         }
     }()

-    // Databases
-    if !reflect.DeepEqual(oldSpec.Spec.Databases, newSpec.Spec.Databases) {
-        c.logger.Infof("syncing databases")
-        if err := c.syncDatabases(); err != nil {
-            c.logger.Errorf("could not sync databases: %v", err)
+    // Roles and Databases
+    if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0) {
+        c.logger.Debugf("syncing roles")
+        if err := c.syncRoles(); err != nil {
+            c.logger.Errorf("could not sync roles: %v", err)
             updateFailed = true
         }
+        if !reflect.DeepEqual(oldSpec.Spec.Databases, newSpec.Spec.Databases) {
+            c.logger.Infof("syncing databases")
+            if err := c.syncDatabases(); err != nil {
+                c.logger.Errorf("could not sync databases: %v", err)
+                updateFailed = true
+            }
+        }
     }

     return nil
@@ -786,9 +780,11 @@ func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedNam

         role := Master

+        _, err := c.waitForPodLabel(ch, &role)
+
         select {
         case <-stopCh:
-        case podLabelErr <- c.waitForPodLabel(ch, &role):
+        case podLabelErr <- err:
         }
     }()

@@ -111,9 +111,14 @@ func (c *Cluster) registerPodSubscriber(podName spec.NamespacedName) chan spec.P
 }

 func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) {
+    var (
+        eol bool
+        err error
+        newPod *v1.Pod
+    )
     podName := util.NameFromMeta(pod.ObjectMeta)

-    if eol, err := c.podIsEndOfLife(pod); err != nil {
+    if eol, err = c.podIsEndOfLife(pod); err != nil {
         return nil, fmt.Errorf("could not get node %q: %v", pod.Spec.NodeName, err)
     } else if !eol {
         c.logger.Infof("pod %q is already on a live node", podName)
@@ -123,20 +128,15 @@ func (c *Cluster) movePodFromEndOfLifeNode(pod *v1.Pod) (*v1.Pod, error) {
     c.setProcessName("moving pod %q out of end-of-life node %q", podName, pod.Spec.NodeName)
     c.logger.Infof("moving pod %q out of the end-of-life node %q", podName, pod.Spec.NodeName)

-    if err := c.recreatePod(podName); err != nil {
+    if newPod, err = c.recreatePod(podName); err != nil {
         return nil, fmt.Errorf("could not move pod: %v", err)
     }

-    newPod, err := c.KubeClient.Pods(podName.Namespace).Get(podName.Name, metav1.GetOptions{})
-    if err != nil {
-        return nil, fmt.Errorf("could not get pod: %v", err)
-    }
-
     if newPod.Spec.NodeName == pod.Spec.NodeName {
         return nil, fmt.Errorf("pod %q remained on the same node", podName)
     }

-    if eol, err := c.podIsEndOfLife(newPod); err != nil {
+    if eol, err = c.podIsEndOfLife(newPod); err != nil {
         return nil, fmt.Errorf("could not get node %q: %v", pod.Spec.NodeName, err)
     } else if eol {
         c.logger.Warningf("pod %q moved to end-of-life node %q", podName, newPod.Spec.NodeName)
@@ -247,23 +247,23 @@ func (c *Cluster) MigrateReplicaPod(podName spec.NamespacedName, fromNodeName st
     return nil
 }

-func (c *Cluster) recreatePod(podName spec.NamespacedName) error {
+func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
     ch := c.registerPodSubscriber(podName)
     defer c.unregisterPodSubscriber(podName)

     if err := c.KubeClient.Pods(podName.Namespace).Delete(podName.Name, c.deleteOptions); err != nil {
-        return fmt.Errorf("could not delete pod: %v", err)
+        return nil, fmt.Errorf("could not delete pod: %v", err)
     }

     if err := c.waitForPodDeletion(ch); err != nil {
-        return err
+        return nil, err
     }
-    if err := c.waitForPodLabel(ch, nil); err != nil {
-        return err
+    if pod, err := c.waitForPodLabel(ch, nil); err != nil {
+        return nil, err
+    } else {
+        c.logger.Infof("pod %q has been recreated", podName)
+        return pod, nil
     }
-    c.logger.Infof("pod %q has been recreated", podName)
-
-    return nil
 }

 func (c *Cluster) recreatePods() error {
@@ -281,7 +281,9 @@ func (c *Cluster) recreatePods() error {
     }
     c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))

-    var masterPod *v1.Pod
+    var (
+        masterPod, newMasterPod, newPod *v1.Pod
+    )
     replicas := make([]spec.NamespacedName, 0)
     for i, pod := range pods.Items {
         role := PostgresRole(pod.Labels[c.OpConfig.PodRoleLabel])
@@ -292,27 +294,29 @@ func (c *Cluster) recreatePods() error {
         }

         podName := util.NameFromMeta(pods.Items[i].ObjectMeta)
-        if err := c.recreatePod(podName); err != nil {
+        if newPod, err = c.recreatePod(podName); err != nil {
             return fmt.Errorf("could not recreate replica pod %q: %v", util.NameFromMeta(pod.ObjectMeta), err)
         }
+        if newRole := PostgresRole(newPod.Labels[c.OpConfig.PodRoleLabel]); newRole == Replica {
             replicas = append(replicas, util.NameFromMeta(pod.ObjectMeta))
+        } else if newRole == Master {
+            newMasterPod = newPod
+        }
     }

-    if masterPod == nil {
-        c.logger.Warningln("no master pod in the cluster")
-    } else {
-        if len(replicas) > 0 {
-            err := c.ManualFailover(masterPod, masterCandidate(replicas))
-            if err != nil {
-                return fmt.Errorf("could not perform manual failover: %v", err)
+    if masterPod != nil {
+        // failover if we have not observed a master pod when re-creating former replicas.
+        if newMasterPod == nil && len(replicas) > 0 {
+            if err := c.ManualFailover(masterPod, masterCandidate(replicas)); err != nil {
+                c.logger.Warningf("could not perform failover: %v", err)
             }
+        } else if newMasterPod == nil && len(replicas) == 0 {
+            c.logger.Warningf("cannot switch master role before re-creating the pod: no replicas")
         }
-        //TODO: specify master, leave new master empty
-        c.logger.Infof("recreating master pod %q", util.NameFromMeta(masterPod.ObjectMeta))
+        c.logger.Infof("recreating old master pod %q", util.NameFromMeta(masterPod.ObjectMeta))

-        if err := c.recreatePod(util.NameFromMeta(masterPod.ObjectMeta)); err != nil {
-            return fmt.Errorf("could not recreate master pod %q: %v", util.NameFromMeta(masterPod.ObjectMeta), err)
+        if _, err := c.recreatePod(util.NameFromMeta(masterPod.ObjectMeta)); err != nil {
+            return fmt.Errorf("could not recreate old master pod %q: %v", util.NameFromMeta(masterPod.ObjectMeta), err)
         }
     }

@@ -58,7 +58,8 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
         }
     }

-    if !c.databaseAccessDisabled() {
+    // create database objects unless we are running without pods or disabled that feature explicitely
+    if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0) {
         c.logger.Debugf("syncing roles")
         if err = c.syncRoles(); err != nil {
             err = fmt.Errorf("could not sync roles: %v", err)
@@ -223,7 +223,7 @@ func (c *Cluster) getTeamMembers() ([]string, error) {
     return teamInfo.Members, nil
 }

-func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRole) error {
+func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRole) (*v1.Pod, error) {
     timeout := time.After(c.OpConfig.PodLabelWaitTimeout)
     for {
         select {
@@ -232,13 +232,13 @@ func (c *Cluster) waitForPodLabel(podEvents chan spec.PodEvent, role *PostgresRo

             if role == nil {
                 if podRole == Master || podRole == Replica {
-                    return nil
+                    return podEvent.CurPod, nil
                 }
             } else if *role == podRole {
-                return nil
+                return podEvent.CurPod, nil
             }
         case <-timeout:
-            return fmt.Errorf("pod label wait timeout")
+            return nil, fmt.Errorf("pod label wait timeout")
         }
     }
 }
@@ -313,15 +313,12 @@ func (c *Cluster) waitPodLabelsReady() error {
                 return false, fmt.Errorf("too many masters")
             }
             if len(replicaPods.Items) == podsNumber {
-                c.masterLess = true
                 return true, nil
             }

             return len(masterPods.Items)+len(replicaPods.Items) == podsNumber, nil
         })

-    //TODO: wait for master for a while and then set masterLess flag
-
     return err
 }

@@ -53,7 +53,6 @@ func (p *Patroni) Failover(master *v1.Pod, candidate string) error {
     if err != nil {
         return fmt.Errorf("could not encode json: %v", err)
     }

     request, err := http.NewRequest(http.MethodPost, apiURL(master)+failoverPath, buf)
     if err != nil {
         return fmt.Errorf("could not create request: %v", err)