diff --git a/src/cli/format-event.test.ts b/src/cli/format-event.test.ts new file mode 100644 index 0000000..40847ea --- /dev/null +++ b/src/cli/format-event.test.ts @@ -0,0 +1,250 @@ +import { describe, it, expect } from "vitest"; +import { formatEvent } from "./format-event.js"; + +describe("formatEvent", () => { + describe("empty / non-JSON input", () => { + it("returns empty string for empty line", () => { + expect(formatEvent("", false)).toBe(""); + }); + + it("returns empty string for whitespace-only line", () => { + expect(formatEvent(" ", false)).toBe(""); + }); + + it("returns non-JSON line as-is (trimmed)", () => { + expect(formatEvent("plain text output", false)).toBe("plain text output"); + }); + + it("trims whitespace from non-JSON lines", () => { + expect(formatEvent(" trimmed ", false)).toBe("trimmed"); + }); + }); + + describe("step_start", () => { + it("returns empty string in normal mode", () => { + const line = JSON.stringify({ type: "step_start", sessionID: "ses_1" }); + expect(formatEvent(line, false)).toBe(""); + }); + + it("returns [step_start] with session in debug mode", () => { + const line = JSON.stringify({ type: "step_start", sessionID: "ses_1" }); + expect(formatEvent(line, true)).toBe("[step_start] session=ses_1"); + }); + + it("returns [step_start] without session suffix when sessionID absent in debug mode", () => { + const line = JSON.stringify({ type: "step_start" }); + expect(formatEvent(line, true)).toBe("[step_start]"); + }); + }); + + describe("text", () => { + it("returns text content", () => { + const line = JSON.stringify({ type: "text", part: { text: "Hello world" } }); + expect(formatEvent(line, false)).toBe("Hello world"); + }); + + it("returns trimmed text", () => { + const line = JSON.stringify({ type: "text", part: { text: " trimmed " } }); + expect(formatEvent(line, false)).toBe("trimmed"); + }); + + it("returns empty string for empty text field", () => { + const line = JSON.stringify({ type: "text", part: { text: "" } }); + expect(formatEvent(line, false)).toBe(""); + }); + + it("returns same output in debug mode", () => { + const line = JSON.stringify({ type: "text", part: { text: "Debug output" } }); + expect(formatEvent(line, true)).toBe("Debug output"); + }); + }); + + describe("tool_use", () => { + it("returns empty for normal tool_use in non-debug mode", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "bash", state: { status: "pending", description: "ls" } }, + }); + expect(formatEvent(line, false)).toBe(""); + }); + + it("returns empty for completed tool_use in non-debug mode", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "bash", state: { status: "completed", output: "result" } }, + }); + expect(formatEvent(line, false)).toBe(""); + }); + + it("returns warning with ⚠ prefix for tool error in non-debug mode", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "bash", state: { status: "error", error: "Command failed" } }, + }); + expect(formatEvent(line, false)).toBe("⚠ Command failed"); + }); + + it("returns empty for tool error with empty error field in non-debug mode", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "bash", state: { status: "error", error: "" } }, + }); + expect(formatEvent(line, false)).toBe(""); + }); + + it("returns debug info including tool name and status in debug mode", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "grep", state: { status: "completed", description: "search files" } }, + }); + const result = formatEvent(line, true); + expect(result).toContain("[tool:grep]"); + expect(result).toContain("completed"); + expect(result).toContain("search files"); + }); + + it("appends output snippet in debug mode", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "bash", state: { status: "completed", output: "output result here" } }, + }); + const result = formatEvent(line, true); + expect(result).toContain("output result here"); + }); + + it("appends error in debug mode", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "bash", state: { status: "error", error: "exit code 1" } }, + }); + const result = formatEvent(line, true); + expect(result).toContain("✗ exit code 1"); + }); + }); + + describe("step_finish", () => { + it("returns message when provided", () => { + const line = JSON.stringify({ + type: "step_finish", + part: { message: "Task complete", reason: "end_turn" }, + }); + expect(formatEvent(line, false)).toBe("Task complete"); + }); + + it("returns fallback with reason when message is empty", () => { + const line = JSON.stringify({ + type: "step_finish", + part: { reason: "end_turn", message: "" }, + }); + expect(formatEvent(line, false)).toBe("[step_finish] end_turn"); + }); + + it("returns fallback with empty reason when both message and reason absent", () => { + const line = JSON.stringify({ type: "step_finish", part: {} }); + expect(formatEvent(line, false)).toBe("[step_finish] "); + }); + + it("appends token count when non-zero", () => { + const line = JSON.stringify({ + type: "step_finish", + part: { message: "Done", tokens: { total: 500 }, cost: 0 }, + }); + const result = formatEvent(line, false); + expect(result).toContain("tokens=500"); + }); + + it("appends cost when non-zero", () => { + const line = JSON.stringify({ + type: "step_finish", + part: { message: "Done", tokens: { total: 0 }, cost: 0.0025 }, + }); + const result = formatEvent(line, false); + expect(result).toContain("cost$0.0025"); + }); + + it("appends both tokens and cost when both non-zero", () => { + const line = JSON.stringify({ + type: "step_finish", + part: { message: "Done", tokens: { total: 300 }, cost: 0.001 }, + }); + const result = formatEvent(line, false); + expect(result).toContain("tokens=300"); + expect(result).toContain("cost$0.0010"); + }); + + it("omits metrics suffix when tokens and cost are zero", () => { + const line = JSON.stringify({ + type: "step_finish", + part: { message: "Done", tokens: { total: 0 }, cost: 0 }, + }); + expect(formatEvent(line, false)).toBe("Done"); + }); + }); + + describe("error", () => { + it("returns error message with ✗ prefix", () => { + const line = JSON.stringify({ type: "error", error: { message: "Something failed" } }); + expect(formatEvent(line, false)).toBe("✗ Something failed"); + }); + + it("returns ✗ prefix with string error", () => { + const line = JSON.stringify({ type: "error", message: "Direct error" }); + const result = formatEvent(line, false); + expect(result).toContain("✗"); + }); + + it("returns empty string for error with no extractable text", () => { + const line = JSON.stringify({ type: "error" }); + const result = formatEvent(line, false); + expect(typeof result).toBe("string"); + }); + }); + + describe("assistant", () => { + it("returns nested text content", () => { + const line = JSON.stringify({ + type: "assistant", + part: { message: { content: [{ type: "text", text: "Assistant response" }] } }, + }); + expect(formatEvent(line, false)).toBe("Assistant response"); + }); + + it("returns trimmed nested text", () => { + const line = JSON.stringify({ + type: "assistant", + part: { message: { content: [{ type: "text", text: " Trimmed " }] } }, + }); + expect(formatEvent(line, false)).toBe("Trimmed"); + }); + + it("returns empty for non-text content blocks", () => { + const line = JSON.stringify({ + type: "assistant", + part: { message: { content: [{ type: "tool_use" }] } }, + }); + expect(formatEvent(line, false)).toBe(""); + }); + + it("returns empty for assistant with no content", () => { + const line = JSON.stringify({ type: "assistant", part: {} }); + expect(formatEvent(line, false)).toBe(""); + }); + }); + + describe("unknown types", () => { + it("returns empty string for unknown type in non-debug mode", () => { + const line = JSON.stringify({ type: "some_unknown_type", data: {} }); + expect(formatEvent(line, false)).toBe(""); + }); + + it("returns [type] for unknown type in debug mode", () => { + const line = JSON.stringify({ type: "some_unknown_type" }); + expect(formatEvent(line, true)).toBe("[some_unknown_type]"); + }); + + it("returns empty string for JSON with no type in non-debug mode", () => { + const line = JSON.stringify({ sessionID: "ses_123" }); + expect(formatEvent(line, false)).toBe(""); + }); + }); +}); diff --git a/src/cli/format-event.ts b/src/cli/format-event.ts index 636a97f..efbd82c 100644 --- a/src/cli/format-event.ts +++ b/src/cli/format-event.ts @@ -133,7 +133,7 @@ export function formatEvent(line: string, debug: boolean): string { } case "error": { - const text = errorText(event).trim(); + const text = errorText(event.error ?? event.message ?? event).trim(); if (text) return `✗ ${text}`; return ""; } diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts index 7eb479b..b012197 100644 --- a/src/server/execute.test.ts +++ b/src/server/execute.test.ts @@ -23,6 +23,8 @@ const MOCK_SELF_POD = { pvcClaimName: null, secretVolumes: [], inheritedEnv: {}, + inheritedEnvValueFrom: [], + inheritedEnvFrom: [], }; const MOCK_JOB = { diff --git a/src/server/execute.ts b/src/server/execute.ts index bac9fa2..9dd029f 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -9,10 +9,19 @@ import { } from "./parse.js"; import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js"; import { buildJobManifest } from "./job-manifest.js"; +import { LogLineDedupFilter } from "./log-dedup.js"; import type * as k8s from "@kubernetes/client-node"; import { Writable } from "node:stream"; const POLL_INTERVAL_MS = 2000; +const KEEPALIVE_INTERVAL_MS = 15_000; +const LOG_STREAM_RECONNECT_DELAY_MS = 3_000; +const MAX_LOG_RECONNECT_ATTEMPTS = 50; +// Upper bound on how long streamPodLogsOnce will wait after stopSignal fires +// before force-returning, even if logApi.log has not yet resolved. Defensive +// against the K8s client library not propagating writable.destroy() into an +// abort of the underlying HTTP request. +const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000; const LOG_EXIT_COMPLETION_GRACE_MS = parseInt(process.env.LOG_EXIT_COMPLETION_GRACE_MS ?? "30000", 10); export function isK8s404(err: unknown): boolean { @@ -132,55 +141,157 @@ async function waitForPod( throw new Error(`Timed out waiting for pod to be scheduled (${Math.round(timeoutMs / 1000)}s)`); } +/** + * Stream pod logs once via follow. Returns accumulated stdout when the + * stream ends (container exit, API disconnect, or abort signal). + */ +async function streamPodLogsOnce( + namespace: string, + podName: string, + onLog: AdapterExecutionContext["onLog"], + kubeconfigPath?: string, + sinceSeconds?: number, + dedup?: LogLineDedupFilter, + stopSignal?: { stopped: boolean }, +): Promise { + const logApi = getLogApi(kubeconfigPath); + const chunks: string[] = []; + + const writable = new Writable({ + write(chunk: Buffer, _encoding, callback) { + const text = redactHomePathUserSegments(chunk.toString("utf-8")); + chunks.push(text); + const emitted = dedup ? dedup.filter(text) : text; + if (!emitted) { + callback(); + return; + } + void onLog("stdout", emitted).then(() => callback(), callback); + }, + }); + + // When the job completion signal fires, destroy the writable to abort the + // in-flight follow stream. Without this, logApi.log can hang indefinitely + // when the pod terminates without closing the HTTP connection cleanly. + let stopPoller: ReturnType | null = null; + let bailTimer: ReturnType | null = null; + let bailResolve: (() => void) | null = null; + const bailPromise = new Promise((resolve) => { + bailResolve = resolve; + }); + if (stopSignal) { + stopPoller = setInterval(() => { + if (stopSignal.stopped) { + if (!writable.destroyed) writable.destroy(); + if (!bailTimer && bailResolve) { + bailTimer = setTimeout(() => { + onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {}); + bailResolve!(); + }, LOG_STREAM_BAIL_TIMEOUT_MS); + } + } + }, 200); + } + + const logPromise = logApi.log(namespace, podName, "opencode", writable, { + follow: true, + pretty: false, + ...(sinceSeconds ? { sinceSeconds } : {}), + }).catch(() => { + // follow may fail if the container already exited, the API connection + // dropped, or we aborted via writable.destroy() — not fatal. + }); + + try { + if (stopSignal) { + await Promise.race([logPromise, bailPromise]); + } else { + await logPromise; + } + } finally { + if (stopPoller) clearInterval(stopPoller); + if (bailTimer) clearTimeout(bailTimer); + } + + return chunks.join(""); +} + +/** + * Stream pod logs with automatic reconnection. Keeps retrying the log + * stream until the stop signal fires (job completed) or the container + * exits normally. This handles silent K8s API connection drops that + * would otherwise cause the UI to stop receiving real output. + * + * Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect + * loops during sustained API partitions. + * + * onFirstStreamExit is called the first time streamPodLogsOnce returns. + * Used by execute() to start the LOG_EXIT_COMPLETION_GRACE_MS grace timer + * without waiting for all reconnects to exhaust. + */ async function streamPodLogs( namespace: string, podName: string, onLog: AdapterExecutionContext["onLog"], kubeconfigPath?: string, + stopSignal?: { stopped: boolean }, + dedup?: LogLineDedupFilter, + onFirstStreamExit?: () => void, ): Promise { - const logApi = getLogApi(kubeconfigPath); - const parts: string[] = []; - let lineBuffer = ""; + const allChunks: string[] = []; + let attempt = 0; + // Track the timestamp of the last successfully received log line so + // reconnects use a tight window instead of an ever-growing one anchored + // at stream start. This is the primary fix for duplicative logs on reconnect. + let lastLogReceivedAt = Math.floor(Date.now() / 1000); + if (!dedup) dedup = new LogLineDedupFilter(); - const writable = new Writable({ - write(chunk: Buffer, _encoding, callback) { - const incoming = lineBuffer + chunk.toString("utf-8"); - const nlIdx = incoming.lastIndexOf("\n"); - if (nlIdx === -1) { - // No complete line yet — buffer until newline arrives - lineBuffer = incoming; - callback(); - return; - } - lineBuffer = incoming.slice(nlIdx + 1); - // Redact each complete line individually to avoid path splits across chunk boundaries - const redacted = incoming - .slice(0, nlIdx + 1) - .split("\n") - .map((line) => redactHomePathUserSegments(line)) - .join("\n"); - parts.push(redacted); - void onLog("stdout", redacted).then(() => callback(), callback); - }, - }); + while (!stopSignal?.stopped) { + if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) { + await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`); + break; + } - try { - await logApi.log(namespace, podName, "opencode", writable, { - follow: true, - pretty: false, - }); - } catch { - // follow may fail if the container already exited + // On reconnect, ask for logs since the last received line (+5s buffer) + // instead of since stream start. This keeps the window tight and + // avoids ever-growing duplicate output. + const sinceSeconds = attempt > 0 + ? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5) + : undefined; + + if (attempt > 0) { + await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`); + } + + const preStreamTs = Math.floor(Date.now() / 1000); + const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal); + // Signal first stream exit immediately so the grace-period timer in + // execute() can start without waiting for all reconnects to complete. + if (attempt === 0) onFirstStreamExit?.(); + if (result) { + allChunks.push(result); + // Update last-received timestamp to now (the stream just ended, + // so any log lines in `result` were received up to this moment). + lastLogReceivedAt = Math.floor(Date.now() / 1000); + } else if (attempt === 0) { + // First attempt returned nothing — update timestamp so reconnect + // window stays reasonable. + lastLogReceivedAt = preStreamTs; + } + attempt++; + + if (stopSignal?.stopped) break; + + // Brief pause before reconnecting to avoid tight loops. + await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS)); } - // Flush any partial line that never received a trailing newline - if (lineBuffer) { - const redacted = redactHomePathUserSegments(lineBuffer); - parts.push(redacted); - await onLog("stdout", redacted); - } + // Flush any buffered partial line so the final assistant/result chunk + // isn't dropped when the stream ends mid-line. + const tail = dedup.flush(); + if (tail) await onLog("stdout", tail); - return parts.join(""); + return allChunks.join(""); } async function readPodLogs( @@ -201,7 +312,7 @@ async function readPodLogs( } } -type JobCompletionResult = { succeeded: boolean; timedOut: boolean; jobGone: boolean }; +export type JobCompletionResult = { succeeded: boolean; timedOut: boolean; jobGone: boolean }; async function waitForJobCompletion( namespace: string, @@ -406,6 +517,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise | null = null; try { const scheduleTimeoutMs = 120_000; @@ -427,10 +539,101 @@ export async function execute(ctx: AdapterExecutionContext): Promise 0 ? (timeoutSec + graceSec) * 1000 : 0; - // Start completion poller in parallel with log streaming - const completionPromise = waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath); + // Shared stop signal: set to true when job completion is detected so + // the log stream stops reconnecting promptly. + const logStopSignal = { stopped: false }; + // Shared dedup filter across reconnects so replayed lines inside the + // sinceSeconds overlap window are dropped before reaching the UI. + const logDedup = new LogLineDedupFilter(); - stdout = await streamPodLogs(namespace, podName, onLog, kubeconfigPath); + // Keepalive: periodically emit a status line so the Paperclip server + // knows the adapter is still alive during long silent phases. + let lastLogAt = Date.now(); + let keepaliveJobTerminal = false; + let consecutiveTerminalReadings = 0; + keepaliveTimer = setInterval(() => { + void (async () => { + if (keepaliveJobTerminal) return; + + // Require two consecutive terminal readings before latching to + // guard against a stale K8s API cache returning a false terminal + // status on a single read. + try { + const j = await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace }); + const terminal = j.status?.conditions?.some( + (c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True", + ); + if (terminal) { + consecutiveTerminalReadings++; + if (consecutiveTerminalReadings >= 2) keepaliveJobTerminal = true; + return; + } + consecutiveTerminalReadings = 0; + } catch { + return; + } + + const silenceSec = Math.round((Date.now() - lastLogAt) / 1000); + void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {}); + })(); + }, KEEPALIVE_INTERVAL_MS); + + // wrappedOnLog updates lastLogAt so the keepalive timer can measure silence. + const wrappedOnLog: typeof onLog = async (stream, chunk) => { + lastLogAt = Date.now(); + return onLog(stream, chunk); + }; + + // Track when the log stream first exits so the grace-period can fire + // if the K8s Job condition lags behind container exit. + let logExitTime: number | null = null; + const trackedLogStream = streamPodLogs( + namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup, + () => { logExitTime = Date.now(); }, + ); + + // completionGraced races waitForJobCompletion against a grace timer that + // fires LOG_EXIT_COMPLETION_GRACE_MS after the log stream exits. This bounds + // the stale-UI window when K8s Job conditions lag container exit. + let gracePoller: ReturnType | null = null; + const completionGraced = new Promise((resolve, reject) => { + let settled = false; + const settleOk = (r: JobCompletionResult) => { + if (settled) return; + settled = true; + if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } + logStopSignal.stopped = true; + resolve(r); + }; + const settleErr = (err: unknown) => { + if (settled) return; + settled = true; + if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } + logStopSignal.stopped = true; + reject(err); + }; + waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr); + gracePoller = setInterval(() => { + if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) { + void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output\n`).catch(() => {}); + settleOk({ succeeded: false, timedOut: false, jobGone: true }); + } + }, 1_000); + }); + + const [logResult, completionResult] = await Promise.allSettled([ + trackedLogStream, + completionGraced, + ]); + + if (keepaliveTimer) { + clearInterval(keepaliveTimer); + keepaliveTimer = null; + } + + if (logResult.status === "fulfilled") { + stdout = logResult.value; + } if (!stdout.trim()) { await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`); @@ -448,19 +651,24 @@ export async function execute(ctx: AdapterExecutionContext): Promise { expect(result.job.spec?.template?.spec?.nodeSelector).toEqual({ "kubernetes.io/arch": "amd64" }); }); + + it("forwards inheritedEnvValueFrom entries onto the opencode container env", () => { + const selfPod = { + ...mockSelfPod, + inheritedEnvValueFrom: [ + { name: "MY_SECRET", valueFrom: { secretKeyRef: { name: "my-secret", key: "token" } } }, + ], + }; + const result = buildJobManifest({ ctx: mockCtx, selfPod }); + + const env = result.job.spec?.template?.spec?.containers?.[0].env ?? []; + const secretEnv = env.find((e) => e.name === "MY_SECRET"); + expect(secretEnv?.valueFrom?.secretKeyRef?.name).toBe("my-secret"); + expect(secretEnv?.valueFrom?.secretKeyRef?.key).toBe("token"); + }); + + it("does not duplicate an inheritedEnvValueFrom entry if the name is already set as a literal", () => { + const selfPod = { + ...mockSelfPod, + inheritedEnv: { HOME: "/custom" }, + inheritedEnvValueFrom: [ + { name: "HOME", valueFrom: { secretKeyRef: { name: "s", key: "k" } } }, + ], + }; + const result = buildJobManifest({ ctx: mockCtx, selfPod }); + + const env = result.job.spec?.template?.spec?.containers?.[0].env ?? []; + const homeEntries = env.filter((e) => e.name === "HOME"); + // HOME is overridden by merged (HOME=/paperclip hardcoded last), so valueFrom must not appear + expect(homeEntries.every((e) => e.value !== undefined)).toBe(true); + }); + + it("forwards inheritedEnvFrom onto the opencode container envFrom", () => { + const selfPod = { + ...mockSelfPod, + inheritedEnvFrom: [{ secretRef: { name: "my-config-secret" } }], + }; + const result = buildJobManifest({ ctx: mockCtx, selfPod }); + + const container = result.job.spec?.template?.spec?.containers?.[0]; + expect(container?.envFrom).toEqual([{ secretRef: { name: "my-config-secret" } }]); + }); + + it("omits envFrom when inheritedEnvFrom is empty", () => { + const result = buildJobManifest({ ctx: mockCtx, selfPod: mockSelfPod }); + + const container = result.job.spec?.template?.spec?.containers?.[0]; + expect(container?.envFrom).toBeUndefined(); + }); }); diff --git a/src/server/job-manifest.ts b/src/server/job-manifest.ts index 8043693..c00002f 100644 --- a/src/server/job-manifest.ts +++ b/src/server/job-manifest.ts @@ -14,6 +14,8 @@ import { } from "@paperclipai/adapter-utils/server-utils"; import type { SelfPodInfo } from "./k8s-client.js"; +export const LARGE_PROMPT_THRESHOLD_BYTES = 256 * 1024; + export interface JobBuildInput { ctx: AdapterExecutionContext; selfPod: SelfPodInfo; @@ -21,6 +23,12 @@ export interface JobBuildInput { instructionsContent?: string; /** Concatenated content of desired skill markdown files, prepended after instructions. */ skillsBundleContent?: string; + /** + * When set, the prompt is stored in this K8s Secret (already created by the caller) + * and the init container mounts and copies it instead of using an env var. + * Required when the prompt exceeds LARGE_PROMPT_THRESHOLD_BYTES. + */ + promptSecretName?: string; } export interface JobBuildResult { @@ -157,12 +165,19 @@ function buildEnvVars( merged.OPENCODE_DISABLE_PROJECT_CONFIG = "true"; merged.HOME = "/paperclip"; - // Convert to V1EnvVar array + // Convert literal-value vars to V1EnvVar array const envVars: k8s.V1EnvVar[] = Object.entries(merged).map(([name, value]) => ({ name, value, })); + // Append valueFrom vars (Secret/ConfigMap-backed) only for names not already overridden + for (const envVar of selfPod.inheritedEnvValueFrom) { + if (!Object.prototype.hasOwnProperty.call(merged, envVar.name)) { + envVars.push({ name: envVar.name, valueFrom: envVar.valueFrom }); + } + } + return envVars; } @@ -293,6 +308,10 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { const volumes: k8s.V1Volume[] = [{ name: "prompt", emptyDir: {} }]; const volumeMounts: k8s.V1VolumeMount[] = [{ name: "prompt", mountPath: "/tmp/prompt" }]; + if (input.promptSecretName) { + volumes.push({ name: "prompt-secret", secret: { secretName: input.promptSecretName } }); + } + if (selfPod.pvcClaimName) { volumes.push({ name: "data", @@ -370,9 +389,19 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { name: "write-prompt", image: "busybox:1.36", imagePullPolicy: "IfNotPresent", - command: ["sh", "-c", "echo \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"], - env: [{ name: "PROMPT_CONTENT", value: prompt }], - volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }], + ...(input.promptSecretName + ? { + command: ["sh", "-c", "cp /tmp/prompt-secret/prompt /tmp/prompt/prompt.txt"], + volumeMounts: [ + { name: "prompt", mountPath: "/tmp/prompt" }, + { name: "prompt-secret", mountPath: "/tmp/prompt-secret", readOnly: true }, + ], + } + : { + command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"], + env: [{ name: "PROMPT_CONTENT", value: prompt }], + volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }], + }), securityContext, resources: { requests: { cpu: "10m", memory: "16Mi" }, @@ -388,6 +417,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { workingDir, command: ["sh", "-c", mainCommand], env: envVars, + ...(selfPod.inheritedEnvFrom.length > 0 ? { envFrom: selfPod.inheritedEnvFrom } : {}), volumeMounts, securityContext, resources: containerResources, diff --git a/src/server/k8s-client.ts b/src/server/k8s-client.ts index 2cc0a1d..d862ca9 100644 --- a/src/server/k8s-client.ts +++ b/src/server/k8s-client.ts @@ -20,8 +20,12 @@ export interface SelfPodInfo { dnsConfig: k8s.V1PodDNSConfig | undefined; pvcClaimName: string | null; secretVolumes: SelfPodSecretVolume[]; - /** Env vars read directly from the pod spec's container definition. */ + /** Env vars with literal values from the container spec. */ inheritedEnv: Record; + /** Env vars backed by secretKeyRef/configMapKeyRef/fieldRef (valueFrom). */ + inheritedEnvValueFrom: k8s.V1EnvVar[]; + /** Whole-Secret/ConfigMap env sources (envFrom) from the container spec. */ + inheritedEnvFrom: k8s.V1EnvFromSource[]; } let cachedSelfPod: SelfPodInfo | null = null; @@ -102,7 +106,8 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise c.name === "paperclip") ?? spec.containers[0]; if (!mainContainer?.image) { throw new Error(`claude_k8s: pod ${hostname} has no container image`); } @@ -131,13 +136,21 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise = {}; + const inheritedEnvValueFrom: k8s.V1EnvVar[] = []; for (const envVar of mainContainer.env ?? []) { - if (envVar.value) inheritedEnv[envVar.name] = envVar.value; + if (envVar.value !== undefined) { + inheritedEnv[envVar.name] = envVar.value; + } else if (envVar.valueFrom) { + inheritedEnvValueFrom.push({ name: envVar.name, valueFrom: envVar.valueFrom }); + } } + const inheritedEnvFrom: k8s.V1EnvFromSource[] = [...(mainContainer.envFrom ?? [])]; cachedSelfPod = { namespace, @@ -149,6 +162,8 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise { + it("returns null for object with no type field", () => { + expect(eventDedupKey({ sessionID: "ses_1" })).toBeNull(); + }); + + it("returns null for object with empty type", () => { + expect(eventDedupKey({ type: "" })).toBeNull(); + }); + + it("returns null for unknown event type", () => { + expect(eventDedupKey({ type: "unknown_type", sessionID: "ses_1" })).toBeNull(); + }); + + it("returns type:sessionId:partId when all three present", () => { + const event = { type: "text", sessionID: "ses_1", part: { id: "part_abc" } }; + expect(eventDedupKey(event)).toBe("text:ses_1:part_abc"); + }); + + it("returns type:sessionId when partId absent", () => { + const event = { type: "text", sessionID: "ses_1", part: {} }; + expect(eventDedupKey(event)).toBe("text:ses_1"); + }); + + it("returns null when both sessionId and partId absent", () => { + const event = { type: "text", part: {} }; + expect(eventDedupKey(event)).toBeNull(); + }); + + it("returns null when part has no id and sessionID missing", () => { + const event = { type: "tool_use" }; + expect(eventDedupKey(event)).toBeNull(); + }); + + it("handles tool_use type", () => { + const event = { type: "tool_use", sessionID: "ses_1", part: { id: "tool_1" } }; + expect(eventDedupKey(event)).toBe("tool_use:ses_1:tool_1"); + }); + + it("handles step_finish type", () => { + const event = { type: "step_finish", sessionID: "ses_2", part: { id: "step_1" } }; + expect(eventDedupKey(event)).toBe("step_finish:ses_2:step_1"); + }); + + it("handles step_start type", () => { + const event = { type: "step_start", sessionID: "ses_3" }; + expect(eventDedupKey(event)).toBe("step_start:ses_3"); + }); + + it("handles thinking type", () => { + const event = { type: "thinking", sessionID: "ses_4", part: { id: "think_1" } }; + expect(eventDedupKey(event)).toBe("thinking:ses_4:think_1"); + }); + + it("handles assistant type", () => { + const event = { type: "assistant", sessionID: "ses_5" }; + expect(eventDedupKey(event)).toBe("assistant:ses_5"); + }); + + it("handles user type", () => { + const event = { type: "user", sessionID: "ses_6" }; + expect(eventDedupKey(event)).toBe("user:ses_6"); + }); + + it("returns null for error type (not in dedup switch)", () => { + const event = { type: "error", sessionID: "ses_7" }; + expect(eventDedupKey(event)).toBeNull(); + }); + + it("uses part.id string even when nested in non-object context", () => { + const event = { type: "text", sessionID: "ses_1", part: { id: "part_x" } }; + expect(eventDedupKey(event)).toBe("text:ses_1:part_x"); + }); +}); + +describe("LogLineDedupFilter", () => { + let dedup: LogLineDedupFilter; + + beforeEach(() => { + dedup = new LogLineDedupFilter(); + }); + + describe("filter()", () => { + it("returns empty string for empty chunk", () => { + expect(dedup.filter("")).toBe(""); + }); + + it("passes through non-JSON lines", () => { + const chunk = "[paperclip] Pod running: pod-abc\n"; + expect(dedup.filter(chunk)).toBe(chunk); + }); + + it("passes a JSON event on first occurrence", () => { + const event = { type: "text", sessionID: "ses_1" }; + const line = JSON.stringify(event) + "\n"; + expect(dedup.filter(line)).toBe(line); + }); + + it("drops a duplicate JSON event on second occurrence", () => { + const event = { type: "text", sessionID: "ses_1" }; + const line = JSON.stringify(event) + "\n"; + dedup.filter(line); // first — passes + expect(dedup.filter(line)).toBe(""); // second — dropped + }); + + it("passes a JSON event without a dedup key on every occurrence", () => { + // Events with unknown type have no structural key — fall back to raw content hash + const event = { type: "error", sessionID: "ses_1", error: "unique1" }; + const line = JSON.stringify(event) + "\n"; + dedup.filter(line); + // Same raw content would be deduped (raw: key), but different error content passes + const event2 = { type: "error", sessionID: "ses_1", error: "unique2" }; + const line2 = JSON.stringify(event2) + "\n"; + expect(dedup.filter(line2)).toBe(line2); + }); + + it("deduplicates same raw non-dedup-keyed line twice", () => { + const event = { type: "error", message: "same" }; + const line = JSON.stringify(event) + "\n"; + dedup.filter(line); + expect(dedup.filter(line)).toBe(""); // same raw content deduplicated via raw: key + }); + + it("buffers incomplete trailing content without emitting", () => { + // No trailing newline → chunk is buffered + const partial = '{"type":"text","sessionID":"ses_1"}'; + expect(dedup.filter(partial)).toBe(""); + }); + + it("emits buffered content when completed by next chunk", () => { + const partial = '{"type":"text","sessionID":"ses_1"}'; + dedup.filter(partial); // buffered + const completion = "\n"; // completes the line + const result = dedup.filter(completion); + expect(result).toBe('{"type":"text","sessionID":"ses_1"}\n'); + }); + + it("handles multiple lines in a single chunk", () => { + const line1 = '{"type":"text","sessionID":"ses_1"}\n'; + const line2 = '[paperclip] some status\n'; + const chunk = line1 + line2; + const result = dedup.filter(chunk); + expect(result).toBe(chunk); + }); + + it("deduplicates within a multi-line chunk", () => { + const line = '{"type":"text","sessionID":"ses_1"}\n'; + const chunk = line + line; // same line twice in one chunk + const result = dedup.filter(chunk); + expect(result).toBe(line); // only once + }); + + it("passes blank lines through unchanged", () => { + expect(dedup.filter("\n")).toBe("\n"); + }); + + it("passes whitespace-only lines through unchanged", () => { + expect(dedup.filter(" \n")).toBe(" \n"); + }); + + it("deduplicates events keyed by type:sessionId across chunks", () => { + const event = { type: "step_start", sessionID: "ses_1" }; + const line = JSON.stringify(event) + "\n"; + dedup.filter(line); + // second occurrence in a later chunk + expect(dedup.filter(line)).toBe(""); + }); + + it("allows distinct events with different sessionIds to pass", () => { + const line1 = JSON.stringify({ type: "text", sessionID: "ses_1" }) + "\n"; + const line2 = JSON.stringify({ type: "text", sessionID: "ses_2" }) + "\n"; + dedup.filter(line1); + expect(dedup.filter(line2)).toBe(line2); + }); + + it("allows distinct events with different partIds to pass", () => { + const line1 = JSON.stringify({ type: "tool_use", sessionID: "ses_1", part: { id: "t1" } }) + "\n"; + const line2 = JSON.stringify({ type: "tool_use", sessionID: "ses_1", part: { id: "t2" } }) + "\n"; + dedup.filter(line1); + expect(dedup.filter(line2)).toBe(line2); + }); + }); + + describe("flush()", () => { + it("returns empty string when buffer is empty", () => { + expect(dedup.flush()).toBe(""); + }); + + it("returns and clears buffered incomplete line", () => { + const partial = '{"type":"text","sessionID":"ses_1"}'; + dedup.filter(partial); + expect(dedup.flush()).toBe(partial); + }); + + it("returns empty string on subsequent flush after buffer cleared", () => { + const partial = '{"type":"text","sessionID":"ses_1"}'; + dedup.filter(partial); + dedup.flush(); + expect(dedup.flush()).toBe(""); // buffer already cleared + }); + + it("does not emit duplicate content on flush", () => { + const line = '{"type":"text","sessionID":"ses_1"}\n'; + dedup.filter(line); // first emission + const partial = '{"type":"text","sessionID":"ses_1"}'; // no trailing newline + dedup.filter(partial); + expect(dedup.flush()).toBe(""); // same key already seen — suppressed + }); + }); +}); diff --git a/src/server/log-dedup.ts b/src/server/log-dedup.ts new file mode 100644 index 0000000..19fb25f --- /dev/null +++ b/src/server/log-dedup.ts @@ -0,0 +1,126 @@ +/** + * Line-level dedup filter for the K8s log stream. + * + * The K8s log follow stream can reconnect with an overlapping `sinceSeconds` + * window (integer-second granularity + a safety buffer), which replays a few + * seconds of recent output on every reconnect. Without dedup those replayed + * lines appear as duplicate events in the streaming UI. + * + * The filter operates at the chunk → line level: chunks are split on `\n`, + * incomplete trailing content is buffered until the next chunk, and each + * complete line is emitted at most once. JSON-shaped OpenCode JSONL events + * are keyed by (type + sessionID + part.id); non-JSON lines pass through + * unchanged so genuinely-repeated status lines are not swallowed. + */ + +type Parsed = Record; + +function asStr(value: unknown): string { + return typeof value === "string" ? value : ""; +} + +function asRec(value: unknown): Parsed | null { + if (typeof value !== "object" || value === null || Array.isArray(value)) return null; + return value as Parsed; +} + +/** + * Build a stable dedup key for an OpenCode JSONL event. Returns `null` when + * the event is not a recognized OpenCode event — those lines fall back to + * raw-content hashing so non-JSON output (paperclip status lines, shell + * output) is never deduped by identity. + */ +export function eventDedupKey(event: Parsed): string | null { + const type = asStr(event.type); + if (!type) return null; + + const sessionId = asStr(event.sessionID); + const part = asRec(event.part); + const partId = part ? asStr(part.id) : ""; + + switch (type) { + case "text": + case "tool_use": + case "step_finish": + case "step_start": + case "thinking": + case "assistant": + case "user": + if (partId) return `${type}:${sessionId}:${partId}`; + if (sessionId) return `${type}:${sessionId}`; + return null; + default: + return null; + } +} + +/** + * Stateful line-level dedup filter. Emits `filter(chunk)` output through + * the caller — preserves original chunk formatting (including trailing + * newlines) for lines that pass the dedup check. + */ +export class LogLineDedupFilter { + private buffer = ""; + private readonly seenKeys = new Set(); + + /** + * Process a chunk and return the subset that should be forwarded. + * Incomplete trailing content (no terminating newline) is buffered and + * emitted on the next chunk that completes the line (or on flush()). + */ + filter(chunk: string): string { + if (!chunk) return ""; + const combined = this.buffer + chunk; + const endsWithNewline = combined.endsWith("\n"); + const parts = combined.split("\n"); + + if (endsWithNewline) { + parts.pop(); + this.buffer = ""; + } else { + this.buffer = parts.pop() ?? ""; + } + + const out: string[] = []; + for (const line of parts) { + if (this.shouldEmit(line)) out.push(line); + } + if (out.length === 0) return ""; + return out.join("\n") + "\n"; + } + + /** + * Flush any incomplete trailing content. Called when the stream ends + * without a terminating newline so the final partial line isn't lost. + */ + flush(): string { + const pending = this.buffer; + this.buffer = ""; + if (!pending) return ""; + return this.shouldEmit(pending) ? pending : ""; + } + + private shouldEmit(line: string): boolean { + const trimmed = line.trim(); + if (!trimmed) return true; + + if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true; + + let parsed: unknown; + try { + parsed = JSON.parse(trimmed); + } catch { + return true; + } + + const event = asRec(parsed); + if (!event) return true; + + const structuralKey = eventDedupKey(event); + const key = structuralKey ?? `raw:${trimmed}`; + + if (this.seenKeys.has(key)) return false; + this.seenKeys.add(key); + return true; + } +} diff --git a/src/server/session.test.ts b/src/server/session.test.ts new file mode 100644 index 0000000..7962d23 --- /dev/null +++ b/src/server/session.test.ts @@ -0,0 +1,237 @@ +import { describe, it, expect } from "vitest"; +import { sessionCodec } from "./session.js"; + +describe("sessionCodec.deserialize", () => { + it("returns null for null input", () => { + expect(sessionCodec.deserialize(null)).toBeNull(); + }); + + it("returns null for string input", () => { + expect(sessionCodec.deserialize("string")).toBeNull(); + }); + + it("returns null for number input", () => { + expect(sessionCodec.deserialize(42)).toBeNull(); + }); + + it("returns null for array input", () => { + expect(sessionCodec.deserialize([])).toBeNull(); + }); + + it("returns null when sessionId is absent", () => { + expect(sessionCodec.deserialize({ cwd: "/foo" })).toBeNull(); + }); + + it("returns null when sessionId is empty string", () => { + expect(sessionCodec.deserialize({ sessionId: "" })).toBeNull(); + }); + + it("returns null when sessionId is whitespace only", () => { + expect(sessionCodec.deserialize({ sessionId: " " })).toBeNull(); + }); + + it("reads canonical sessionId", () => { + const result = sessionCodec.deserialize({ sessionId: "ses_abc" }); + expect(result?.sessionId).toBe("ses_abc"); + }); + + it("reads legacy session_id field", () => { + const result = sessionCodec.deserialize({ session_id: "ses_legacy" }); + expect(result?.sessionId).toBe("ses_legacy"); + }); + + it("reads legacy sessionID field", () => { + const result = sessionCodec.deserialize({ sessionID: "ses_ID" }); + expect(result?.sessionId).toBe("ses_ID"); + }); + + it("prefers sessionId over session_id", () => { + const result = sessionCodec.deserialize({ sessionId: "canonical", session_id: "legacy" }); + expect(result?.sessionId).toBe("canonical"); + }); + + it("prefers session_id over sessionID", () => { + const result = sessionCodec.deserialize({ session_id: "mid", sessionID: "last" }); + expect(result?.sessionId).toBe("mid"); + }); + + it("trims whitespace from sessionId", () => { + const result = sessionCodec.deserialize({ sessionId: " ses_123 " }); + expect(result?.sessionId).toBe("ses_123"); + }); + + it("reads cwd field", () => { + const result = sessionCodec.deserialize({ sessionId: "s1", cwd: "/work/dir" }); + expect(result?.cwd).toBe("/work/dir"); + }); + + it("reads workdir as cwd fallback", () => { + const result = sessionCodec.deserialize({ sessionId: "s1", workdir: "/workdir" }); + expect(result?.cwd).toBe("/workdir"); + }); + + it("reads folder as cwd fallback", () => { + const result = sessionCodec.deserialize({ sessionId: "s1", folder: "/folder" }); + expect(result?.cwd).toBe("/folder"); + }); + + it("prefers cwd over workdir", () => { + const result = sessionCodec.deserialize({ sessionId: "s1", cwd: "/cwd", workdir: "/workdir" }); + expect(result?.cwd).toBe("/cwd"); + }); + + it("prefers workdir over folder", () => { + const result = sessionCodec.deserialize({ sessionId: "s1", workdir: "/workdir", folder: "/folder" }); + expect(result?.cwd).toBe("/workdir"); + }); + + it("reads workspaceId field", () => { + const result = sessionCodec.deserialize({ sessionId: "s1", workspaceId: "ws-1" }); + expect(result?.workspaceId).toBe("ws-1"); + }); + + it("reads workspace_id as workspaceId fallback", () => { + const result = sessionCodec.deserialize({ sessionId: "s1", workspace_id: "ws-legacy" }); + expect(result?.workspaceId).toBe("ws-legacy"); + }); + + it("reads repoUrl field", () => { + const result = sessionCodec.deserialize({ sessionId: "s1", repoUrl: "https://github.com/org/repo" }); + expect(result?.repoUrl).toBe("https://github.com/org/repo"); + }); + + it("reads repo_url as repoUrl fallback", () => { + const result = sessionCodec.deserialize({ sessionId: "s1", repo_url: "https://github.com/org/repo" }); + expect(result?.repoUrl).toBe("https://github.com/org/repo"); + }); + + it("reads repoRef field", () => { + const result = sessionCodec.deserialize({ sessionId: "s1", repoRef: "main" }); + expect(result?.repoRef).toBe("main"); + }); + + it("reads repo_ref as repoRef fallback", () => { + const result = sessionCodec.deserialize({ sessionId: "s1", repo_ref: "feature/branch" }); + expect(result?.repoRef).toBe("feature/branch"); + }); + + it("omits absent optional fields from result", () => { + const result = sessionCodec.deserialize({ sessionId: "s1" }); + expect(result).toEqual({ sessionId: "s1" }); + expect(result && "cwd" in result).toBe(false); + expect(result && "workspaceId" in result).toBe(false); + expect(result && "repoUrl" in result).toBe(false); + expect(result && "repoRef" in result).toBe(false); + }); + + it("includes all fields when all are present", () => { + const result = sessionCodec.deserialize({ + sessionId: "ses_full", + cwd: "/work", + workspaceId: "ws-1", + repoUrl: "https://github.com/org/repo", + repoRef: "main", + }); + expect(result).toEqual({ + sessionId: "ses_full", + cwd: "/work", + workspaceId: "ws-1", + repoUrl: "https://github.com/org/repo", + repoRef: "main", + }); + }); +}); + +describe("sessionCodec.serialize", () => { + it("returns null for null input", () => { + expect(sessionCodec.serialize(null)).toBeNull(); + }); + + it("returns null when sessionId is missing", () => { + expect(sessionCodec.serialize({ cwd: "/foo" })).toBeNull(); + }); + + it("returns null when sessionId is empty string", () => { + expect(sessionCodec.serialize({ sessionId: "" })).toBeNull(); + }); + + it("serializes canonical fields", () => { + const result = sessionCodec.serialize({ + sessionId: "ses_abc", + cwd: "/work", + workspaceId: "ws-1", + repoUrl: "https://github.com/org/repo", + repoRef: "main", + }); + expect(result).toEqual({ + sessionId: "ses_abc", + cwd: "/work", + workspaceId: "ws-1", + repoUrl: "https://github.com/org/repo", + repoRef: "main", + }); + }); + + it("reads legacy session_id field", () => { + const result = sessionCodec.serialize({ session_id: "ses_legacy" }); + expect(result?.sessionId).toBe("ses_legacy"); + }); + + it("reads legacy workdir as cwd", () => { + const result = sessionCodec.serialize({ sessionId: "s1", workdir: "/workdir" }); + expect(result?.cwd).toBe("/workdir"); + }); + + it("reads legacy workspace_id", () => { + const result = sessionCodec.serialize({ sessionId: "s1", workspace_id: "ws-2" }); + expect(result?.workspaceId).toBe("ws-2"); + }); + + it("reads legacy repo_url", () => { + const result = sessionCodec.serialize({ sessionId: "s1", repo_url: "https://github.com/org/repo" }); + expect(result?.repoUrl).toBe("https://github.com/org/repo"); + }); + + it("reads legacy repo_ref", () => { + const result = sessionCodec.serialize({ sessionId: "s1", repo_ref: "develop" }); + expect(result?.repoRef).toBe("develop"); + }); + + it("omits absent optional fields", () => { + const result = sessionCodec.serialize({ sessionId: "s1" }); + expect(result).toEqual({ sessionId: "s1" }); + }); +}); + +describe("sessionCodec.getDisplayId", () => { + // getDisplayId is optional in the AdapterSessionCodec interface; use non-null assertion since we know it's implemented + const getDisplayId = sessionCodec.getDisplayId!.bind(sessionCodec); + + it("returns null for null input", () => { + expect(getDisplayId(null)).toBeNull(); + }); + + it("returns sessionId", () => { + expect(getDisplayId({ sessionId: "ses_abc" })).toBe("ses_abc"); + }); + + it("returns session_id as fallback", () => { + expect(getDisplayId({ session_id: "ses_legacy" })).toBe("ses_legacy"); + }); + + it("returns sessionID as fallback", () => { + expect(getDisplayId({ sessionID: "ses_ID" })).toBe("ses_ID"); + }); + + it("prefers sessionId over session_id", () => { + expect(getDisplayId({ sessionId: "canonical", session_id: "legacy" })).toBe("canonical"); + }); + + it("returns null when no valid id field present", () => { + expect(getDisplayId({ other: "value" })).toBeNull(); + }); + + it("returns null when sessionId is empty string", () => { + expect(getDisplayId({ sessionId: "" })).toBeNull(); + }); +}); diff --git a/src/ui-parser.test.ts b/src/ui-parser.test.ts index 409f29a..7b12968 100644 --- a/src/ui-parser.test.ts +++ b/src/ui-parser.test.ts @@ -146,4 +146,180 @@ describe("parseStdoutLine", () => { const line = JSON.stringify({ type: "thinking", part: { thinking: " " } }); expect(parseStdoutLine(line, TS)).toEqual([]); }); + + it("maps step_start to system kind", () => { + const line = JSON.stringify({ type: "step_start" }); + expect(parseStdoutLine(line, TS)).toEqual([{ kind: "system", ts: TS, text: "Starting step…" }]); + }); + + it("maps tool_use pending status to tool_call kind", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "bash", id: "call_1", state: { status: "pending", description: "ls -la" } }, + }); + const entries = parseStdoutLine(line, TS); + expect(entries).toHaveLength(1); + expect(entries[0].kind).toBe("tool_call"); + const entry = entries[0] as { name: string; toolUseId: string; input: unknown }; + expect(entry.name).toBe("bash"); + expect(entry.toolUseId).toBe("call_1"); + expect(entry.input).toBe("ls -la"); + }); + + it("maps tool_use error status to tool_result with isError=true", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "bash", id: "call_2", state: { status: "error", error: "Command not found" } }, + }); + const entries = parseStdoutLine(line, TS); + expect(entries).toHaveLength(1); + const entry = entries[0] as { kind: string; isError: boolean; content: string; toolName: string }; + expect(entry.kind).toBe("tool_result"); + expect(entry.isError).toBe(true); + expect(entry.content).toBe("Command not found"); + expect(entry.toolName).toBe("bash"); + }); + + it("uses 'Tool error' fallback when tool_use error field is empty", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "bash", state: { status: "error", error: "" } }, + }); + const entry = parseStdoutLine(line, TS)[0] as { content: string }; + expect(entry.content).toBe("Tool error"); + }); + + it("maps tool_use done status to tool_result", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "grep", id: "call_3", state: { status: "done", output: "3 matches" } }, + }); + const entries = parseStdoutLine(line, TS); + const entry = entries[0] as { kind: string; isError: boolean; content: string }; + expect(entry.kind).toBe("tool_result"); + expect(entry.isError).toBe(false); + expect(entry.content).toBe("3 matches"); + }); + + it("uses description as content fallback when tool_use output is empty", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "ls", state: { status: "completed", output: "", description: "Listed 5 files" } }, + }); + const entry = parseStdoutLine(line, TS)[0] as { content: string }; + expect(entry.content).toBe("Listed 5 files"); + }); + + it("uses 'Done' when tool_use output and description are both empty", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "ls", state: { status: "completed", output: "", description: "" } }, + }); + const entry = parseStdoutLine(line, TS)[0] as { content: string }; + expect(entry.content).toBe("Done"); + }); + + it("uses tool name as toolUseId when id field is absent", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "bash", state: { status: "pending" } }, + }); + const entry = parseStdoutLine(line, TS)[0] as { toolUseId: string }; + expect(entry.toolUseId).toBe("bash"); + }); + + it("sets tool_call input to undefined when description is empty", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "bash", state: { status: "pending", description: "" } }, + }); + const entry = parseStdoutLine(line, TS)[0] as { input: unknown }; + expect(entry.input).toBeUndefined(); + }); + + it("accumulates reasoning tokens into step_finish outputTokens", () => { + const line = JSON.stringify({ + type: "step_finish", + part: { tokens: { input: 100, output: 50, reasoning: 20, cache: { read: 80 } }, cost: 0.005 }, + }); + const entry = parseStdoutLine(line, TS)[0] as { + inputTokens: number; outputTokens: number; cachedTokens: number; costUsd: number; + }; + expect(entry.inputTokens).toBe(100); + expect(entry.outputTokens).toBe(70); // output(50) + reasoning(20) + expect(entry.cachedTokens).toBe(80); + expect(entry.costUsd).toBeCloseTo(0.005); + }); + + it("step_finish uses reason as fallback text when message is empty", () => { + const line = JSON.stringify({ + type: "step_finish", + part: { reason: "end_turn", tokens: {} }, + }); + const entry = parseStdoutLine(line, TS)[0] as { text: string; subtype: string }; + expect(entry.text).toBe("Step finished: end_turn"); + expect(entry.subtype).toBe("end_turn"); + }); + + it("step_finish uses 'done' subtype when reason is absent", () => { + const line = JSON.stringify({ type: "step_finish", part: { tokens: {} } }); + const entry = parseStdoutLine(line, TS)[0] as { text: string; subtype: string }; + expect(entry.text).toBe("Step finished: done"); + expect(entry.subtype).toBe("step_finish"); + }); + + it("step_finish defaults all numeric fields to 0 when tokens absent", () => { + const line = JSON.stringify({ type: "step_finish", part: {} }); + const entry = parseStdoutLine(line, TS)[0] as { + inputTokens: number; outputTokens: number; cachedTokens: number; costUsd: number; + }; + expect(entry.inputTokens).toBe(0); + expect(entry.outputTokens).toBe(0); + expect(entry.cachedTokens).toBe(0); + expect(entry.costUsd).toBe(0); + }); + + it("returns empty for assistant event with non-text content blocks", () => { + const line = JSON.stringify({ + type: "assistant", + part: { message: { content: [{ type: "tool_use", input: {} }] } }, + }); + expect(parseStdoutLine(line, TS)).toEqual([]); + }); + + it("returns empty for assistant event with empty text block", () => { + const line = JSON.stringify({ + type: "assistant", + part: { message: { content: [{ type: "text", text: " " }] } }, + }); + expect(parseStdoutLine(line, TS)).toEqual([]); + }); + + it("extracts error message from nested error.data.message", () => { + const line = JSON.stringify({ type: "error", error: { data: { message: "Nested message" } } }); + const entry = parseStdoutLine(line, TS)[0] as { text: string }; + expect(entry.text).toBe("Nested message"); + }); + + it("falls back to error.name when message absent", () => { + const line = JSON.stringify({ type: "error", error: { name: "NotFoundError" } }); + const entry = parseStdoutLine(line, TS)[0] as { text: string }; + expect(entry.text).toBe("NotFoundError"); + }); + + it("falls back to error.code when name absent", () => { + const line = JSON.stringify({ type: "error", error: { code: "ERR_CONN" } }); + const entry = parseStdoutLine(line, TS)[0] as { text: string }; + expect(entry.text).toBe("ERR_CONN"); + }); + + it("returns empty array for unrecognized event types", () => { + const line = JSON.stringify({ type: "some_unknown_type", data: {} }); + expect(parseStdoutLine(line, TS)).toEqual([]); + }); + + it("returns empty array for JSON with no type field", () => { + const line = JSON.stringify({ sessionID: "ses_123", data: "something" }); + expect(parseStdoutLine(line, TS)).toEqual([]); + }); });