diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts new file mode 100644 index 0000000..24e924a --- /dev/null +++ b/src/server/execute.test.ts @@ -0,0 +1,108 @@ +import { describe, it, expect } from "vitest"; +import { isK8s404, buildPartialRunError } from "./execute.js"; + +describe("isK8s404", () => { + it("returns false for non-Error values", () => { + expect(isK8s404(null)).toBe(false); + expect(isK8s404(undefined)).toBe(false); + expect(isK8s404("string error")).toBe(false); + expect(isK8s404(404)).toBe(false); + }); + + it("returns false for unrelated errors", () => { + expect(isK8s404(new Error("something went wrong"))).toBe(false); + expect(isK8s404(new Error("HTTP-Code: 500 Message: Internal Server Error"))).toBe(false); + }); + + it("detects 404 from v1.0+ message format", () => { + const err = new Error("HTTP-Code: 404 Message: Unknown API Status Code! Body: ..."); + expect(isK8s404(err)).toBe(true); + }); + + it("detects 404 from v0.x response.statusCode", () => { + const err = Object.assign(new Error("Not Found"), { + response: { statusCode: 404 }, + }); + expect(isK8s404(err)).toBe(true); + }); + + it("detects 404 from v1.0+ response.status", () => { + const err = Object.assign(new Error("Not Found"), { + response: { status: 404 }, + }); + expect(isK8s404(err)).toBe(true); + }); + + it("detects 404 from direct statusCode property", () => { + const err = Object.assign(new Error("Not Found"), { statusCode: 404 }); + expect(isK8s404(err)).toBe(true); + }); + + it("does not match non-404 status codes on response", () => { + const err = Object.assign(new Error("Forbidden"), { + response: { statusCode: 403 }, + }); + expect(isK8s404(err)).toBe(false); + }); +}); + +describe("buildPartialRunError", () => { + const initLine = JSON.stringify({ + type: "system", + subtype: "init", + model: "claude-sonnet-4-6", + session_id: "sess_abc", + }); + + it("returns parse-failure message when exitCode is 0", () => { + expect(buildPartialRunError(0, "", 
"")).toBe("Failed to parse Claude JSON output"); + expect(buildPartialRunError(0, "claude-sonnet-4-6", initLine)).toBe( + "Failed to parse Claude JSON output", + ); + }); + + it("returns generic exit message when stdout is empty", () => { + expect(buildPartialRunError(1, "", "")).toBe("Claude exited with code 1"); + expect(buildPartialRunError(null, "", "")).toBe("Claude exited with code -1"); + }); + + it("skips system/init events and returns generic message when only init captured", () => { + const msg = buildPartialRunError(1, "claude-sonnet-4-6", initLine); + expect(msg).toBe( + "Claude started but did not produce a result (model: claude-sonnet-4-6) — check API credentials, model support, and adapter config", + ); + }); + + it("includes model from parsedStream when stdout is init-only", () => { + const msg = buildPartialRunError(null, "MiniMax-M2.7", initLine); + expect(msg).toContain("MiniMax-M2.7"); + expect(msg).not.toContain("type"); + expect(msg).not.toContain("system"); + }); + + it("uses first non-system line as content when present", () => { + const stdout = [initLine, "Error: no API key configured"].join("\n"); + const msg = buildPartialRunError(1, "claude-sonnet-4-6", stdout); + expect(msg).toBe("Claude exited with code 1: Error: no API key configured"); + }); + + it("uses first non-system JSON event as content", () => { + const resultLike = JSON.stringify({ type: "result", subtype: "error", result: "rate limit" }); + const stdout = [initLine, resultLike].join("\n"); + const msg = buildPartialRunError(2, "claude-sonnet-4-6", stdout); + expect(msg).toContain("rate limit"); + expect(msg).toContain("code 2"); + }); + + it("null exitCode renders as -1 in message", () => { + const msg = buildPartialRunError(null, "", "Some plain error text"); + expect(msg).toBe("Claude exited with code -1: Some plain error text"); + }); + + it("skips multiple consecutive system events", () => { + const anotherSystem = JSON.stringify({ type: "system", subtype: "other" }); + 
/**
 * Build the error message reported when Claude's stdout contains no result event.
 *
 * System events (JSON objects whose `type` is `"system"`, e.g. the init
 * event) are skipped so the UI never shows the raw init JSON as the error.
 * Exported for unit tests.
 *
 * @param exitCode - Process exit code; `null` is rendered as `-1`.
 * @param model - Model name taken from the parsed stream; `""` when unknown.
 * @param stdout - Raw captured stdout (newline-separated stream-json or text).
 * @returns A single-line, human-readable error message.
 */
export function buildPartialRunError(
  exitCode: number | null,
  model: string,
  stdout: string,
): string {
  // A clean exit without a result event means the output was unparseable.
  if (exitCode === 0) return "Failed to parse Claude JSON output";

  // True only for lines that parse as a JSON object with type === "system".
  // Anything that is not valid JSON counts as real content.
  const isSystemEvent = (line: string): boolean => {
    try {
      const parsed: unknown = JSON.parse(line);
      return (
        typeof parsed === "object" &&
        parsed !== null &&
        (parsed as Record<string, unknown>).type === "system"
      );
    } catch {
      return false;
    }
  };

  // First non-blank line that is not a system event, or "" when there is none.
  const firstContentLine =
    stdout
      .split(/\r?\n/)
      .map((line) => line.trim())
      .find((line) => line !== "" && !isSystemEvent(line)) ?? "";

  // Only system/init events were captured: surface the model name so the
  // operator can diagnose missing credentials or an unsupported model.
  // Requires a known model; otherwise fall through to the generic message.
  if (!firstContentLine && stdout.trim() !== "" && model !== "") {
    return `Claude started but did not produce a result (model: ${model}) — check API credentials, model support, and adapter config`;
  }

  const exitLabel = `Claude exited with code ${exitCode ?? -1}`;
  return firstContentLine ? `${exitLabel}: ${firstContentLine}` : exitLabel;
}
let lastLogReceivedAt = Math.floor(Date.now() / 1000); + // Shared across reconnects so replayed lines inside the `sinceSeconds` + // overlap window are dropped before they reach the streaming UI (FAR-123). + const dedup = new LogLineDedupFilter(); while (!stopSignal?.stopped) { if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) { @@ -217,7 +267,7 @@ async function streamPodLogs( } const preStreamTs = Math.floor(Date.now() / 1000); - const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds); + const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup); if (result) { allChunks.push(result); // Update last-received timestamp to now (the stream just ended, @@ -237,6 +287,11 @@ async function streamPodLogs( await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS)); } + // Flush any buffered partial line so the final assistant/result chunk + // isn't dropped when the stream ends mid-line. + const tail = dedup.flush(); + if (tail) await onLog("stdout", tail); + return allChunks.join(""); } @@ -640,13 +695,27 @@ export async function execute(ctx: AdapterExecutionContext): Promise stdout.length) { + await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`); + stdout = oneShotLogs; } } @@ -739,16 +808,11 @@ export async function execute(ctx: AdapterExecutionContext): Promise l.trim()).find(Boolean) ?? ""; return { exitCode, signal: null, timedOut: false, - errorMessage: exitCode === 0 - ? "Failed to parse Claude JSON output" - : stderrLine - ? `Claude exited with code ${exitCode ?? -1}: ${stderrLine}` - : `Claude exited with code ${exitCode ?? 
-1}`, + errorMessage: buildPartialRunError(exitCode, parsedStream.model, stdout), resultJson: { stdout }, }; } diff --git a/src/server/log-dedup.test.ts b/src/server/log-dedup.test.ts new file mode 100644 index 0000000..5c497f6 --- /dev/null +++ b/src/server/log-dedup.test.ts @@ -0,0 +1,173 @@ +import { describe, it, expect } from "vitest"; +import { LogLineDedupFilter, eventDedupKey } from "./log-dedup.js"; + +function assistantEvent(id: string, text: string): string { + return JSON.stringify({ + type: "assistant", + session_id: "sess_1", + message: { + id, + content: [{ type: "text", text }], + }, + }); +} + +function userToolResultEvent(toolUseId: string, content: string): string { + return JSON.stringify({ + type: "user", + session_id: "sess_1", + message: { + content: [{ type: "tool_result", tool_use_id: toolUseId, content }], + }, + }); +} + +function systemInitEvent(sessionId: string): string { + return JSON.stringify({ + type: "system", + subtype: "init", + session_id: sessionId, + model: "claude-opus-4-7", + }); +} + +function resultEvent(sessionId: string): string { + return JSON.stringify({ + type: "result", + subtype: "success", + session_id: sessionId, + result: "done", + total_cost_usd: 0.01, + usage: { input_tokens: 1, output_tokens: 1, cache_read_input_tokens: 0 }, + }); +} + +describe("eventDedupKey", () => { + it("keys assistant events by message.id", () => { + const key = eventDedupKey(JSON.parse(assistantEvent("msg_abc", "hi"))); + expect(key).toBe("assistant:msg_abc"); + }); + + it("keys user tool_result events by tool_use_id", () => { + const key = eventDedupKey(JSON.parse(userToolResultEvent("toolu_1", "ok"))); + expect(key).toBe("user:tool_result:toolu_1"); + }); + + it("keys system init events by session_id", () => { + const key = eventDedupKey(JSON.parse(systemInitEvent("sess_xyz"))); + expect(key).toBe("system:init:sess_xyz"); + }); + + it("keys result events by session_id", () => { + const key = 
eventDedupKey(JSON.parse(resultEvent("sess_xyz"))); + expect(key).toBe("result:sess_xyz"); + }); + + it("returns null for assistant events missing message.id", () => { + const event = { type: "assistant", message: { content: [] } }; + expect(eventDedupKey(event)).toBeNull(); + }); + + it("returns null for unknown event types", () => { + expect(eventDedupKey({ type: "unknown" })).toBeNull(); + expect(eventDedupKey({})).toBeNull(); + }); +}); + +describe("LogLineDedupFilter", () => { + it("passes unique lines through unchanged", () => { + const filter = new LogLineDedupFilter(); + const a = assistantEvent("msg_1", "hello"); + const b = assistantEvent("msg_2", "world"); + expect(filter.filter(`${a}\n${b}\n`)).toBe(`${a}\n${b}\n`); + }); + + it("drops assistant events replayed with the same message.id", () => { + const filter = new LogLineDedupFilter(); + const a = assistantEvent("msg_1", "Three nits to fix."); + filter.filter(`${a}\n`); + expect(filter.filter(`${a}\n`)).toBe(""); + }); + + it("drops user tool_result events replayed with the same tool_use_id", () => { + const filter = new LogLineDedupFilter(); + const a = userToolResultEvent("toolu_abc", "file contents"); + filter.filter(`${a}\n`); + expect(filter.filter(`${a}\n`)).toBe(""); + }); + + it("drops system init and result events on replay", () => { + const filter = new LogLineDedupFilter(); + const init = systemInitEvent("sess_1"); + const result = resultEvent("sess_1"); + filter.filter(`${init}\n${result}\n`); + expect(filter.filter(`${init}\n${result}\n`)).toBe(""); + }); + + it("buffers incomplete trailing lines across chunks", () => { + const filter = new LogLineDedupFilter(); + const line = assistantEvent("msg_1", "hello"); + const mid = Math.floor(line.length / 2); + const out1 = filter.filter(line.slice(0, mid)); + const out2 = filter.filter(line.slice(mid) + "\n"); + expect(out1).toBe(""); + expect(out2).toBe(`${line}\n`); + }); + + it("flush() emits a final incomplete line that was not replayed", 
() => { + const filter = new LogLineDedupFilter(); + const line = assistantEvent("msg_tail", "no newline"); + filter.filter(line); + expect(filter.flush()).toBe(line); + }); + + it("flush() drops an incomplete line that was already seen with a newline", () => { + const filter = new LogLineDedupFilter(); + const line = assistantEvent("msg_same", "x"); + filter.filter(`${line}\n`); + filter.filter(line); + expect(filter.flush()).toBe(""); + }); + + it("passes non-JSON lines through every time (does not dedup paperclip status)", () => { + const filter = new LogLineDedupFilter(); + const status = "[paperclip] keepalive — job foo running\n"; + expect(filter.filter(status)).toBe(status); + expect(filter.filter(status)).toBe(status); + }); + + it("dedups structurally identical JSON with identical content (raw fallback)", () => { + const filter = new LogLineDedupFilter(); + // No recognized type → raw fallback key. + const line = JSON.stringify({ foo: "bar", baz: 1 }); + filter.filter(`${line}\n`); + expect(filter.filter(`${line}\n`)).toBe(""); + }); + + it("handles multiple complete lines in a single chunk with partial trailing", () => { + const filter = new LogLineDedupFilter(); + const a = assistantEvent("msg_a", "a"); + const b = assistantEvent("msg_b", "b"); + const c = assistantEvent("msg_c", "c"); + // a and b are complete, c is partial (no trailing newline). + const out = filter.filter(`${a}\n${b}\n${c}`); + expect(out).toBe(`${a}\n${b}\n`); + // Completing c later should emit exactly c. + expect(filter.filter("\n")).toBe(`${c}\n`); + }); + + it("drops the classic FAR-123 replay scenario across reconnects", () => { + const filter = new LogLineDedupFilter(); + const assistantNits = assistantEvent("msg_nits", "Three nits to fix. Let me look at an existing test file..."); + const assistantWrite = assistantEvent("msg_write", "Now I need to write unit tests"); + // First stream attempt emits both events. 
/**
 * Line-level dedup filter for the K8s log stream.
 *
 * The K8s log follow stream can reconnect with an overlapping `sinceSeconds`
 * window (integer-second granularity + a safety buffer), which replays a few
 * seconds of recent output on every reconnect. Without dedup those replayed
 * lines appear as duplicate events in the streaming UI — the same assistant
 * text block shows up between every subsequent tool call (FAR-123).
 *
 * The filter operates at the chunk → line level: chunks are split on `\n`,
 * incomplete trailing content is buffered until the next chunk, and each
 * complete JSON-object line is emitted at most once. Recognized Claude
 * stream-json events are keyed by their stable structural IDs, other JSON
 * objects by their raw text; non-JSON lines pass through unchanged so
 * genuinely-repeated status lines are not swallowed.
 */

type Parsed = Record<string, unknown>;

/** Return `value` when it is a string, otherwise the empty string. */
function asString(value: unknown): string {
  return typeof value === "string" ? value : "";
}

/** Return `value` as a plain record, or `null` for primitives, `null` and arrays. */
function asRecord(value: unknown): Parsed | null {
  if (typeof value !== "object" || value === null || Array.isArray(value)) return null;
  return value as Parsed;
}

/**
 * Build a stable dedup key for a Claude stream-json event.
 *
 * @param event - A parsed JSON object from the log stream.
 * @returns The structural key, or `null` when the event has no recognized
 *   type or lacks the ID the key is built from. Callers decide the fallback;
 *   {@link LogLineDedupFilter} keys such JSON lines on their raw text.
 */
export function eventDedupKey(event: Parsed): string | null {
  const type = asString(event.type);

  switch (type) {
    case "system": {
      // Only init events carry a usable identity (the session id).
      const sessionId = asString(event.session_id);
      return asString(event.subtype) === "init" && sessionId
        ? `system:init:${sessionId}`
        : null;
    }

    case "assistant": {
      const message = asRecord(event.message);
      const id = message ? asString(message.id) : "";
      return id ? `assistant:${id}` : null;
    }

    case "user": {
      // Key on every tool_use_id found in the message's content blocks.
      const message = asRecord(event.message);
      const content: unknown[] =
        message && Array.isArray(message.content) ? message.content : [];
      const toolUseIds: string[] = [];
      for (const entry of content) {
        const block = asRecord(entry);
        const toolUseId = block ? asString(block.tool_use_id) : "";
        if (toolUseId) toolUseIds.push(toolUseId);
      }
      return toolUseIds.length > 0 ? `user:tool_result:${toolUseIds.join(",")}` : null;
    }

    case "result": {
      const sessionId = asString(event.session_id);
      return sessionId ? `result:${sessionId}` : "result:unknown";
    }

    default:
      return null;
  }
}

/**
 * Stateful line-level dedup filter. `filter(chunk)` returns the text the
 * caller should forward; lines that pass the dedup check are emitted
 * verbatim, each terminated by `\n`.
 *
 * NOTE(review): the partial-line buffer survives across calls, so if one
 * instance is shared across stream reconnects and a stream ends mid-line,
 * the leftover fragment is prepended to the first replayed line of the next
 * stream. Confirm whether the caller needs to discard the fragment on
 * reconnect.
 */
export class LogLineDedupFilter {
  private buffer = "";
  private readonly seenKeys = new Set<string>();

  /**
   * Process a chunk and return the subset that should be forwarded.
   * Incomplete trailing content (no terminating newline) is buffered and
   * emitted on the next chunk that completes the line (or on flush()).
   *
   * @param chunk - Raw text received from the log stream.
   * @returns The surviving complete lines joined with `\n` plus a trailing
   *   `\n`, or `""` when nothing should be forwarded.
   */
  filter(chunk: string): string {
    if (!chunk) return "";

    const lines = (this.buffer + chunk).split("\n");
    // The last element is "" when the text ended on a newline, otherwise it
    // is the incomplete tail — either way it becomes the new buffer.
    this.buffer = lines.pop() ?? "";

    const kept = lines.filter((line) => this.shouldEmit(line));
    return kept.length === 0 ? "" : kept.join("\n") + "\n";
  }

  /**
   * Flush any incomplete trailing content. Called when the stream ends
   * without a terminating newline so the final partial line isn't lost.
   *
   * @returns The buffered text (without an added newline), or `""` when the
   *   buffer is empty or its content was already emitted.
   */
  flush(): string {
    const pending = this.buffer;
    this.buffer = "";
    if (!pending) return "";
    return this.shouldEmit(pending) ? pending : "";
  }

  /** Decide whether `line` is new; records its key as seen when it is. */
  private shouldEmit(line: string): boolean {
    const trimmed = line.trim();
    if (!trimmed) return true;

    // Only attempt dedup on JSON-shaped lines; pass shell/text output through.
    if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true;

    let parsed: unknown;
    try {
      parsed = JSON.parse(trimmed);
    } catch {
      return true;
    }

    const event = asRecord(parsed);
    if (!event) return true;

    const key = this.dedupKey(event, trimmed);
    if (this.seenKeys.has(key)) return false;
    this.seenKeys.add(key);
    return true;
  }

  /** Compute the seen-set key for a parsed JSON-object line. */
  private dedupKey(event: Parsed, rawLine: string): string {
    const structuralKey = eventDedupKey(event);

    // No structural identity → identical raw text counts as a replay.
    if (structuralKey === null) return `raw:${rawLine}`;

    if (structuralKey.startsWith("assistant:")) {
      // Claude stream-json can emit several assistant events under one
      // message.id (one per content block). Keying on the id alone would
      // drop the later blocks as "replays", so qualify the key with the
      // content payload; a true replay carries identical content.
      const message = asRecord(event.message);
      return `${structuralKey}:${JSON.stringify(message?.content ?? null)}`;
    }

    return structuralKey;
  }
}