fix: prevent process_lost when K8s Job completes (FAR-10) #9

Merged
cpfarhood merged 1 commit from fix/far-10-process-lost-after-job-complete into master 2026-04-23 16:07:33 +00:00
2 changed files with 130 additions and 6 deletions
+60 -1
View File
@@ -1,6 +1,6 @@
import { describe, it, expect } from "vitest";
import type * as k8s from "@kubernetes/client-node";
import { isK8s404, buildPartialRunError, isReattachableOrphan } from "./execute.js";
import { isK8s404, buildPartialRunError, isReattachableOrphan, describePodTerminatedError } from "./execute.js";
function makeJob(opts: {
runId?: string;
@@ -186,3 +186,62 @@ describe("isReattachableOrphan", () => {
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
});
// FAR-10 regression coverage. waitForPod has to throw when a pod lands in
// phase=Failed rather than hand back the pod name. The text of that error is
// produced by describePodTerminatedError, so these cases pin its output: a
// failed pod must yield a structured, actionable message instead of quietly
// falling into the log-streaming path.
describe("describePodTerminatedError", () => {
  const POD = "mypod";
  const PHASE = "Failed";

  /** Build a one-element container status list holding a `terminated` state. */
  function statusesFor(
    containerName: string,
    terminated: Record<string, unknown>,
  ): k8s.V1ContainerStatus[] {
    // Double cast: some fixtures (e.g. exitCode: null) deliberately violate the
    // declared client types to mimic what the API can return at runtime.
    return [{ name: containerName, state: { terminated } }] as unknown as k8s.V1ContainerStatus[];
  }

  it("includes exit code and reason when claude container status is available", () => {
    const statuses = statusesFor("claude", { exitCode: 137, reason: "OOMKilled" });
    const text = describePodTerminatedError(POD, PHASE, statuses);
    for (const fragment of ["137", "OOMKilled", "phase=Failed"]) {
      expect(text).toContain(fragment);
    }
  });

  it("falls back to message field when reason is absent", () => {
    const statuses = statusesFor("claude", { exitCode: 1, message: "signal: killed" });
    const text = describePodTerminatedError(POD, PHASE, statuses);
    for (const fragment of ["signal: killed", "1"]) {
      expect(text).toContain(fragment);
    }
  });

  it("returns generic message when no claude container status is present", () => {
    const text = describePodTerminatedError(POD, PHASE, []);
    expect(text).toBe("Pod mypod reached phase=Failed");
  });

  it("ignores non-claude containers", () => {
    const statuses = statusesFor("sidecar", { exitCode: 0, reason: "Completed" });
    const text = describePodTerminatedError(POD, PHASE, statuses);
    expect(text).toBe("Pod mypod reached phase=Failed");
  });

  it("handles null exitCode gracefully", () => {
    const statuses = statusesFor("claude", { exitCode: null, reason: "Error" });
    const text = describePodTerminatedError(POD, PHASE, statuses);
    for (const fragment of ["unknown", "Error"]) {
      expect(text).toContain(fragment);
    }
  });
});
+70 -5
View File
@@ -16,6 +16,10 @@ const POLL_INTERVAL_MS = 2000;
/**
 * Keepalive interval, in milliseconds (15 s).
 * NOTE(review): the consumer is outside this view; meaning inferred from the name.
 */
const KEEPALIVE_INTERVAL_MS = 15 * 1_000;
/**
 * Delay before a log-stream reconnect, in milliseconds (3 s).
 * NOTE(review): the consumer is outside this view; meaning inferred from the name.
 */
const LOG_STREAM_RECONNECT_DELAY_MS = 3 * 1_000;
/**
 * Upper bound on log-stream reconnect attempts.
 * NOTE(review): the consumer is outside this view; meaning inferred from the name.
 */
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
/**
 * Length of the post-terminal window, in milliseconds (90 s): how long onSpawn
 * keeps being refreshed after the Job has reached a terminal state. It spans
 * the cleanup path (deleting the job, parsing stdout) so that a slow K8s API
 * call does not trip the 5-minute reaper staleness window.
 */
const POST_TERMINAL_KEEPALIVE_MS = 90 * 1_000;
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
@@ -96,6 +100,27 @@ export function isReattachableOrphan(
return true;
}
/**
 * Build the error message for a pod that reached a terminal phase (in
 * practice `Failed`) before, or instead of, streaming logs.
 *
 * When the `claude` container reports a `terminated` state, the message
 * carries its exit code plus the first non-blank of `reason` / `message`, so
 * operators can diagnose a crash without reaching for kubectl. Exported for
 * unit tests.
 *
 * @param podName - Name of the pod, embedded verbatim in the message.
 * @param phase - Pod phase to report (e.g. `"Failed"`).
 * @param containerStatuses - Container statuses of the pod; only the entry
 *   named `claude` is inspected, all other containers are ignored.
 * @returns `Pod <name> reached phase=<phase>` when no terminated `claude`
 *   status is present, otherwise that prefix followed by
 *   `: claude exited <code> (<detail>)`.
 */
export function describePodTerminatedError(
  podName: string,
  phase: string,
  containerStatuses: k8s.V1ContainerStatus[],
): string {
  const summary = `Pod ${podName} reached phase=${phase}`;

  const terminated = containerStatuses.find((cs) => cs.name === "claude")?.state?.terminated;
  if (!terminated) return summary;

  // exitCode can be null/undefined at runtime even though the client types it
  // as a number; `??` keeps a legitimate exit code of 0.
  const code = terminated.exitCode ?? "unknown";

  // Prefer `reason`, then `message`. Blank or whitespace-only strings count as
  // absent: a plain `??` chain would let an empty reason mask a useful message
  // and render as "()". Trimming also drops the trailing newline that
  // termination messages frequently carry.
  const detail =
    [terminated.reason, terminated.message]
      .map((text) => text?.trim())
      .find((text) => text) ?? "no reason";

  return `${summary}: claude exited ${code} (${detail})`;
}
/**
* Wait for the Job's pod to reach a terminal or running state.
* Returns the pod name once logs can be streamed, or throws on failure.
@@ -147,15 +172,22 @@ async function waitForPod(
for (const cs of containerStatuses) {
if (cs.state?.waiting) details.push(`${cs.name}: waiting (${cs.state.waiting.reason ?? "unknown"})`);
else if (cs.state?.running) details.push(`${cs.name}: running`);
else if (cs.state?.terminated) details.push(`${cs.name}: terminated (exit ${cs.state.terminated.exitCode ?? "?"}, ${cs.state.terminated.reason ?? "no reason"})`);
}
await onLog("stdout", `[paperclip] Pod ${podName}: ${details.join(", ")}\n`);
lastStatus = statusKey;
}
// Ready to stream logs
if (phase === "Running" || phase === "Succeeded" || phase === "Failed") {
if (phase === "Running" || phase === "Succeeded") {
return podName;
}
// phase=Failed means the pod crashed before we could stream logs.
// Throwing here routes the caller into the error path with a structured
// message instead of entering the log-streaming path with a dead pod.
if (phase === "Failed") {
throw new Error(describePodTerminatedError(podName, phase, containerStatuses));
}
// Init containers done + main running (phase may still say Pending briefly)
const allInitsDone = initStatuses.length > 0 && initStatuses.every(
@@ -218,6 +250,7 @@ async function streamPodLogsOnce(
kubeconfigPath?: string,
sinceSeconds?: number,
dedup?: LogLineDedupFilter,
stopSignal?: { stopped: boolean },
): Promise<string> {
const logApi = getLogApi(kubeconfigPath);
const chunks: string[] = [];
@@ -235,6 +268,18 @@ async function streamPodLogsOnce(
},
});
// When the job completion signal fires, destroy the writable to abort the
// in-flight follow stream. Without this, logApi.log can hang indefinitely
// when the pod terminates without closing the HTTP connection cleanly.
let stopPoller: ReturnType<typeof setInterval> | null = null;
if (stopSignal) {
stopPoller = setInterval(() => {
if (stopSignal.stopped && !writable.destroyed) {
writable.destroy();
}
}, 200);
}
try {
await logApi.log(namespace, podName, "claude", writable, {
follow: true,
@@ -242,8 +287,10 @@ async function streamPodLogsOnce(
...(sinceSeconds ? { sinceSeconds } : {}),
});
} catch {
// follow may fail if the container already exited or the API
// connection dropped — not fatal, caller decides whether to retry.
// follow may fail if the container already exited, the API connection
// dropped, or we aborted via writable.destroy() — not fatal.
} finally {
if (stopPoller) clearInterval(stopPoller);
}
return chunks.join("");
@@ -293,7 +340,7 @@ async function streamPodLogs(
}
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
@@ -739,11 +786,27 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let lastLogAt = Date.now();
let keepaliveTick = 0;
let keepaliveJobTerminal = false;
let keepaliveJobTerminalAt: number | null = null;
keepaliveTimer = setInterval(() => {
// Fire-and-forget the async work; setInterval callbacks must be
// synchronous or the timer will drift.
void (async () => {
if (keepaliveJobTerminal) return;
if (keepaliveJobTerminal) {
// Post-terminal window: keep refreshing onSpawn during cleanup
// (job deletion, log parsing, K8s API calls) so the reaper doesn't
// fire a false process_lost while execute() is still running.
if (
ctx.onSpawn &&
keepaliveJobTerminalAt !== null &&
Date.now() - keepaliveJobTerminalAt <= POST_TERMINAL_KEEPALIVE_MS
) {
keepaliveTick++;
if (keepaliveTick % 6 === 0) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
}
return;
}
// Verify the Job is still alive before announcing or refreshing.
try {
@@ -753,6 +816,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
);
if (terminal) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
return;
}
} catch (err: unknown) {
@@ -762,6 +826,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// window as a safety net.
if (isK8s404(err)) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
return;
}
// Log transient errors but leave keepaliveJobTerminal false so