diff --git a/src/server/execute.ts b/src/server/execute.ts index 7d3f095..e0439a6 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -401,6 +401,7 @@ export async function streamPodLogsOnce( sinceSeconds?: number, dedup?: LogLineDedupFilter, stopSignal?: { stopped: boolean }, + activity?: { lastActiveAt: number }, ): Promise { const logApi = getLogApi(kubeconfigPath); const chunks: string[] = []; @@ -409,6 +410,13 @@ export async function streamPodLogsOnce( write(chunk: Buffer, _encoding, callback) { const text = chunk.toString("utf-8"); chunks.push(text); + // Refresh stream liveness on every chunk received from the container. + // This MUST happen here (not just after streamPodLogsOnce returns) — + // a streaming attempt that never disconnects can produce output for + // hours, and the grace timer in execute() will fire 30s after the + // FIRST disconnect even if a new long-running attempt is currently + // streaming, unless we keep this timestamp fresh per-chunk (FAR-107). + if (activity) activity.lastActiveAt = Date.now(); const emitted = dedup ? dedup.filter(text) : text; if (!emitted) { callback(); @@ -531,7 +539,7 @@ async function streamPodLogs( } const preStreamTs = Math.floor(Date.now() / 1000); - const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal); + const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal, activity); if (activity) activity.streamHasExited = true; if (result) { allChunks.push(result); @@ -1387,6 +1395,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise { // Only consider grace once the stream has exited at least once. // Until then we are still in the warm-up window and @@ -1395,21 +1404,41 @@ export async function execute(ctx: AdapterExecutionContext): Promise= LOG_EXIT_COMPLETION_GRACE_MS ) { - // Stop the grace poller immediately so we don't double-fire while the - // verification read below is in flight. - if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } - // The log stream exiting only means the container stopped producing - // output — it does NOT prove the Job was deleted. Verify Job - // presence with a one-shot read so we can distinguish: - // (a) Job 404 → truly gone (TTL or external deletion) - // (b) Job still present → K8s condition propagation lag (FAR-23) - // Without this check we mis-classify (b) as "deleted externally" and - // emit a false-positive k8s_job_deleted_externally error (FAR-107). + graceCheckInFlight = true; void (async () => { + try { + // Pod-phase gate (FAR-107): if the pod is still Running/Pending + // the container is alive — Claude can be silent for >30s during + // long tool calls (web fetches, slow upstream APIs). Refresh + // the stream-activity timer, leave the poller armed, and let + // waitForJobCompletion remain the authoritative signal. Only + // proceed with the grace settlement when the pod has actually + // reached a terminal phase or is gone. + const podLookup = await lookupPodState(namespace, jobName, kubeconfigPath); + if (!podLookup.podMissing && (podLookup.phase === "Running" || podLookup.phase === "Pending")) { + streamActivity.lastActiveAt = Date.now(); + graceCheckInFlight = false; + return; + } + } catch (err) { + await onLog("stderr", `[paperclip] grace gate: pod state lookup failed (${err instanceof Error ? err.message : String(err)}) — falling through to Job-presence check\n`).catch(() => {}); + } + // Pod is no longer Running — proceed with Job-presence verification. + // Stop the grace poller immediately so we don't double-fire while the + // verification read below is in flight. + if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } + // The log stream exiting only means the container stopped producing + // output — it does NOT prove the Job was deleted. Verify Job + // presence with a one-shot read so we can distinguish: + // (a) Job 404 → truly gone (TTL or external deletion) + // (b) Job still present → K8s condition propagation lag (FAR-23) + // Without this check we mis-classify (b) as "deleted externally" and + // emit a false-positive k8s_job_deleted_externally error (FAR-107). try { await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace }); await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update; Job ${jobName} still present — proceeding with captured output (FAR-23)\n`).catch(() => {});