diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts index b4f435a..8d7b8a7 100644 --- a/src/server/execute.test.ts +++ b/src/server/execute.test.ts @@ -1512,6 +1512,54 @@ describe("execute: log-stream-exit grace period (FAR-23)", () => { // (grace did not fire, real completion arrived) expect(result.errorMessage).toBeNull(); }); + + it("does NOT fire grace when stream drops mid-output and reconnects with more output (FAR-107)", async () => { + // Reproduces Nancy / Privileged Escalation symptom: the K8s log API drops + // the streaming connection mid-run; streamPodLogs reconnects and the + // container is still producing. Before the fix, the grace timer was + // armed on first stream exit and fired 30s later regardless of whether + // output had resumed, surfacing claude_truncated even though the pod was + // still phase=Running. + let attemptIndex = 0; + mockLogFn.mockImplementation( + async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => { + if (attemptIndex === 0) { + // Stream a partial init line then "drop" the connection without a + // result event — this is the transient API disconnect. + writable.write(JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_test123" }) + "\n"); + attemptIndex++; + return; + } + // Reconnect produces the rest of the stream including the result event. + writable.write(CLAUDE_HAPPY_OUTPUT); + }, + ); + // Job condition arrives only after the reconnect produces output, well + // beyond the 30s grace window; the old code would have grace-fired at + // ~30s and treated the run as truncated. + let readJobCalls = 0; + mockBatchReadJob.mockImplementation(async () => { + readJobCalls++; + // Stay non-terminal until the reconnect has had time to run and the + // grace window has fully elapsed since the FIRST disconnect. + if (readJobCalls < 25) return { status: { conditions: [] } }; + return { status: { conditions: [{ type: "Complete", status: "True" }] } }; + }); + + const executePromise = execute(makeCtx()); + // t=3000: first reconnect sleep fires → second streamPodLogsOnce attempt + await vi.advanceTimersByTimeAsync(3_100); + // Drive past the old (buggy) 30s grace boundary without firing real completion + await vi.advanceTimersByTimeAsync(35_000); + // Then let the Job's Complete condition land + await vi.advanceTimersByTimeAsync(20_000); + const result = await executePromise; + + // Run completed normally — grace must not have falsely truncated it. + expect(result.exitCode).toBe(0); + expect(result.errorCode).toBeUndefined(); + expect(result.sessionId).toBe("sess_test123"); + }, 80_000); }); // ─── execute: concurrency guard — multiple orphan sorting ──────────────────── diff --git a/src/server/execute.ts b/src/server/execute.ts index 640465e..7d3f095 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -481,10 +481,18 @@ export async function streamPodLogsOnce( * Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect * loops during sustained API partitions. * - * onFirstStreamExit is called the first time streamPodLogsOnce returns - * (container has exited or stream disconnected). Used by execute() to - * start the LOG_EXIT_COMPLETION_GRACE_MS grace timer (FAR-23) without - * waiting for all reconnects to exhaust. + * `activity` tracks stream liveness so execute()'s grace timer can + * distinguish a transient K8s log-API reconnect from a real container + * exit (FAR-107). Two signals: + * - `streamHasExited` becomes true on the first return from + * streamPodLogsOnce. Until then we are still in the warm-up window + * and waitForJobCompletion is the authoritative signal — grace must + * not fire. + * - `lastActiveAt` advances every time a streamPodLogsOnce attempt + * returns non-empty output (the container is still producing). + * The grace timer fires only once GRACE_MS have passed since the + * last chunk, so output that resumes after a transient drop keeps + * the run alive. */ async function streamPodLogs( namespace: string, @@ -493,7 +501,7 @@ async function streamPodLogs( kubeconfigPath?: string, stopSignal?: { stopped: boolean }, dedup?: LogLineDedupFilter, - onFirstStreamExit?: () => void, + activity?: { lastActiveAt: number; streamHasExited: boolean }, ): Promise { const allChunks: string[] = []; let attempt = 0; @@ -524,14 +532,15 @@ async function streamPodLogs( const preStreamTs = Math.floor(Date.now() / 1000); const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal); - // Signal first stream exit immediately so the grace-period timer in - // execute() can start without waiting for all reconnects to complete. - if (attempt === 0) onFirstStreamExit?.(); + if (activity) activity.streamHasExited = true; if (result) { allChunks.push(result); // Update last-received timestamp to now (the stream just ended, // so any log lines in `result` were received up to this moment). lastLogReceivedAt = Math.floor(Date.now() / 1000); + // Refresh stream liveness so the grace timer in execute() does not + // fire while output is still flowing through reconnects (FAR-107). + if (activity) activity.lastActiveAt = Date.now(); } else if (attempt === 0) { // First attempt returned nothing — update timestamp so reconnect // window stays reasonable. @@ -1340,17 +1349,16 @@ export async function execute(ctx: AdapterExecutionContext): Promise { logExitTime = Date.now(); }, + streamActivity, ); // completionWithGrace races waitForJobCompletion against a grace timer @@ -1380,7 +1388,17 @@ export async function execute(ctx: AdapterExecutionContext): Promise { - if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) { + // Only consider grace once the stream has exited at least once. + // Until then we are still in the warm-up window and + // waitForJobCompletion is the authoritative signal. Once the + // stream has exited, fire only after GRACE_MS of inactivity + // measured against the last received chunk — output that resumes + // through a reconnect resets the clock so transient drops do not + // truncate live runs (FAR-107). + if ( + streamActivity.streamHasExited && + Date.now() - streamActivity.lastActiveAt >= LOG_EXIT_COMPLETION_GRACE_MS + ) { // Stop the grace poller immediately so we don't double-fire while the // verification read below is in flight. if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }