diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts index 2d76a6c..cd72b6b 100644 --- a/src/server/execute.test.ts +++ b/src/server/execute.test.ts @@ -1102,6 +1102,95 @@ describe("execute: waitForPod edge cases", () => { }); }); +// ─── execute: grace-period fallback (FAR-23) ───────────────────────────────── + +describe("execute: log-stream-exit grace period (FAR-23)", () => { + // Tests verify that execute() resolves within the grace window even when + // waitForJobCompletion keeps polling after the log stream exits (K8s + // condition propagation lag). + beforeEach(() => { + vi.resetAllMocks(); + vi.useFakeTimers(); + mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); + mockBatchListJobs.mockResolvedValue({ items: [] }); + mockPrepareBundle.mockResolvedValue(makeBundle()); + mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } }); + mockBatchPatchJob.mockResolvedValue({}); + mockBatchDeleteJob.mockResolvedValue({}); + mockCoreDeleteSecret.mockResolvedValue({}); + mockCoreListPods + .mockResolvedValueOnce({ + items: [{ + metadata: { name: "pod-abc" }, + status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, + }], + }) + .mockResolvedValue({ + items: [{ + metadata: { name: "pod-abc" }, + status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] }, + }], + }); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("resolves via grace (jobGone) when log stream exits but job condition never arrives", async () => { + // logApi.log returns immediately (container exited) — log stream exits on first attempt. + mockLogFn.mockImplementation(async () => {}); + // One-shot read returns full Claude output (no reconnects needed for output) + mockCoreReadPodLog.mockResolvedValue(CLAUDE_HAPPY_OUTPUT); + // waitForJobCompletion never detects terminal — simulates K8s condition lag. + mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); // never terminal + + // No timeoutSec → completionTimeoutMs=0 → polls indefinitely without grace. + const executePromise = execute(makeCtx()); + + // Advance past: + // ~4200ms of readPaperclipRuntimeSkillEntries real I/O (multiple small advances) + // 3100ms: streamPodLogs reconnect sleep (attempt=1, stopSignal still false at entry) + // + 30100ms: grace period (LOG_EXIT_COMPLETION_GRACE_MS = 30s) + // + 3100ms: streamPodLogs bail timer (stopSignal was set by grace → bail fires) + await vi.advanceTimersByTimeAsync(2_100); + await vi.advanceTimersByTimeAsync(1_100); + await vi.advanceTimersByTimeAsync(1_000); + await vi.advanceTimersByTimeAsync(3_100); // reconnect sleep + await vi.advanceTimersByTimeAsync(30_100); // grace fires + await vi.advanceTimersByTimeAsync(3_500); // bail timer + margin + const result = await executePromise; + + // Grace fires → jobGone=true → execute proceeds with one-shot logs → success + expect(result.exitCode).toBe(0); + expect(result.sessionId).toBe("sess_test123"); + expect(mockCoreReadPodLog).toHaveBeenCalled(); + }, 60_000); + + it("resolves promptly via real completion when job condition arrives before grace", async () => { + // Log stream exits immediately then job condition arrives well within the grace period. + mockLogFn.mockImplementation( + async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => { + writable.write(CLAUDE_HAPPY_OUTPUT); + }, + ); + // Job condition appears quickly (< 30s grace period) + mockBatchReadJob.mockResolvedValue({ + status: { conditions: [{ type: "Complete", status: "True" }] }, + }); + + const executePromise = execute(makeCtx()); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.exitCode).toBe(0); + expect(result.sessionId).toBe("sess_test123"); + // One-shot fallback should NOT be needed since the stream captured full output + // (grace did not fire, real completion arrived) + expect(result.errorMessage).toBeNull(); + }); +}); + // ─── execute: concurrency guard — multiple orphan sorting ──────────────────── describe("execute: concurrency guard — multiple orphans", () => { diff --git a/src/server/execute.ts b/src/server/execute.ts index 354d502..67e6c8d 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -35,6 +35,13 @@ const POST_TERMINAL_KEEPALIVE_MS = 90_000; // against the K8s client library not propagating writable.destroy() into an // abort of the underlying HTTP request. const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000; +// After the log stream exits (container stopped producing output), wait this +// long for the K8s Job condition to be confirmed before treating the job as +// done. K8s Job conditions can lag pod exit by several seconds or more under +// cluster load. Without this bound, waitForJobCompletion keeps polling while +// streamPodLogs keeps reconnecting — together they can hold execute() open for +// minutes, causing stale "running" status in the UI (FAR-23). +const LOG_EXIT_COMPLETION_GRACE_MS = 30_000; /** * Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node. @@ -357,6 +364,11 @@ export async function streamPodLogsOnce( * * Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect * loops during sustained API partitions. + * + * onFirstStreamExit is called the first time streamPodLogsOnce returns + * (container has exited or stream disconnected). Used by execute() to + * start the LOG_EXIT_COMPLETION_GRACE_MS grace timer (FAR-23) without + * waiting for all reconnects to exhaust. */ async function streamPodLogs( namespace: string, @@ -365,6 +377,7 @@ async function streamPodLogs( kubeconfigPath?: string, stopSignal?: { stopped: boolean }, dedup?: LogLineDedupFilter, + onFirstStreamExit?: () => void, ): Promise { const allChunks: string[] = []; let attempt = 0; @@ -395,6 +408,9 @@ async function streamPodLogs( const preStreamTs = Math.floor(Date.now() / 1000); const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal); + // Signal first stream exit immediately so the grace-period timer in + // execute() can start without waiting for all reconnects to complete. + if (attempt === 0) onFirstStreamExit?.(); if (result) { allChunks.push(result); // Update last-received timestamp to now (the stream just ended, @@ -1063,12 +1079,56 @@ export async function execute(ctx: AdapterExecutionContext): Promise { + // Track when the log stream first exits so the grace-period can fire + // if the K8s Job condition lags behind container exit (FAR-23). + // Set via onFirstStreamExit callback (called after attempt=0 returns) + // rather than in .then() of streamPodLogs, which would create a + // deadlock: streamPodLogs only resolves after stopSignal is set, but + // stopSignal is set by the grace timer which needs logExitTime to be + // non-null. + let logExitTime: number | null = null; + const trackedLogStream = streamPodLogs( + namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup, + () => { logExitTime = Date.now(); }, + ); + + // completionWithGrace races waitForJobCompletion against a grace timer + // that fires LOG_EXIT_COMPLETION_GRACE_MS after the log stream exits. + // This bounds the stale-UI window when K8s Job conditions lag container + // exit (FAR-23): without it, waitForJobCompletion polls indefinitely + // while streamPodLogs reconnects, holding execute() open for minutes. + // logStopSignal.stopped is set on every settled path (fulfilled, rejected, + // or grace) so streamPodLogs stops reconnecting promptly. + type CompletionResult = { succeeded: boolean; timedOut: boolean; jobGone?: boolean }; + let gracePoller: ReturnType | null = null; + const completionWithGrace = new Promise((resolve, reject) => { + let settled = false; + const settleOk = (r: CompletionResult) => { + if (settled) return; + settled = true; + if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } logStopSignal.stopped = true; - return r; - }), + resolve(r); + }; + const settleErr = (err: unknown) => { + if (settled) return; + settled = true; + if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } + logStopSignal.stopped = true; + reject(err); + }; + waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr); + gracePoller = setInterval(() => { + if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) { + void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output (FAR-23)\n`).catch(() => {}); + settleOk({ succeeded: false, timedOut: false, jobGone: true }); + } + }, 1_000); + }); + + const [logResult, completionResult] = await Promise.allSettled([ + trackedLogStream, + completionWithGrace, ]); // Stop the keepalive immediately once the job has reached a terminal