Restart the listener if pod is evicted or OOMKilled is set on the pod

This commit is contained in:
Nikola Jokic 2025-12-04 11:53:32 +01:00
parent 540269880f
commit 4b9b330666
No known key found for this signature in database
GPG Key ID: 554517D3D15A5D2F
2 changed files with 163 additions and 26 deletions

View File

@ -211,7 +211,14 @@ func (r *AutoscalingListenerReconciler) Reconcile(ctx context.Context, req ctrl.
// TODO: make sure the role binding has the up-to-date role and service account
listenerPod := new(corev1.Pod)
if err := r.Get(ctx, client.ObjectKey{Namespace: autoscalingListener.Namespace, Name: autoscalingListener.Name}, listenerPod); err != nil {
if err := r.Get(
ctx,
client.ObjectKey{
Namespace: autoscalingListener.Namespace,
Name: autoscalingListener.Name,
},
listenerPod,
); err != nil {
if !kerrors.IsNotFound(err) {
log.Error(err, "Unable to get listener pod", "namespace", autoscalingListener.Namespace, "name", autoscalingListener.Name)
return ctrl.Result{}, err
@ -229,37 +236,39 @@ func (r *AutoscalingListenerReconciler) Reconcile(ctx context.Context, req ctrl.
cs := listenerContainerStatus(listenerPod)
switch {
case listenerPod.Status.Reason == "Evicted":
log.Info(
"Listener pod is evicted",
"phase", listenerPod.Status.Phase,
"reason", listenerPod.Status.Reason,
"message", listenerPod.Status.Message,
)
return ctrl.Result{}, r.deleteListenerPod(ctx, autoscalingListener, listenerPod, log)
case listenerPod.Status.Reason == "OOMKilled":
log.Info(
"Listener pod is OOMKilled",
"phase", listenerPod.Status.Phase,
"reason", listenerPod.Status.Reason,
"message", listenerPod.Status.Message,
)
return ctrl.Result{}, r.deleteListenerPod(ctx, autoscalingListener, listenerPod, log)
case cs == nil:
log.Info("Listener pod is not ready", "namespace", listenerPod.Namespace, "name", listenerPod.Name)
return ctrl.Result{}, nil
case cs.State.Terminated != nil:
log.Info("Listener pod is terminated", "namespace", listenerPod.Namespace, "name", listenerPod.Name, "reason", cs.State.Terminated.Reason, "message", cs.State.Terminated.Message)
log.Info(
"Listener pod is terminated",
"namespace", listenerPod.Namespace,
"name", listenerPod.Name,
"reason", cs.State.Terminated.Reason,
"message", cs.State.Terminated.Message,
)
if err := r.publishRunningListener(autoscalingListener, false); err != nil {
log.Error(err, "Unable to publish runner listener down metric", "namespace", listenerPod.Namespace, "name", listenerPod.Name)
}
return ctrl.Result{}, r.deleteListenerPod(ctx, autoscalingListener, listenerPod, log)
if listenerPod.DeletionTimestamp.IsZero() {
log.Info("Deleting the listener pod", "namespace", listenerPod.Namespace, "name", listenerPod.Name)
if err := r.Delete(ctx, listenerPod); err != nil && !kerrors.IsNotFound(err) {
log.Error(err, "Unable to delete the listener pod", "namespace", listenerPod.Namespace, "name", listenerPod.Name)
return ctrl.Result{}, err
}
// delete the listener config secret as well, so it gets recreated when the listener pod is recreated, with any new data if it exists
var configSecret corev1.Secret
err := r.Get(ctx, types.NamespacedName{Namespace: autoscalingListener.Namespace, Name: scaleSetListenerConfigName(autoscalingListener)}, &configSecret)
switch {
case err == nil && configSecret.DeletionTimestamp.IsZero():
log.Info("Deleting the listener config secret")
if err := r.Delete(ctx, &configSecret); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to delete listener config secret: %w", err)
}
case !kerrors.IsNotFound(err):
return ctrl.Result{}, fmt.Errorf("failed to get the listener config secret: %w", err)
}
}
return ctrl.Result{}, nil
case cs.State.Running != nil:
if err := r.publishRunningListener(autoscalingListener, true); err != nil {
log.Error(err, "Unable to publish running listener", "namespace", listenerPod.Namespace, "name", listenerPod.Name)
@ -269,10 +278,39 @@ func (r *AutoscalingListenerReconciler) Reconcile(ctx context.Context, req ctrl.
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}
func (r *AutoscalingListenerReconciler) deleteListenerPod(ctx context.Context, autoscalingListener *v1alpha1.AutoscalingListener, listenerPod *corev1.Pod, log logr.Logger) error {
if err := r.publishRunningListener(autoscalingListener, false); err != nil {
log.Error(err, "Unable to publish runner listener down metric", "namespace", listenerPod.Namespace, "name", listenerPod.Name)
}
if listenerPod.DeletionTimestamp.IsZero() {
log.Info("Deleting the listener pod", "namespace", listenerPod.Namespace, "name", listenerPod.Name)
if err := r.Delete(ctx, listenerPod); err != nil && !kerrors.IsNotFound(err) {
log.Error(err, "Unable to delete the listener pod", "namespace", listenerPod.Namespace, "name", listenerPod.Name)
return err
}
// delete the listener config secret as well, so it gets recreated when the listener pod is recreated, with any new data if it exists
var configSecret corev1.Secret
err := r.Get(ctx, types.NamespacedName{Namespace: autoscalingListener.Namespace, Name: scaleSetListenerConfigName(autoscalingListener)}, &configSecret)
switch {
case err == nil && configSecret.DeletionTimestamp.IsZero():
log.Info("Deleting the listener config secret")
if err := r.Delete(ctx, &configSecret); err != nil {
return fmt.Errorf("failed to delete listener config secret: %w", err)
}
case !kerrors.IsNotFound(err):
return fmt.Errorf("failed to get the listener config secret: %w", err)
}
}
return nil
}
func (r *AutoscalingListenerReconciler) cleanupResources(ctx context.Context, autoscalingListener *v1alpha1.AutoscalingListener, logger logr.Logger) (requeue bool, err error) {
logger.Info("Cleaning up the listener pod")
listenerPod := new(corev1.Pod)

View File

@ -671,6 +671,105 @@ var _ = Describe("Test AutoScalingListener customization", func() {
autoscalingListenerTestInterval,
).ShouldNot(BeEquivalentTo(oldPodUID), "Pod should be created")
})
It("Should re-create pod when the listener pod is evicted", func() {
pod := new(corev1.Pod)
Eventually(
func() (string, error) {
err := k8sClient.Get(
ctx,
client.ObjectKey{
Name: autoscalingListener.Name,
Namespace: autoscalingListener.Namespace,
},
pod,
)
if err != nil {
return "", err
}
return pod.Name, nil
},
autoscalingListenerTestTimeout,
autoscalingListenerTestInterval,
).Should(
BeEquivalentTo(autoscalingListener.Name),
"Pod should be created",
)
updated := pod.DeepCopy()
oldPodUID := string(pod.UID)
updated.Status.Reason = "Evicted"
err := k8sClient.Status().Update(ctx, updated)
Expect(err).NotTo(HaveOccurred(), "failed to update pod status")
pod = new(corev1.Pod)
Eventually(
func() (string, error) {
err := k8sClient.Get(ctx, client.ObjectKey{Name: autoscalingListener.Name, Namespace: autoscalingListener.Namespace}, pod)
if err != nil {
return "", err
}
return string(pod.UID), nil
},
autoscalingListenerTestTimeout,
autoscalingListenerTestInterval,
).ShouldNot(
BeEquivalentTo(oldPodUID),
"Pod should be created",
)
})
It("Should re-create pod when the listener pod is OOMKilled", func() {
pod := new(corev1.Pod)
Eventually(
func() (string, error) {
err := k8sClient.Get(
ctx,
client.ObjectKey{
Name: autoscalingListener.Name,
Namespace: autoscalingListener.Namespace,
},
pod,
)
if err != nil {
return "", err
}
return pod.Name, nil
},
autoscalingListenerTestTimeout,
autoscalingListenerTestInterval,
).Should(BeEquivalentTo(autoscalingListener.Name), "Pod should be created")
updated := pod.DeepCopy()
oldPodUID := string(pod.UID)
updated.Status.Reason = "OOMKilled"
err := k8sClient.Status().Update(ctx, updated)
Expect(err).NotTo(HaveOccurred(), "failed to update pod status")
Eventually(
func() (string, error) {
pod = new(corev1.Pod)
err := k8sClient.Get(
ctx,
client.ObjectKey{
Name: autoscalingListener.Name,
Namespace: autoscalingListener.Namespace,
},
pod,
)
if err != nil {
return "", err
}
return string(pod.UID), nil
},
autoscalingListenerTestTimeout,
autoscalingListenerTestInterval,
).ShouldNot(BeEquivalentTo(oldPodUID), "Pod should be created")
})
})
})