From b3c1519cf5af284813b92c818a650b4fa83b96e4 Mon Sep 17 00:00:00 2001 From: Gandalf the Greybeard Date: Thu, 23 Apr 2026 15:59:51 +0000 Subject: [PATCH] fix: prevent process_lost when K8s Job completes (FAR-10) Four stacked bugs caused the adapter to hang after K8s Job completion, allowing the 5-minute reaper to mark runs process_lost even when the Job actually succeeded. - streamPodLogsOnce: add a stopSignal poller that checks every 200ms and destroys the writable once the job-completion branch fires, aborting any in-flight follow stream that would otherwise hang indefinitely - waitForPod: treat phase=Failed as a terminal error (throw via describePodTerminatedError) instead of entering the log-stream path with a dead pod (new helper is exported for unit tests) - waitForPod: surface cs.state?.terminated in the per-tick detail line so operators see exit code / reason without needing kubectl - keepalive: add POST_TERMINAL_KEEPALIVE_MS (90s) window after Job goes terminal so onSpawn keeps refreshing updatedAt during cleanup; if execute() genuinely stalls past 90s the reaper will still catch it Regression tests added for describePodTerminatedError (phase=Failed with and without claude container status). 
Co-Authored-By: Paperclip --- src/server/execute.test.ts | 61 ++++++++++++++++++++++++++++++- src/server/execute.ts | 75 +++++++++++++++++++++++++++++++++++--- 2 files changed, 130 insertions(+), 6 deletions(-) diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts index 1822a37..e4d7614 100644 --- a/src/server/execute.test.ts +++ b/src/server/execute.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect } from "vitest"; import type * as k8s from "@kubernetes/client-node"; -import { isK8s404, buildPartialRunError, isReattachableOrphan } from "./execute.js"; +import { isK8s404, buildPartialRunError, isReattachableOrphan, describePodTerminatedError } from "./execute.js"; function makeJob(opts: { runId?: string; @@ -186,3 +186,62 @@ describe("isReattachableOrphan", () => { expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false); }); }); + +// Regression: FAR-10 — waitForPod must throw on phase=Failed, not return the pod name. +// These tests cover describePodTerminatedError, the helper that waitForPod uses to build +// the error message before throwing. Verifies that phase=Failed with no claude logs +// produces a structured, actionable error instead of silently entering the log-stream path. 
+describe("describePodTerminatedError", () => { + it("includes exit code and reason when claude container status is available", () => { + const cs = [ + { + name: "claude", + state: { terminated: { exitCode: 137, reason: "OOMKilled" } }, + }, + ] as k8s.V1ContainerStatus[]; + const msg = describePodTerminatedError("mypod", "Failed", cs); + expect(msg).toContain("137"); + expect(msg).toContain("OOMKilled"); + expect(msg).toContain("phase=Failed"); + }); + + it("falls back to message field when reason is absent", () => { + const cs = [ + { + name: "claude", + state: { terminated: { exitCode: 1, message: "signal: killed" } }, + }, + ] as k8s.V1ContainerStatus[]; + const msg = describePodTerminatedError("mypod", "Failed", cs); + expect(msg).toContain("signal: killed"); + expect(msg).toContain("1"); + }); + + it("returns generic message when no claude container status is present", () => { + const msg = describePodTerminatedError("mypod", "Failed", []); + expect(msg).toBe("Pod mypod reached phase=Failed"); + }); + + it("ignores non-claude containers", () => { + const cs = [ + { + name: "sidecar", + state: { terminated: { exitCode: 0, reason: "Completed" } }, + }, + ] as k8s.V1ContainerStatus[]; + const msg = describePodTerminatedError("mypod", "Failed", cs); + expect(msg).toBe("Pod mypod reached phase=Failed"); + }); + + it("handles null exitCode gracefully", () => { + const cs = [ + { + name: "claude", + state: { terminated: { exitCode: null, reason: "Error" } }, + }, + ] as unknown as k8s.V1ContainerStatus[]; + const msg = describePodTerminatedError("mypod", "Failed", cs); + expect(msg).toContain("unknown"); + expect(msg).toContain("Error"); + }); +}); diff --git a/src/server/execute.ts b/src/server/execute.ts index bdaf1b8..026da2a 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -16,6 +16,10 @@ const POLL_INTERVAL_MS = 2000; const KEEPALIVE_INTERVAL_MS = 15_000; const LOG_STREAM_RECONNECT_DELAY_MS = 3_000; const MAX_LOG_RECONNECT_ATTEMPTS = 50; +// 
How long to keep refreshing onSpawn after the Job reaches a terminal state. +// Covers the cleanup path (delete job, parse stdout) so a slow K8s API call +// doesn't trip the 5-minute reaper staleness window. +const POST_TERMINAL_KEEPALIVE_MS = 90_000; /** * Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node. @@ -96,6 +100,27 @@ export function isReattachableOrphan( return true; } +/** + * Build an error message for a pod that reached phase=Failed before or + * instead of streaming logs. Includes the claude container's terminated exit + * code and reason when available so operators can diagnose crashes without + * needing kubectl. Exported for unit tests. + */ +export function describePodTerminatedError( + podName: string, + phase: string, + containerStatuses: k8s.V1ContainerStatus[], +): string { + const mainCs = containerStatuses.find((cs) => cs.name === "claude"); + const terminated = mainCs?.state?.terminated; + if (terminated) { + const code = terminated.exitCode ?? "unknown"; + const reason = terminated.reason ?? terminated.message ?? "no reason"; + return `Pod ${podName} reached phase=${phase}: claude exited ${code} (${reason})`; + } + return `Pod ${podName} reached phase=${phase}`; +} + /** * Wait for the Job's pod to reach a terminal or running state. * Returns the pod name once logs can be streamed, or throws on failure. @@ -147,15 +172,22 @@ async function waitForPod( for (const cs of containerStatuses) { if (cs.state?.waiting) details.push(`${cs.name}: waiting (${cs.state.waiting.reason ?? "unknown"})`); else if (cs.state?.running) details.push(`${cs.name}: running`); + else if (cs.state?.terminated) details.push(`${cs.name}: terminated (exit ${cs.state.terminated.exitCode ?? "?"}, ${cs.state.terminated.reason ?? 
"no reason"})`); } await onLog("stdout", `[paperclip] Pod ${podName}: ${details.join(", ")}\n`); lastStatus = statusKey; } // Ready to stream logs - if (phase === "Running" || phase === "Succeeded" || phase === "Failed") { + if (phase === "Running" || phase === "Succeeded") { return podName; } + // phase=Failed means the pod crashed before we could stream logs. + // Throwing here routes the caller into the error path with a structured + // message instead of entering the log-streaming path with a dead pod. + if (phase === "Failed") { + throw new Error(describePodTerminatedError(podName, phase, containerStatuses)); + } // Init containers done + main running (phase may still say Pending briefly) const allInitsDone = initStatuses.length > 0 && initStatuses.every( @@ -218,6 +250,7 @@ async function streamPodLogsOnce( kubeconfigPath?: string, sinceSeconds?: number, dedup?: LogLineDedupFilter, + stopSignal?: { stopped: boolean }, ): Promise { const logApi = getLogApi(kubeconfigPath); const chunks: string[] = []; @@ -235,6 +268,18 @@ async function streamPodLogsOnce( }, }); + // When the job completion signal fires, destroy the writable to abort the + // in-flight follow stream. Without this, logApi.log can hang indefinitely + // when the pod terminates without closing the HTTP connection cleanly. + let stopPoller: ReturnType | null = null; + if (stopSignal) { + stopPoller = setInterval(() => { + if (stopSignal.stopped && !writable.destroyed) { + writable.destroy(); + } + }, 200); + } + try { await logApi.log(namespace, podName, "claude", writable, { follow: true, @@ -242,8 +287,10 @@ async function streamPodLogsOnce( ...(sinceSeconds ? { sinceSeconds } : {}), }); } catch { - // follow may fail if the container already exited or the API - // connection dropped — not fatal, caller decides whether to retry. + // follow may fail if the container already exited, the API connection + // dropped, or we aborted via writable.destroy() — not fatal. 
+ } finally { + if (stopPoller) clearInterval(stopPoller); } return chunks.join(""); @@ -293,7 +340,7 @@ async function streamPodLogs( } const preStreamTs = Math.floor(Date.now() / 1000); - const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup); + const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal); if (result) { allChunks.push(result); // Update last-received timestamp to now (the stream just ended, @@ -739,11 +786,27 @@ export async function execute(ctx: AdapterExecutionContext): Promise { // Fire-and-forget the async work; setInterval callbacks must be // synchronous or the timer will drift. void (async () => { - if (keepaliveJobTerminal) return; + if (keepaliveJobTerminal) { + // Post-terminal window: keep refreshing onSpawn during cleanup + // (job deletion, log parsing, K8s API calls) so the reaper doesn't + // fire a false process_lost while execute() is still running. + if ( + ctx.onSpawn && + keepaliveJobTerminalAt !== null && + Date.now() - keepaliveJobTerminalAt <= POST_TERMINAL_KEEPALIVE_MS + ) { + keepaliveTick++; + if (keepaliveTick % 6 === 0) { + void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {}); + } + } + return; + } // Verify the Job is still alive before announcing or refreshing. try { @@ -753,6 +816,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise