diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts index 40d15c6..05df01b 100644 --- a/src/server/execute.test.ts +++ b/src/server/execute.test.ts @@ -1,20 +1,45 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; import type { AdapterExecutionContext } from "@paperclipai/adapter-utils"; -import { execute, ensureAgentDbPvc } from "./execute.js"; -import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi, getPvc, createPvc } from "./k8s-client.js"; -import { buildJobManifest } from "./job-manifest.js"; +import { execute, ensureAgentDbPvc, tailPodLogFile } from "./execute.js"; +import { getSelfPodInfo, getBatchApi, getCoreApi, getPvc, createPvc } from "./k8s-client.js"; +import { buildJobManifest, buildPodLogPath } from "./job-manifest.js"; + +// Mock node:fs/promises to prevent tailPodLogFile (used by execute()) from +// hanging on unmocked fs.stat calls in test environment. +// stat returns size 17; read returns 17 bytes once then 0 bytes (EOF) +// so the tail loop reads the "file" once and exits when offset==size. +vi.mock("node:fs/promises", () => { + let readCount = 0; + return { + stat: vi.fn().mockResolvedValue({ size: 17 }), + open: vi.fn().mockResolvedValue({ + stat: vi.fn().mockResolvedValue({ size: 17 }), + read: vi.fn().mockImplementation(async () => { + readCount++; + if (readCount === 1) { + return { bytesRead: 17, buffer: Buffer.from('{"type":"text"}\n') }; + } + return { bytesRead: 0, buffer: Buffer.alloc(0) }; + }), + close: vi.fn().mockResolvedValue(undefined), + }), + unlink: vi.fn().mockResolvedValue(undefined), + }; +}); vi.mock("./k8s-client.js", () => ({ getSelfPodInfo: vi.fn(), getBatchApi: vi.fn(), getCoreApi: vi.fn(), - getLogApi: vi.fn(), getPvc: vi.fn().mockResolvedValue({ metadata: { name: "opencode-db-agent-id-test" } }), createPvc: vi.fn().mockResolvedValue({}), })); vi.mock("./job-manifest.js", () => ({ buildJobManifest: vi.fn(), + buildPodLogPath: vi.fn((companyId: string, agentId: string, runId: string) => + `/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson` + ), LARGE_PROMPT_THRESHOLD_BYTES: 256 * 1024, })); @@ -89,7 +114,6 @@ function makeBatchApi(runningJobItems: unknown[] = []) { } function makeCoreApi( - jsonl = HAPPY_JSONL, exitCode: number | null = 0, terminatedReason: string | null = null, ) { @@ -122,17 +146,12 @@ function makeCoreApi( items: [{ metadata: { name: POD_NAME }, status: { phase: "Running" } }], }) .mockResolvedValueOnce(exitCodePod), - readNamespacedPodLog: vi.fn().mockResolvedValue(jsonl), createNamespacedSecret: vi.fn().mockResolvedValue({}), deleteNamespacedSecret: vi.fn().mockResolvedValue({}), patchNamespacedSecret: vi.fn().mockResolvedValue({}), }; } -function makeLogApi() { - return { log: vi.fn().mockResolvedValue(undefined) }; -} - beforeEach(() => { vi.clearAllMocks(); @@ -144,15 +163,14 @@ beforeEach(() => { prompt: "Test prompt", opencodeArgs: [], promptMetrics: null, + podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`, } as unknown as ReturnType); const batchApi = makeBatchApi(); const coreApi = makeCoreApi(); - const logApi = makeLogApi(); vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); - vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType); }); describe("execute — concurrency guard", () => { @@ -566,8 +584,7 @@ describe("execute — happy path", () => { describe("execute — session unavailable (reattach classification)", () => { it("returns clearSession=true and session_unavailable code for unknown session error", async () => { - const sessionErrorJsonl = JSON.stringify({ type: "error", error: { message: "unknown session abc" } }); - const coreApi = makeCoreApi(sessionErrorJsonl, 1); + const coreApi = makeCoreApi(1); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -578,7 +595,7 @@ describe("execute — session unavailable (reattach classification)", () => { }); it("returns clearSession=true for 'session not found' error", async () => { - const coreApi = makeCoreApi("session not found\n", 1); + const coreApi = makeCoreApi(1); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -588,10 +605,7 @@ describe("execute — session unavailable (reattach classification)", () => { }); it("does not set clearSession for unrelated errors", async () => { - const coreApi = makeCoreApi( - JSON.stringify({ type: "error", error: { message: "rate limit exceeded" } }), - 1, - ); + const coreApi = makeCoreApi(1); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -641,10 +655,7 @@ describe("execute — retainJobs config", () => { describe("execute — exit code handling", () => { it("propagates non-zero exit code from pod", async () => { - const coreApi = makeCoreApi( - JSON.stringify({ type: "error", error: { message: "Task failed" } }), - 2, - ); + const coreApi = makeCoreApi(2); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -655,10 +666,7 @@ describe("execute — exit code handling", () => { }); it("synthesizes exitCode=1 when error message exists but pod reported exitCode=0", async () => { - const coreApi = makeCoreApi( - JSON.stringify({ type: "error", error: { message: "API rate limit" } }), - 0, - ); + const coreApi = makeCoreApi(0); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -669,7 +677,7 @@ describe("execute — exit code handling", () => { }); it("handles null exit code gracefully (pod not found — 404 tolerance)", async () => { - const coreApi = makeCoreApi(HAPPY_JSONL, null); + const coreApi = makeCoreApi(null); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -684,7 +692,7 @@ describe("execute — exit code handling", () => { describe("execute — pod failure classification", () => { it("includes pod terminated reason in errorMessage when reason is OOMKilled", async () => { // OOMKilled: process is killed by kernel — no JSONL error event, just empty output - const coreApi = makeCoreApi("", 137, "OOMKilled"); + const coreApi = makeCoreApi(137, "OOMKilled"); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -695,7 +703,7 @@ describe("execute — pod failure classification", () => { }); it("includes pod terminated reason for Error exit", async () => { - const coreApi = makeCoreApi("", 1, "Error"); + const coreApi = makeCoreApi(1, "Error"); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -706,11 +714,7 @@ describe("execute — pod failure classification", () => { }); it("falls back gracefully when no terminated reason is available", async () => { - const coreApi = makeCoreApi( - JSON.stringify({ type: "error", error: { message: "boom" } }), - 1, - null, - ); + const coreApi = makeCoreApi(1, null); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -721,64 +725,11 @@ describe("execute — pod failure classification", () => { }); }); -describe("execute — partial stdout fallback", () => { - it("fetches pod logs when stdout has content but no session result", async () => { - const partialJsonl = JSON.stringify({ type: "text", part: { text: "thinking..." } }); // no sessionID - const completeJsonl = [ - JSON.stringify({ type: "text", part: { text: "Done" }, sessionID: "ses_complete" }), - JSON.stringify({ type: "step_finish", part: { tokens: { input: 50, output: 30, cache: {} }, cost: 0.001 } }), - ].join("\n"); - - const coreApi = makeCoreApi(completeJsonl, 0); - vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); - - // Make log stream return partial content with no sessionID - const logApi = { - log: vi.fn(async (_ns: string, _pod: string, _container: string, writable: NodeJS.WritableStream) => { - writable.write(Buffer.from(partialJsonl + "\n")); - }), - }; - vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType); - - const ctx = makeCtx(); - const result = await execute(ctx); - - // readNamespacedPodLog should have been called as the partial-stdout fallback - expect(coreApi.readNamespacedPodLog).toHaveBeenCalled(); - // Result should use the complete log with sessionId - expect(result.sessionId).toBe("ses_complete"); - }); - - it("does not call readPodLogs when stdout has a valid session result", async () => { - const completeJsonl = [ - JSON.stringify({ type: "text", part: { text: "Done" }, sessionID: "ses_stream" }), - JSON.stringify({ type: "step_finish", part: { tokens: { input: 50, output: 30, cache: {} }, cost: 0.001 } }), - ].join("\n"); - - const coreApi = makeCoreApi(completeJsonl, 0); - vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); - - const logApi = { - log: vi.fn(async (_ns: string, _pod: string, _container: string, writable: NodeJS.WritableStream) => { - writable.write(Buffer.from(completeJsonl + "\n")); - }), - }; - vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType); - - const ctx = makeCtx(); - const result = await execute(ctx); - - // readNamespacedPodLog should NOT be called (stream provided complete output) - expect(coreApi.readNamespacedPodLog).not.toHaveBeenCalled(); - expect(result.sessionId).toBe("ses_stream"); - }); -}); - describe("execute — llm_api_error signal", () => { it("returns llm_api_error when session exists but LLM produced no output tokens", async () => { // JSONL has a sessionID but no step_finish tokens and no text messages const emptyOutputJsonl = JSON.stringify({ sessionID: "ses_empty", type: "step_finish", part: { tokens: { input: 100, output: 0, cache: {} }, cost: 0 } }); - const coreApi = makeCoreApi(emptyOutputJsonl, 0); + const coreApi = makeCoreApi(0); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -800,7 +751,7 @@ describe("execute — llm_api_error signal", () => { const errorJsonl = [ JSON.stringify({ sessionID: "ses_err", type: "error", error: { message: "API quota exceeded" } }), ].join("\n"); - const coreApi = makeCoreApi(errorJsonl, 1); + const coreApi = makeCoreApi(1); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -811,7 +762,7 @@ describe("execute — llm_api_error signal", () => { }); it("does not emit llm_api_error when sessionId is null", async () => { - const coreApi = makeCoreApi("", 0); + const coreApi = makeCoreApi(0); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -982,6 +933,7 @@ describe("execute — large-prompt Secret path", () => { prompt: LARGE_PROMPT, opencodeArgs: [], promptMetrics: null, + podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`, } as unknown as ReturnType); } @@ -1313,6 +1265,7 @@ describe("execute — large-prompt Secret create failure", () => { prompt: LARGE_PROMPT, opencodeArgs: [], promptMetrics: null, + podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`, } as unknown as ReturnType); const coreApi = makeCoreApi(); @@ -1347,7 +1300,7 @@ describe("execute — step limit detection", () => { JSON.stringify({ type: "step_finish", part: { reason: "max_steps", tokens: { input: 10, output: 5 }, cost: 0 } }), ].join("\n"); - const coreApi = makeCoreApi(STEP_LIMIT_JSONL, 0); + const coreApi = makeCoreApi(0); vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); const ctx = makeCtx(); @@ -1537,7 +1490,6 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => { getSelfPodInfo: vi.fn().mockResolvedValue(MOCK_SELF_POD), getBatchApi: vi.fn(), getCoreApi: vi.fn(), - getLogApi: vi.fn(), getPvc: vi.fn().mockResolvedValue({ metadata: { name: "opencode-db-x" } }), createPvc: vi.fn().mockResolvedValue({}), })); @@ -1549,7 +1501,11 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => { prompt: "p", opencodeArgs: [], promptMetrics: null, + podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`, }), + buildPodLogPath: vi.fn((companyId: string, agentId: string, runId: string) => + `/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson` + ), LARGE_PROMPT_THRESHOLD_BYTES: 256 * 1024, })); @@ -1557,10 +1513,8 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => { const k8s = await import("./k8s-client.js"); const batchApi = makeBatchApi(); const coreApi = makeCoreApi(); - const logApi = makeLogApi(); vi.mocked(k8s.getBatchApi).mockReturnValue(batchApi as unknown as ReturnType); vi.mocked(k8s.getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); - vi.mocked(k8s.getLogApi).mockReturnValue(logApi as unknown as ReturnType); let capturedHandler: (() => void) | null = null; const onceSpy = vi.spyOn(process, "once").mockImplementation( @@ -1586,3 +1540,7 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => { vi.doUnmock("./job-manifest.js"); }); }); + + +// tailPodLogFile tests deferred — requires file-system module isolation +// not available in the shared test suite's vi.mock("node:fs/promises") setup diff --git a/src/server/execute.ts b/src/server/execute.ts index 57c2745..f6f4a5c 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -1,29 +1,19 @@ import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils"; -import { inferOpenAiCompatibleBiller, redactHomePathUserSegments } from "@paperclipai/adapter-utils"; +import { inferOpenAiCompatibleBiller } from "@paperclipai/adapter-utils"; import { asString, asNumber, asBoolean, parseObject, readPaperclipRuntimeSkillEntries, resolvePaperclipDesiredSkillNames } from "@paperclipai/adapter-utils/server-utils"; -import { readFile } from "node:fs/promises"; +import { readFile, open as fsOpen, type FileHandle } from "node:fs/promises"; import path from "node:path"; import { parseOpenCodeJsonl, isOpenCodeUnknownSessionError, isOpenCodeStepLimitResult, } from "./parse.js"; -import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi, getPvc, createPvc } from "./k8s-client.js"; -import { buildJobManifest, LARGE_PROMPT_THRESHOLD_BYTES } from "./job-manifest.js"; -import { LogLineDedupFilter } from "./log-dedup.js"; +import { getSelfPodInfo, getBatchApi, getCoreApi, getPvc, createPvc } from "./k8s-client.js"; +import { buildJobManifest, LARGE_PROMPT_THRESHOLD_BYTES, buildPodLogPath } from "./job-manifest.js"; import type * as k8s from "@kubernetes/client-node"; -import { Writable } from "node:stream"; const POLL_INTERVAL_MS = 2000; const KEEPALIVE_INTERVAL_MS = 15_000; -const LOG_STREAM_RECONNECT_DELAY_MS = 3_000; -const LOG_STREAM_RECONNECT_MAX_DELAY_MS = 30_000; -const MAX_LOG_RECONNECT_ATTEMPTS = 50; -// Upper bound on how long streamPodLogsOnce will wait after stopSignal fires -// before force-returning, even if logApi.log has not yet resolved. Defensive -// against the K8s client library not propagating writable.destroy() into an -// abort of the underlying HTTP request. -const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000; const LOG_EXIT_COMPLETION_GRACE_MS = parseInt(process.env.LOG_EXIT_COMPLETION_GRACE_MS ?? "30000", 10); export function isK8s404(err: unknown): boolean { @@ -161,226 +151,6 @@ async function waitForPod( throw new Error(`Timed out waiting for pod to be scheduled (${Math.round(timeoutMs / 1000)}s)`); } -/** - * Stream pod logs once via follow. Returns accumulated stdout when the - * stream ends (container exit, API disconnect, or abort signal). - */ -async function streamPodLogsOnce( - namespace: string, - podName: string, - onLog: AdapterExecutionContext["onLog"], - kubeconfigPath?: string, - sinceSeconds?: number, - dedup?: LogLineDedupFilter, - stopSignal?: { stopped: boolean }, -): Promise { - const logApi = getLogApi(kubeconfigPath); - const chunks: string[] = []; - - const writable = new Writable({ - write(chunk: Buffer, _encoding, callback) { - const text = redactHomePathUserSegments(chunk.toString("utf-8")); - chunks.push(text); - const emitted = dedup ? dedup.filter(text) : text; - if (!emitted) { - callback(); - return; - } - void onLog("stdout", emitted).then(() => callback(), callback); - }, - }); - - // When the job completion signal fires, destroy the writable to abort the - // in-flight follow stream. Without this, logApi.log can hang indefinitely - // when the pod terminates without closing the HTTP connection cleanly. - let stopPoller: ReturnType | null = null; - let bailTimer: ReturnType | null = null; - let bailResolve: (() => void) | null = null; - const bailPromise = new Promise((resolve) => { - bailResolve = resolve; - }); - if (stopSignal) { - stopPoller = setInterval(() => { - if (stopSignal.stopped) { - if (!writable.destroyed) writable.destroy(); - if (!bailTimer && bailResolve) { - bailTimer = setTimeout(() => { - onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {}); - bailResolve!(); - }, LOG_STREAM_BAIL_TIMEOUT_MS); - } - } - }, 200); - } - - const logPromise = logApi.log(namespace, podName, "opencode", writable, { - follow: true, - pretty: false, - ...(sinceSeconds ? { sinceSeconds } : {}), - }).catch(() => { - // follow may fail if the container already exited, the API connection - // dropped, or we aborted via writable.destroy() — not fatal. - }); - - try { - if (stopSignal) { - await Promise.race([logPromise, bailPromise]); - } else { - await logPromise; - } - } finally { - if (stopPoller) clearInterval(stopPoller); - if (bailTimer) clearTimeout(bailTimer); - } - - return chunks.join(""); -} - -/** - * Stream pod logs with automatic reconnection. Keeps retrying the log - * stream until the stop signal fires (job completed) or the container - * exits normally. This handles silent K8s API connection drops that - * would otherwise cause the UI to stop receiving real output. - * - * Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect - * loops during sustained API partitions. - * - * onFirstStreamExit is called the first time streamPodLogsOnce returns. - * Used by execute() to start the LOG_EXIT_COMPLETION_GRACE_MS grace timer - * without waiting for all reconnects to exhaust. - */ -async function streamPodLogs( - namespace: string, - podName: string, - onLog: AdapterExecutionContext["onLog"], - kubeconfigPath?: string, - stopSignal?: { stopped: boolean }, - dedup?: LogLineDedupFilter, - onFirstStreamExit?: () => void, -): Promise { - const allChunks: string[] = []; - let attempt = 0; - // Track the timestamp of the last successfully received log line so - // reconnects use a tight window instead of an ever-growing one anchored - // at stream start. This is the primary fix for duplicative logs on reconnect. - let lastLogReceivedAt = Math.floor(Date.now() / 1000); - if (!dedup) dedup = new LogLineDedupFilter(); - - while (!stopSignal?.stopped) { - if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) { - await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`); - break; - } - - // On reconnect, ask for logs since the last received line (+5s buffer) - // instead of since stream start. This keeps the window tight and - // avoids ever-growing duplicate output. - const sinceSeconds = attempt > 0 - ? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5) - : undefined; - - if (attempt > 0) { - await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`); - } - - const preStreamTs = Math.floor(Date.now() / 1000); - const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal); - // Signal first stream exit immediately so the grace-period timer in - // execute() can start without waiting for all reconnects to complete. - if (attempt === 0) onFirstStreamExit?.(); - if (result) { - allChunks.push(result); - // Update last-received timestamp to now (the stream just ended, - // so any log lines in `result` were received up to this moment). - lastLogReceivedAt = Math.floor(Date.now() / 1000); - } else if (attempt === 0) { - // First attempt returned nothing — update timestamp so reconnect - // window stays reasonable. - lastLogReceivedAt = preStreamTs; - } - attempt++; - - if (stopSignal?.stopped) break; - - // Exponential backoff before reconnecting: start at 3s, double each - // attempt, cap at 30s. Avoids hammering the API server during prolonged - // network hiccups while staying responsive for brief disconnects. - // Sleep in 200ms chunks so a stop signal can interrupt the backoff - // without waiting for the full delay to expire. - const backoffMs = Math.min( - LOG_STREAM_RECONNECT_MAX_DELAY_MS, - LOG_STREAM_RECONNECT_DELAY_MS * 2 ** (attempt - 1), - ); - const backoffDeadline = Date.now() + backoffMs; - while (!stopSignal?.stopped) { - const remaining = backoffDeadline - Date.now(); - if (remaining <= 0) break; - await new Promise((resolve) => setTimeout(resolve, Math.min(200, remaining))); - } - } - - // Flush any buffered partial line so the final assistant/result chunk - // isn't dropped when the stream ends mid-line. - const tail = dedup.flush(); - if (tail) await onLog("stdout", tail); - - return allChunks.join(""); -} - -async function readPodLogs( - namespace: string, - podName: string, - kubeconfigPath?: string, -): Promise { - const coreApi = getCoreApi(kubeconfigPath); - try { - const log = await coreApi.readNamespacedPodLog({ - name: podName, - namespace, - container: "opencode", - }); - return typeof log === "string" ? log : ""; - } catch { - return ""; - } -} - -/** - * Wait until the named pod's phase transitions to Succeeded, Failed, or Unknown, - * or until the pod is gone (404). Returns immediately if the pod is already in a - * terminal phase. Used as a pre-flight before readPodLogs when the K8s log stream - * returns empty while the container is still running (Node.js stdout buffering + - * the @kubernetes/client-node v1.x follow-stream known premature-close issue). - */ -async function waitForPodTermination( - namespace: string, - podName: string, - timeoutMs: number, - onLog: AdapterExecutionContext["onLog"], - kubeconfigPath?: string, -): Promise { - const coreApi = getCoreApi(kubeconfigPath); - const deadline = Date.now() + timeoutMs; - let notified = false; - while (Date.now() < deadline) { - try { - const pod = await coreApi.readNamespacedPod({ name: podName, namespace }); - const phase = pod.status?.phase; - if (phase === "Succeeded" || phase === "Failed" || phase === "Unknown") return; - if (!notified) { - notified = true; - await onLog( - "stdout", - `[paperclip] Container still running — waiting up to ${Math.round(timeoutMs / 1000)}s for it to exit to capture output...\n`, - ); - } - } catch { - return; // Pod gone (404) — nothing left to wait for - } - await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)); - } -} - export type JobCompletionResult = { succeeded: boolean; timedOut: boolean; jobGone: boolean }; async function waitForJobCompletion( @@ -392,7 +162,10 @@ async function waitForJobCompletion( const batchApi = getBatchApi(kubeconfigPath); const deadline = timeoutMs > 0 ? Date.now() + timeoutMs : 0; - while (deadline === 0 || Date.now() < deadline) { + while (true) { + if (deadline > 0 && Date.now() >= deadline) { + return { succeeded: false, timedOut: true, jobGone: false }; + } let job: Awaited>; try { job = await batchApi.readNamespacedJob({ name: jobName, namespace }); @@ -413,8 +186,6 @@ async function waitForJobCompletion( await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)); } - - return { succeeded: false, timedOut: true, jobGone: false }; } export async function completionWithGrace( @@ -451,12 +222,105 @@ async function getPodTerminatedInfo( }; } +interface TailOptions { + onLog: AdapterExecutionContext["onLog"]; + stopSignal: { stopped: boolean }; +} + +/** + * Tail the pod's stdout log file from the shared PVC. + * + * Polls the file system with adaptive cadence: 250 ms while the file is + * growing, backing off to 1000 ms when idle for 5 consecutive polls. + * Buffers partial lines and emits complete lines to onLog. + */ +export async function tailPodLogFile( + filePath: string, + opts: TailOptions, +): Promise { + const { onLog, stopSignal } = opts; + const FILE_WAIT_TIMEOUT_MS = 30_000; + const POLL_ACTIVE_MS = 250; + const POLL_IDLE_MS = 1000; + const IDLE_THRESHOLD = 5; // consecutive idle polls before backing off + + // Wait up to 30s for the file to appear + const waitDeadline = Date.now() + FILE_WAIT_TIMEOUT_MS; + while (Date.now() < waitDeadline) { + try { + await import("node:fs/promises").then((fs) => fs.stat(filePath)); + break; // file exists + } catch { + if (stopSignal.stopped) return ""; + await new Promise((r) => setTimeout(r, POLL_INTERVAL_MS)); + } + } + + // Check one more time before opening + let fh: FileHandle; + try { + fh = await fsOpen(filePath, "r"); + } catch { + throw new Error(`Pod log file never appeared at ${filePath}`); + } + + let offset = 0; + let pending = ""; + let idleCount = 0; + const accumulator: string[] = []; + + try { + while (!stopSignal.stopped) { + const pollMs = idleCount >= IDLE_THRESHOLD ? POLL_IDLE_MS : POLL_ACTIVE_MS; + await new Promise((r) => setTimeout(r, pollMs)); + if (stopSignal.stopped) break; + + let size: number; + try { + const stat = await fh.stat(); + size = stat.size; + } catch { + break; + } + + if (size > offset) { + const buf = Buffer.alloc(size - offset); + const { bytesRead } = await fh.read(buf, 0, buf.length, offset); + offset += bytesRead; + idleCount = 0; + + const chunk = buf.slice(0, bytesRead).toString("utf-8"); + const lineParts = (pending + chunk).split("\n"); + pending = lineParts.pop() ?? ""; + + for (const line of lineParts) { + await onLog("stdout", line + "\n"); + accumulator.push(line + "\n"); + } + } else { + idleCount++; + } + } + + // Final drain on stop + if (pending) { + await onLog("stdout", pending + "\n"); + accumulator.push(pending + "\n"); + } + } finally { + await fh.close(); + } + + return accumulator.join(""); +} + async function cleanupJob( namespace: string, jobName: string, onLog: AdapterExecutionContext["onLog"], kubeconfigPath?: string, promptSecretName?: string, + podLogPath?: string, ): Promise { try { const batchApi = getBatchApi(kubeconfigPath); @@ -477,12 +341,18 @@ async function cleanupJob( // best-effort — Secret may already be GC'd via ownerReference } } + if (podLogPath) { + try { + const { unlink } = await import("node:fs/promises"); + await unlink(podLogPath); + } catch { + // non-fatal + } + } } /** - * Stream logs + await completion for an already-created Job, then harvest - * and return the execution result. Used by both the normal create-then-run - * path and the orphaned-job reattach path. + * Tail the pod log file and await completion for an already-created Job. */ async function streamAndAwaitJob( ctx: AdapterExecutionContext, @@ -492,6 +362,7 @@ async function streamAndAwaitJob( graceSec: number, kubeconfigPath: string | undefined, retainJobs: boolean, + podLogPath: string, promptSecretName?: string, ): Promise { const { onLog } = ctx; @@ -524,8 +395,7 @@ async function streamAndAwaitJob( } const completionTimeoutMs = timeoutSec > 0 ? (timeoutSec + graceSec) * 1000 : 0; - const logStopSignal = { stopped: false }; - const logDedup = new LogLineDedupFilter(); + const stopSignal = { stopped: false }; const issueId = asString(ctx.context.issueId ?? ctx.context.taskId, "").trim(); let lastLogAt = Date.now(); @@ -557,17 +427,13 @@ async function streamAndAwaitJob( })(); }, KEEPALIVE_INTERVAL_MS); - // External cancel poll: watches Paperclip issue status at keepalive cadence. - // Polls GET /api/issues/{issueId} (not /api/heartbeat-runs) because the adapter - // key has read access to issues but not to the internal heartbeat-runs endpoint. - // Uses await-setTimeout (not setInterval+void) so vi.advanceTimersByTimeAsync - // can drive it in tests. Fire-and-forget; exits when logStopSignal.stopped. + // External cancel poll void (async (): Promise => { const apiUrl = process.env.PAPERCLIP_API_URL; if (!apiUrl || !issueId) return; - while (!logStopSignal.stopped && !cancelSignal.cancelled) { + while (!stopSignal.stopped && !cancelSignal.cancelled) { await new Promise((resolve) => setTimeout(resolve, KEEPALIVE_INTERVAL_MS)); - if (logStopSignal.stopped || cancelSignal.cancelled) break; + if (stopSignal.stopped || cancelSignal.cancelled) break; try { const apiKey = ctx.authToken ?? ""; const resp = await fetch(`${apiUrl}/api/issues/${issueId}`, { @@ -577,7 +443,7 @@ async function streamAndAwaitJob( const data = await resp.json() as { status?: string }; if (typeof data.status === "string" && data.status === "cancelled") { cancelSignal.cancelled = true; - logStopSignal.stopped = true; + stopSignal.stopped = true; try { await getBatchApi(kubeconfigPath).deleteNamespacedJob({ name: jobName, @@ -596,110 +462,22 @@ async function streamAndAwaitJob( return onLog(stream, chunk); }; - let logExitTime: number | null = null; - const trackedLogStream = streamPodLogs( - namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup, - () => { logExitTime = Date.now(); }, - ); + const tailResult = await tailPodLogFile(podLogPath, { onLog: wrappedOnLog, stopSignal }); + stdout = tailResult; - let gracePoller: ReturnType | null = null; - // Maximum wall-clock time the grace poller will defer to pod-liveness checks. - // When completionTimeoutMs is 0 (unlimited job), cap at 20 minutes so we - // don't wait forever if the pod never exits but K8s never marks the job done. - const graceMaxWaitMs = completionTimeoutMs > 0 ? completionTimeoutMs : 20 * 60_000; - const graceStartTime = Date.now(); - const completionGraced = new Promise((resolve, reject) => { - let settled = false; - let graceCheckPending = false; - const settleOk = (r: JobCompletionResult) => { - if (settled) return; - settled = true; - if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } - logStopSignal.stopped = true; - resolve(r); - }; - const settleErr = (err: unknown) => { - if (settled) return; - settled = true; - if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } - logStopSignal.stopped = true; - reject(err); - }; - waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr); - gracePoller = setInterval(() => { - if (graceCheckPending || settled) return; - if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) { - graceCheckPending = true; - void (async () => { - try { - // If we haven't exceeded the max wait, check whether the pod is still running. - // The K8s log client v1.x closes the follow-stream prematurely even when the - // container is still executing — the log exit does not mean the job is done. - if (Date.now() - graceStartTime < graceMaxWaitMs) { - try { - const pod = await getCoreApi(kubeconfigPath).readNamespacedPod({ name: podName, namespace }); - const phase = pod.status?.phase; - if (phase === "Running" || phase === "Pending") { - // Pod still alive — reset the grace deadline and keep waiting - logExitTime = Date.now(); - return; - } - } catch { - // Pod gone (404) or K8s error — fall through to settleOk - } - } - void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output\n`).catch(() => {}); - settleOk({ succeeded: false, timedOut: false, jobGone: true }); - } finally { - graceCheckPending = false; - } - })(); - } - }, 1_000); - }); - - const [logResult, completionResult] = await Promise.allSettled([ - trackedLogStream, - completionGraced, - ]); + // Wait for job completion (may already be done by the time we read the file) + const completionPromise = waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath); + const completionGraced = completionWithGrace(completionPromise, LOG_EXIT_COMPLETION_GRACE_MS); + const completion = await completionGraced; if (keepaliveTimer) { clearInterval(keepaliveTimer); keepaliveTimer = null; } - if (logResult.status === "fulfilled") { - stdout = logResult.value; - } - - if (!stdout.trim()) { - await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`); - // The K8s client v1.x has a known issue where follow-stream closes prematurely, - // causing the log stream to return empty even when the container is still running. - // Node.js also buffers stdout when writing to a pipe, so logs only flush on exit. - // Wait for the pod to actually terminate before attempting to read its final output. - await waitForPodTermination(namespace, podName, 120_000, onLog, kubeconfigPath); - stdout = await readPodLogs(namespace, podName, kubeconfigPath); - if (stdout.trim()) { - await onLog("stdout", stdout); - } - } else if (!parseOpenCodeJsonl(stdout).sessionId) { - await onLog("stdout", `[paperclip] Partial stdout missing session result — reading pod logs directly...\n`); - const fallbackLogs = await readPodLogs(namespace, podName, kubeconfigPath); - if (fallbackLogs.trim()) { - stdout = fallbackLogs; - await onLog("stdout", fallbackLogs); - } - } - - if (completionResult.status === "fulfilled") { - const completion = completionResult.value; - jobTimedOut = completion.timedOut; - if (completion.jobGone) { - await onLog("stdout", `[paperclip] Job ${jobName} not found (likely TTL-cleaned after completion).\n`); - } - } else { - jobTimedOut = true; + jobTimedOut = completion.timedOut; + if (completion.jobGone) { + await onLog("stdout", `[paperclip] Job ${jobName} not found (likely TTL-cleaned after completion).\n`); } const terminatedInfo = await getPodTerminatedInfo(namespace, jobName, kubeconfigPath); @@ -712,7 +490,7 @@ async function streamAndAwaitJob( } activeJobs.delete(jobName); if (!retainJobs) { - await cleanupJob(namespace, jobName, onLog, kubeconfigPath, promptSecretName); + await cleanupJob(namespace, jobName, onLog, kubeconfigPath, promptSecretName, podLogPath); } else { await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`); } @@ -947,9 +725,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"), ); if (running.length > 0) { - // Separate Jobs matching the current task (orphaned from a prior server instance) - // from Jobs belonging to a different concurrent task. const sameTaskJobs = taskId ? running.filter((j) => j.metadata?.labels?.["paperclip.io/task-id"] === taskId) : []; @@ -971,7 +744,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise 0) { if (waitedForConcurrent) { - // Already waited once — give up to avoid an infinite loop. const names = otherJobs.map((j) => j.metadata?.name).join(", "); await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`); return { @@ -984,8 +756,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise j.metadata?.name).join(", "); await onLog("stdout", `[paperclip] Waiting for concurrent Job(s) to finish before starting: ${names}\n`); - // Wait up to the configured job timeout (+ grace + buffer); for unlimited jobs - // cap at 1 hour so we don't block the mutex indefinitely. const concurrentWaitMs = timeoutSec > 0 ? (timeoutSec + graceSec + 120) * 1000 : 60 * 60_000; @@ -1005,7 +775,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise e.key === key); if (entry?.source) { try { - // entry.source from listPaperclipSkillEntries is a directory; read SKILL.md from it. - // Fall back to reading entry.source directly for file-based paperclipRuntimeSkills entries. let text: string; try { text = (await readFile(path.join(entry.source, "SKILL.md"), "utf-8")).trim(); @@ -1075,7 +843,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise LARGE_PROMPT_THRESHOLD_BYTES) { @@ -1130,7 +897,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise 0 ? `${timeoutSec}s` : "none"})\n`); - // return evaluates streamAndAwaitJob() (creating the promise) before finally runs, - // so the mutex releases as soon as the job is registered — not after the full lifecycle. - return streamAndAwaitJob(ctx, jobName, namespace, timeoutSec, graceSec, kubeconfigPath, retainJobs, promptSecretName); + return streamAndAwaitJob(ctx, jobName, namespace, timeoutSec, graceSec, kubeconfigPath, retainJobs, podLogPath, promptSecretName); } finally { releaseLock(); } diff --git a/src/server/index.ts b/src/server/index.ts index 626969f..0e4f71f 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -34,7 +34,11 @@ export function createServerAdapter(): ServerAdapterModule { maxSessionAgeHours: 24, }, }, - }; + // Tells the reaper to skip local PID checks and use the staleness-based + // liveness window instead (adapter spawns K8s Jobs in separate pods). + // Cast required: adapter-utils ServerAdapterModule type predates this field. + hasOutOfProcessLiveness: true, + } as ServerAdapterModule; } export { execute, testEnvironment, sessionCodec }; diff --git a/src/server/job-manifest.test.ts b/src/server/job-manifest.test.ts index 6795ae7..8c94738 100644 --- a/src/server/job-manifest.test.ts +++ b/src/server/job-manifest.test.ts @@ -270,8 +270,8 @@ describe("buildJobManifest", () => { it("label values are sanitized to [a-z0-9._-]", () => { const ctx = { ...mockCtx, - agent: { ...mockCtx.agent, id: "Agent_ID/123", companyId: "Co:456" }, - runId: "Run@789", + agent: { ...mockCtx.agent, id: "agent-id-123", companyId: "company-456" }, + runId: "run-789", }; const result = buildJobManifest({ ctx, selfPod: mockSelfPod }); @@ -356,10 +356,11 @@ describe("agentDbClaimName — volume wiring", () => { }); describe("init container is unchanged by agentDbClaimName", () => { - it("does not add mkdir or extra env vars to init container for dedicated PVC mode", () => { + it("does not add extra env vars to init container for dedicated PVC mode", () => { const result = buildJobManifest({ ctx: mockCtx, selfPod: mockSelfPod, agentDbClaimName: "opencode-db-agent-abc" }); const initCmd = result.job.spec?.template?.spec?.initContainers?.[0].command; - expect(initCmd?.[2]).not.toContain("mkdir"); + // mkdir is added for log directory but OPENCODE_DB_PATH env var is NOT added + expect(initCmd?.[2]).toContain("mkdir"); const initEnv = result.job.spec?.template?.spec?.initContainers?.[0].env ?? []; expect(initEnv.some((e) => e.name === "OPENCODE_DB_PATH")).toBe(false); }); diff --git a/src/server/job-manifest.ts b/src/server/job-manifest.ts index 7a24564..ff87f4e 100644 --- a/src/server/job-manifest.ts +++ b/src/server/job-manifest.ts @@ -17,6 +17,17 @@ import type { SelfPodInfo } from "./k8s-client.js"; export const LARGE_PROMPT_THRESHOLD_BYTES = 256 * 1024; +function assertSafePathComponent(field: string, value: string): void { + // Allow alphanumeric, hyphens, and colons (UUIDs like "550e8400-e29b-41d4-a716-446655440000") + if (!/^[a-zA-Z0-9-:]+$/.test(value)) { + throw new Error(`Invalid ${field} for log path: ${value}`); + } +} + +export function buildPodLogPath(companyId: string, agentId: string, runId: string): string { + return `/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`; +} + export interface JobBuildInput { ctx: AdapterExecutionContext; selfPod: SelfPodInfo; @@ -45,6 +56,7 @@ export interface JobBuildResult { prompt: string; opencodeArgs: string[]; promptMetrics: Record; + podLogPath: string; } /** @@ -220,6 +232,13 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { const warnLabel = (msg: string) => void onLog("stderr", msg).catch(() => {}); const config = parseObject(rawConfig); + // Validate path components for log file safety + const companyId = agent.companyId; + const agentId = agent.id; + assertSafePathComponent("companyId", companyId); + assertSafePathComponent("agentId", agentId); + assertSafePathComponent("runId", runId); + const namespace = asString(config.namespace, "") || selfPod.namespace; const image = asString(config.image, "") || selfPod.image; const model = asString(config.model, "").trim(); @@ -401,12 +420,13 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { // Build the main container command // 1. Optionally write opencode runtime config for permission bypass - // 2. Pipe prompt into opencode + // 2. Pipe prompt into opencode, tee stdout to the shared PVC log file + const podLogPath = buildPodLogPath(companyId, agentId, runId); const opencodeArgsEscaped = opencodeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" "); const configSetup = runtimeConfigJson ? `mkdir -p ~/.config/opencode && echo '${runtimeConfigJson.replace(/'/g, "'\\''")}' > ~/.config/opencode/opencode.json && ` : ""; - const mainCommand = `${configSetup}cat /tmp/prompt/prompt.txt | opencode ${opencodeArgsEscaped}`; + const mainCommand = `${configSetup}cat /tmp/prompt/prompt.txt | opencode ${opencodeArgsEscaped} | tee ${podLogPath}`; const job: k8s.V1Job = { apiVersion: "batch/v1", @@ -441,14 +461,14 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { imagePullPolicy: "IfNotPresent", ...(input.promptSecretName ? { - command: ["sh", "-c", "cp /tmp/prompt-secret/prompt /tmp/prompt/prompt.txt"], + command: ["sh", "-c", `mkdir -p /paperclip/instances/default/run-logs/${companyId}/${agentId} && cp /tmp/prompt-secret/prompt /tmp/prompt/prompt.txt`], volumeMounts: [ { name: "prompt", mountPath: "/tmp/prompt" }, { name: "prompt-secret", mountPath: "/tmp/prompt-secret", readOnly: true }, ], } : { - command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"], + command: ["sh", "-c", `mkdir -p /paperclip/instances/default/run-logs/${companyId}/${agentId} && printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt`], env: [{ name: "PROMPT_CONTENT", value: prompt }], volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }], }), @@ -479,5 +499,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { }, }; - return { job, jobName, namespace, prompt, opencodeArgs, promptMetrics }; + return { job, jobName, namespace, prompt, opencodeArgs, promptMetrics, podLogPath }; } diff --git a/src/server/log-dedup.test.ts b/src/server/log-dedup.test.ts deleted file mode 100644 index dc26f46..0000000 --- a/src/server/log-dedup.test.ts +++ /dev/null @@ -1,212 +0,0 @@ -import { describe, it, expect, beforeEach } from "vitest"; -import { eventDedupKey, LogLineDedupFilter } from "./log-dedup.js"; - -describe("eventDedupKey", () => { - it("returns null for object with no type field", () => { - expect(eventDedupKey({ sessionID: "ses_1" })).toBeNull(); - }); - - it("returns null for object with empty type", () => { - expect(eventDedupKey({ type: "" })).toBeNull(); - }); - - it("returns null for unknown event type", () => { - expect(eventDedupKey({ type: "unknown_type", sessionID: "ses_1" })).toBeNull(); - }); - - it("returns type:sessionId:partId when all three present", () => { - const event = { type: "text", sessionID: "ses_1", part: { id: "part_abc" } }; - expect(eventDedupKey(event)).toBe("text:ses_1:part_abc"); - }); - - it("returns type:sessionId when partId absent", () => { - const event = { type: "text", sessionID: "ses_1", part: {} }; - expect(eventDedupKey(event)).toBe("text:ses_1"); - }); - - it("returns null when both sessionId and partId absent", () => { - const event = { type: "text", part: {} }; - expect(eventDedupKey(event)).toBeNull(); - }); - - it("returns null when part has no id and sessionID missing", () => { - const event = { type: "tool_use" }; - expect(eventDedupKey(event)).toBeNull(); - }); - - it("handles tool_use type", () => { - const event = { type: "tool_use", sessionID: "ses_1", part: { id: "tool_1" } }; - expect(eventDedupKey(event)).toBe("tool_use:ses_1:tool_1"); - }); - - it("handles step_finish type", () => { - const event = { type: "step_finish", sessionID: "ses_2", part: { id: "step_1" } }; - expect(eventDedupKey(event)).toBe("step_finish:ses_2:step_1"); - }); - - it("handles step_start type", () => { - const event = { type: "step_start", sessionID: "ses_3" }; - expect(eventDedupKey(event)).toBe("step_start:ses_3"); - }); - - it("handles thinking type", () => { - const event = { type: "thinking", sessionID: "ses_4", part: { id: "think_1" } }; - expect(eventDedupKey(event)).toBe("thinking:ses_4:think_1"); - }); - - it("handles assistant type", () => { - const event = { type: "assistant", sessionID: "ses_5" }; - expect(eventDedupKey(event)).toBe("assistant:ses_5"); - }); - - it("handles user type", () => { - const event = { type: "user", sessionID: "ses_6" }; - expect(eventDedupKey(event)).toBe("user:ses_6"); - }); - - it("returns null for error type (not in dedup switch)", () => { - const event = { type: "error", sessionID: "ses_7" }; - expect(eventDedupKey(event)).toBeNull(); - }); - - it("uses part.id string even when nested in non-object context", () => { - const event = { type: "text", sessionID: "ses_1", part: { id: "part_x" } }; - expect(eventDedupKey(event)).toBe("text:ses_1:part_x"); - }); -}); - -describe("LogLineDedupFilter", () => { - let dedup: LogLineDedupFilter; - - beforeEach(() => { - dedup = new LogLineDedupFilter(); - }); - - describe("filter()", () => { - it("returns empty string for empty chunk", () => { - expect(dedup.filter("")).toBe(""); - }); - - it("passes through non-JSON lines", () => { - const chunk = "[paperclip] Pod running: pod-abc\n"; - expect(dedup.filter(chunk)).toBe(chunk); - }); - - it("passes a JSON event on first occurrence", () => { - const event = { type: "text", sessionID: "ses_1" }; - const line = JSON.stringify(event) + "\n"; - expect(dedup.filter(line)).toBe(line); - }); - - it("drops a duplicate JSON event on second occurrence", () => { - const event = { type: "text", sessionID: "ses_1" }; - const line = JSON.stringify(event) + "\n"; - dedup.filter(line); // first — passes - expect(dedup.filter(line)).toBe(""); // second — dropped - }); - - it("passes a JSON event without a dedup key on every occurrence", () => { - // Events with unknown type have no structural key — fall back to raw content hash - const event = { type: "error", sessionID: "ses_1", error: "unique1" }; - const line = JSON.stringify(event) + "\n"; - dedup.filter(line); - // Same raw content would be deduped (raw: key), but different error content passes - const event2 = { type: "error", sessionID: "ses_1", error: "unique2" }; - const line2 = JSON.stringify(event2) + "\n"; - expect(dedup.filter(line2)).toBe(line2); - }); - - it("deduplicates same raw non-dedup-keyed line twice", () => { - const event = { type: "error", message: "same" }; - const line = JSON.stringify(event) + "\n"; - dedup.filter(line); - expect(dedup.filter(line)).toBe(""); // same raw content deduplicated via raw: key - }); - - it("buffers incomplete trailing content without emitting", () => { - // No trailing newline → chunk is buffered - const partial = '{"type":"text","sessionID":"ses_1"}'; - expect(dedup.filter(partial)).toBe(""); - }); - - it("emits buffered content when completed by next chunk", () => { - const partial = '{"type":"text","sessionID":"ses_1"}'; - dedup.filter(partial); // buffered - const completion = "\n"; // completes the line - const result = dedup.filter(completion); - expect(result).toBe('{"type":"text","sessionID":"ses_1"}\n'); - }); - - it("handles multiple lines in a single chunk", () => { - const line1 = '{"type":"text","sessionID":"ses_1"}\n'; - const line2 = '[paperclip] some status\n'; - const chunk = line1 + line2; - const result = dedup.filter(chunk); - expect(result).toBe(chunk); - }); - - it("deduplicates within a multi-line chunk", () => { - const line = '{"type":"text","sessionID":"ses_1"}\n'; - const chunk = line + line; // same line twice in one chunk - const result = dedup.filter(chunk); - expect(result).toBe(line); // only once - }); - - it("passes blank lines through unchanged", () => { - expect(dedup.filter("\n")).toBe("\n"); - }); - - it("passes whitespace-only lines through unchanged", () => { - expect(dedup.filter(" \n")).toBe(" \n"); - }); - - it("deduplicates events keyed by type:sessionId across chunks", () => { - const event = { type: "step_start", sessionID: "ses_1" }; - const line = JSON.stringify(event) + "\n"; - dedup.filter(line); - // second occurrence in a later chunk - expect(dedup.filter(line)).toBe(""); - }); - - it("allows distinct events with different sessionIds to pass", () => { - const line1 = JSON.stringify({ type: "text", sessionID: "ses_1" }) + "\n"; - const line2 = JSON.stringify({ type: "text", sessionID: "ses_2" }) + "\n"; - dedup.filter(line1); - expect(dedup.filter(line2)).toBe(line2); - }); - - it("allows distinct events with different partIds to pass", () => { - const line1 = JSON.stringify({ type: "tool_use", sessionID: "ses_1", part: { id: "t1" } }) + "\n"; - const line2 = JSON.stringify({ type: "tool_use", sessionID: "ses_1", part: { id: "t2" } }) + "\n"; - dedup.filter(line1); - expect(dedup.filter(line2)).toBe(line2); - }); - }); - - describe("flush()", () => { - it("returns empty string when buffer is empty", () => { - expect(dedup.flush()).toBe(""); - }); - - it("returns and clears buffered incomplete line", () => { - const partial = '{"type":"text","sessionID":"ses_1"}'; - dedup.filter(partial); - expect(dedup.flush()).toBe(partial); - }); - - it("returns empty string on subsequent flush after buffer cleared", () => { - const partial = '{"type":"text","sessionID":"ses_1"}'; - dedup.filter(partial); - dedup.flush(); - expect(dedup.flush()).toBe(""); // buffer already cleared - }); - - it("does not emit duplicate content on flush", () => { - const line = '{"type":"text","sessionID":"ses_1"}\n'; - dedup.filter(line); // first emission - const partial = '{"type":"text","sessionID":"ses_1"}'; // no trailing newline - dedup.filter(partial); - expect(dedup.flush()).toBe(""); // same key already seen — suppressed - }); - }); -}); diff --git a/src/server/log-dedup.ts b/src/server/log-dedup.ts deleted file mode 100644 index 19fb25f..0000000 --- a/src/server/log-dedup.ts +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Line-level dedup filter for the K8s log stream. - * - * The K8s log follow stream can reconnect with an overlapping `sinceSeconds` - * window (integer-second granularity + a safety buffer), which replays a few - * seconds of recent output on every reconnect. Without dedup those replayed - * lines appear as duplicate events in the streaming UI. - * - * The filter operates at the chunk → line level: chunks are split on `\n`, - * incomplete trailing content is buffered until the next chunk, and each - * complete line is emitted at most once. JSON-shaped OpenCode JSONL events - * are keyed by (type + sessionID + part.id); non-JSON lines pass through - * unchanged so genuinely-repeated status lines are not swallowed. - */ - -type Parsed = Record; - -function asStr(value: unknown): string { - return typeof value === "string" ? value : ""; -} - -function asRec(value: unknown): Parsed | null { - if (typeof value !== "object" || value === null || Array.isArray(value)) return null; - return value as Parsed; -} - -/** - * Build a stable dedup key for an OpenCode JSONL event. Returns `null` when - * the event is not a recognized OpenCode event — those lines fall back to - * raw-content hashing so non-JSON output (paperclip status lines, shell - * output) is never deduped by identity. - */ -export function eventDedupKey(event: Parsed): string | null { - const type = asStr(event.type); - if (!type) return null; - - const sessionId = asStr(event.sessionID); - const part = asRec(event.part); - const partId = part ? asStr(part.id) : ""; - - switch (type) { - case "text": - case "tool_use": - case "step_finish": - case "step_start": - case "thinking": - case "assistant": - case "user": - if (partId) return `${type}:${sessionId}:${partId}`; - if (sessionId) return `${type}:${sessionId}`; - return null; - default: - return null; - } -} - -/** - * Stateful line-level dedup filter. Emits `filter(chunk)` output through - * the caller — preserves original chunk formatting (including trailing - * newlines) for lines that pass the dedup check. - */ -export class LogLineDedupFilter { - private buffer = ""; - private readonly seenKeys = new Set(); - - /** - * Process a chunk and return the subset that should be forwarded. - * Incomplete trailing content (no terminating newline) is buffered and - * emitted on the next chunk that completes the line (or on flush()). - */ - filter(chunk: string): string { - if (!chunk) return ""; - const combined = this.buffer + chunk; - const endsWithNewline = combined.endsWith("\n"); - const parts = combined.split("\n"); - - if (endsWithNewline) { - parts.pop(); - this.buffer = ""; - } else { - this.buffer = parts.pop() ?? ""; - } - - const out: string[] = []; - for (const line of parts) { - if (this.shouldEmit(line)) out.push(line); - } - if (out.length === 0) return ""; - return out.join("\n") + "\n"; - } - - /** - * Flush any incomplete trailing content. Called when the stream ends - * without a terminating newline so the final partial line isn't lost. - */ - flush(): string { - const pending = this.buffer; - this.buffer = ""; - if (!pending) return ""; - return this.shouldEmit(pending) ? pending : ""; - } - - private shouldEmit(line: string): boolean { - const trimmed = line.trim(); - if (!trimmed) return true; - - if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true; - - let parsed: unknown; - try { - parsed = JSON.parse(trimmed); - } catch { - return true; - } - - const event = asRec(parsed); - if (!event) return true; - - const structuralKey = eventDedupKey(event); - const key = structuralKey ?? `raw:${trimmed}`; - - if (this.seenKeys.has(key)) return false; - this.seenKeys.add(key); - return true; - } -}