fix: prevent process_lost when K8s Job completes (FAR-10) #9
@@ -1,6 +1,6 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import type * as k8s from "@kubernetes/client-node";
|
||||
import { isK8s404, buildPartialRunError, isReattachableOrphan } from "./execute.js";
|
||||
import { isK8s404, buildPartialRunError, isReattachableOrphan, describePodTerminatedError } from "./execute.js";
|
||||
|
||||
function makeJob(opts: {
|
||||
runId?: string;
|
||||
@@ -186,3 +186,62 @@ describe("isReattachableOrphan", () => {
|
||||
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
// Regression: FAR-10 — waitForPod must throw on phase=Failed, not return the pod name.
|
||||
// These tests cover describePodTerminatedError, the helper that waitForPod uses to build
|
||||
// the error message before throwing. Verifies that phase=Failed with no claude logs
|
||||
// produces a structured, actionable error instead of silently entering the log-stream path.
|
||||
describe("describePodTerminatedError", () => {
  /**
   * Build a one-element container status list whose only container is in the
   * `terminated` state. Only the fields describePodTerminatedError reads are
   * populated, so the fixture is cast to the full client type in one place
   * (this also lets a test pass `exitCode: null`, which the type forbids).
   */
  const terminatedStatus = (
    name: string,
    terminated: { exitCode?: number | null; reason?: string; message?: string },
  ): k8s.V1ContainerStatus[] =>
    [{ name, state: { terminated } }] as unknown as k8s.V1ContainerStatus[];

  it("includes exit code and reason when claude container status is available", () => {
    const msg = describePodTerminatedError(
      "mypod",
      "Failed",
      terminatedStatus("claude", { exitCode: 137, reason: "OOMKilled" }),
    );
    // Exact match pins the whole format, not just the presence of fragments.
    expect(msg).toBe("Pod mypod reached phase=Failed: claude exited 137 (OOMKilled)");
  });

  it("falls back to message field when reason is absent", () => {
    const msg = describePodTerminatedError(
      "mypod",
      "Failed",
      terminatedStatus("claude", { exitCode: 1, message: "signal: killed" }),
    );
    // A bare toContain("1") would pass for almost any string containing a
    // digit one; assert the full message so the exit code position is checked.
    expect(msg).toBe("Pod mypod reached phase=Failed: claude exited 1 (signal: killed)");
  });

  it("returns generic message when no claude container status is present", () => {
    const msg = describePodTerminatedError("mypod", "Failed", []);
    expect(msg).toBe("Pod mypod reached phase=Failed");
  });

  it("ignores non-claude containers", () => {
    const msg = describePodTerminatedError(
      "mypod",
      "Failed",
      terminatedStatus("sidecar", { exitCode: 0, reason: "Completed" }),
    );
    expect(msg).toBe("Pod mypod reached phase=Failed");
  });

  it("handles null exitCode gracefully", () => {
    const msg = describePodTerminatedError(
      "mypod",
      "Failed",
      terminatedStatus("claude", { exitCode: null, reason: "Error" }),
    );
    expect(msg).toBe("Pod mypod reached phase=Failed: claude exited unknown (Error)");
  });
});
|
||||
|
||||
+70
-5
@@ -16,6 +16,10 @@ const POLL_INTERVAL_MS = 2000;
|
||||
const KEEPALIVE_INTERVAL_MS = 15_000;
|
||||
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
|
||||
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
|
||||
// How long to keep refreshing onSpawn after the Job reaches a terminal state.
|
||||
// Covers the cleanup path (delete job, parse stdout) so a slow K8s API call
|
||||
// doesn't trip the 5-minute reaper staleness window.
|
||||
const POST_TERMINAL_KEEPALIVE_MS = 90_000;
|
||||
|
||||
/**
|
||||
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
|
||||
@@ -96,6 +100,27 @@ export function isReattachableOrphan(
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
 * Build an error message for a pod that reached a terminal phase (in practice
 * phase=Failed) before or instead of streaming logs.
 *
 * When the main container's status carries a `terminated` state, the message
 * includes its exit code and reason (falling back to the `message` field) so
 * operators can diagnose crashes without needing kubectl. Otherwise a generic
 * "reached phase" message is returned. Exported for unit tests.
 *
 * @param podName - Pod name, echoed verbatim into the message.
 * @param phase - Pod phase to report, rendered as `phase=<phase>`.
 * @param containerStatuses - Container statuses taken from the pod status. A
 *   missing (`null`/`undefined`) or empty list yields the generic message.
 * @param containerName - Name of the container whose termination details are
 *   reported. Defaults to `"claude"`.
 * @returns A single-line, human-readable description of the failure.
 */
export function describePodTerminatedError(
  podName: string,
  phase: string,
  containerStatuses: readonly k8s.V1ContainerStatus[] | null | undefined,
  containerName = "claude",
): string {
  const summary = `Pod ${podName} reached phase=${phase}`;

  const terminated = containerStatuses?.find((cs) => cs.name === containerName)?.state?.terminated;
  if (!terminated) {
    // Container not listed, or it never reached a terminated state.
    return summary;
  }

  // `??` (not `||`) so a legitimate exit code of 0 is still printed.
  const code = terminated.exitCode ?? "unknown";
  // Prefer reason, then message. Blank strings carry no diagnostic value, so
  // they are skipped rather than rendered as an empty "()".
  const reason =
    [terminated.reason, terminated.message].find((text) => text?.trim()) ?? "no reason";

  return `${summary}: ${containerName} exited ${code} (${reason})`;
}
|
||||
|
||||
/**
|
||||
* Wait for the Job's pod to reach a terminal or running state.
|
||||
* Returns the pod name once logs can be streamed, or throws on failure.
|
||||
@@ -147,15 +172,22 @@ async function waitForPod(
|
||||
for (const cs of containerStatuses) {
|
||||
if (cs.state?.waiting) details.push(`${cs.name}: waiting (${cs.state.waiting.reason ?? "unknown"})`);
|
||||
else if (cs.state?.running) details.push(`${cs.name}: running`);
|
||||
else if (cs.state?.terminated) details.push(`${cs.name}: terminated (exit ${cs.state.terminated.exitCode ?? "?"}, ${cs.state.terminated.reason ?? "no reason"})`);
|
||||
}
|
||||
await onLog("stdout", `[paperclip] Pod ${podName}: ${details.join(", ")}\n`);
|
||||
lastStatus = statusKey;
|
||||
}
|
||||
|
||||
// Ready to stream logs
|
||||
if (phase === "Running" || phase === "Succeeded" || phase === "Failed") {
|
||||
if (phase === "Running" || phase === "Succeeded") {
|
||||
return podName;
|
||||
}
|
||||
// phase=Failed means the pod crashed before we could stream logs.
|
||||
// Throwing here routes the caller into the error path with a structured
|
||||
// message instead of entering the log-streaming path with a dead pod.
|
||||
if (phase === "Failed") {
|
||||
throw new Error(describePodTerminatedError(podName, phase, containerStatuses));
|
||||
}
|
||||
|
||||
// Init containers done + main running (phase may still say Pending briefly)
|
||||
const allInitsDone = initStatuses.length > 0 && initStatuses.every(
|
||||
@@ -218,6 +250,7 @@ async function streamPodLogsOnce(
|
||||
kubeconfigPath?: string,
|
||||
sinceSeconds?: number,
|
||||
dedup?: LogLineDedupFilter,
|
||||
stopSignal?: { stopped: boolean },
|
||||
): Promise<string> {
|
||||
const logApi = getLogApi(kubeconfigPath);
|
||||
const chunks: string[] = [];
|
||||
@@ -235,6 +268,18 @@ async function streamPodLogsOnce(
|
||||
},
|
||||
});
|
||||
|
||||
// When the job completion signal fires, destroy the writable to abort the
|
||||
// in-flight follow stream. Without this, logApi.log can hang indefinitely
|
||||
// when the pod terminates without closing the HTTP connection cleanly.
|
||||
let stopPoller: ReturnType<typeof setInterval> | null = null;
|
||||
if (stopSignal) {
|
||||
stopPoller = setInterval(() => {
|
||||
if (stopSignal.stopped && !writable.destroyed) {
|
||||
writable.destroy();
|
||||
}
|
||||
}, 200);
|
||||
}
|
||||
|
||||
try {
|
||||
await logApi.log(namespace, podName, "claude", writable, {
|
||||
follow: true,
|
||||
@@ -242,8 +287,10 @@ async function streamPodLogsOnce(
|
||||
...(sinceSeconds ? { sinceSeconds } : {}),
|
||||
});
|
||||
} catch {
|
||||
// follow may fail if the container already exited or the API
|
||||
// connection dropped — not fatal, caller decides whether to retry.
|
||||
// follow may fail if the container already exited, the API connection
|
||||
// dropped, or we aborted via writable.destroy() — not fatal.
|
||||
} finally {
|
||||
if (stopPoller) clearInterval(stopPoller);
|
||||
}
|
||||
|
||||
return chunks.join("");
|
||||
@@ -293,7 +340,7 @@ async function streamPodLogs(
|
||||
}
|
||||
|
||||
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
|
||||
if (result) {
|
||||
allChunks.push(result);
|
||||
// Update last-received timestamp to now (the stream just ended,
|
||||
@@ -739,11 +786,27 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
let lastLogAt = Date.now();
|
||||
let keepaliveTick = 0;
|
||||
let keepaliveJobTerminal = false;
|
||||
let keepaliveJobTerminalAt: number | null = null;
|
||||
keepaliveTimer = setInterval(() => {
|
||||
// Fire-and-forget the async work; setInterval callbacks must be
|
||||
// synchronous or the timer will drift.
|
||||
void (async () => {
|
||||
if (keepaliveJobTerminal) return;
|
||||
if (keepaliveJobTerminal) {
|
||||
// Post-terminal window: keep refreshing onSpawn during cleanup
|
||||
// (job deletion, log parsing, K8s API calls) so the reaper doesn't
|
||||
// fire a false process_lost while execute() is still running.
|
||||
if (
|
||||
ctx.onSpawn &&
|
||||
keepaliveJobTerminalAt !== null &&
|
||||
Date.now() - keepaliveJobTerminalAt <= POST_TERMINAL_KEEPALIVE_MS
|
||||
) {
|
||||
keepaliveTick++;
|
||||
if (keepaliveTick % 6 === 0) {
|
||||
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Verify the Job is still alive before announcing or refreshing.
|
||||
try {
|
||||
@@ -753,6 +816,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
);
|
||||
if (terminal) {
|
||||
keepaliveJobTerminal = true;
|
||||
keepaliveJobTerminalAt = Date.now();
|
||||
return;
|
||||
}
|
||||
} catch (err: unknown) {
|
||||
@@ -762,6 +826,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// window as a safety net.
|
||||
if (isK8s404(err)) {
|
||||
keepaliveJobTerminal = true;
|
||||
keepaliveJobTerminalAt = Date.now();
|
||||
return;
|
||||
}
|
||||
// Log transient errors but leave keepaliveJobTerminal false so
|
||||
|
||||
Reference in New Issue
Block a user