From b9def0964e363e29e640cb3979a708f7b282a410 Mon Sep 17 00:00:00 2001 From: Test User Date: Wed, 22 Apr 2026 19:33:15 +0000 Subject: [PATCH 1/3] fix: improve partial-log handling and error messages for fast-exit containers (FAR-122) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add a second log fallback: if the follow stream captured partial output (init event present but no result event), attempt a one-shot readPodLogs before the pod is cleaned up. Fast-exiting containers (bad model, missing API key, etc.) can cause the follow stream to return only the init line before the connection drops; the one-shot read is more reliable for already-terminated containers. - Improve the `!parsed` error message: skip system/init events when searching for the first content line, so the error reads "Claude started but did not produce a result (model: MiniMax-M2.7) — check API credentials..." instead of "Claude exited with code -1: {"type":"system","subtype":"init",...}". Co-Authored-By: Paperclip --- src/server/execute.ts | 44 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/src/server/execute.ts b/src/server/execute.ts index c313f15..558a5b7 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -650,6 +650,19 @@ export async function execute(ctx: AdapterExecutionContext): Promise stdout.length) { + await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`); + stdout = fullLogs; + } + } + if (completionResult.status === "fulfilled") { jobTimedOut = completionResult.value.timedOut; if (completionResult.value.jobGone) { @@ -739,16 +752,39 @@ export async function execute(ctx: AdapterExecutionContext): Promise l.trim()).find(Boolean) ?? ""; + // Find the first stdout line that is NOT a system/init event. 
+ // Using the system/init JSON blob as the error message produces a huge, + // unreadable error in the UI. Skip those and use the first real content line. + const firstContentLine = stdout.split(/\r?\n/) + .map((l) => l.trim()) + .find((l) => { + if (!l) return false; + try { + const obj = JSON.parse(l); + if (typeof obj === "object" && obj !== null && (obj as Record).type === "system") return false; + } catch { + // not JSON — treat as content + } + return true; + }) ?? ""; + + // If we got an init event but nothing else, give a specific message that + // names the model so it is easier to diagnose (e.g. unsupported model, + // missing API credentials). + const initOnlyOutput = stdout.trim() !== "" && parsedStream.model !== "" && !firstContentLine; + const modelHint = parsedStream.model ? ` (model: ${parsedStream.model})` : ""; + return { exitCode, signal: null, timedOut: false, errorMessage: exitCode === 0 ? "Failed to parse Claude JSON output" - : stderrLine - ? `Claude exited with code ${exitCode ?? -1}: ${stderrLine}` - : `Claude exited with code ${exitCode ?? -1}`, + : initOnlyOutput + ? `Claude started but did not produce a result${modelHint} — check API credentials, model support, and adapter config` + : firstContentLine + ? `Claude exited with code ${exitCode ?? -1}: ${firstContentLine}` + : `Claude exited with code ${exitCode ?? 
-1}`, resultJson: { stdout }, }; } -- 2.52.0 From 8c8c2f2ec01ab0ff63298b9f19a48b27d5aa7217 Mon Sep 17 00:00:00 2001 From: Test User Date: Wed, 22 Apr 2026 19:42:57 +0000 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20address=20review=20nits=20=E2=80=94?= =?UTF-8?q?=20refactor=20fallbacks,=20add=20unit=20tests=20(FAR-122)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Merge both one-shot log fallbacks into a single conditional block using a cheap string-scan guard (`stdout.includes('"type":"result"')`) to avoid calling parseClaudeStreamJson twice and prevent double readPodLogs calls when the first fallback already ran. - Extract error-message logic into `buildPartialRunError(exitCode, model, stdout)` (exported for tests) so the `!parsed` branch is a one-liner and the logic is independently testable. - Export `isK8s404` for tests. - Add execute.test.ts with 15 unit tests covering: - isK8s404: v0.x response.statusCode, v1.0+ response.status, direct statusCode, message-based detection, non-404 codes - buildPartialRunError: exitCode=0 path, empty stdout, init-only output (model surfaced), first non-system content line, null exitCode (-1), multiple consecutive system events Co-Authored-By: Paperclip --- src/server/execute.test.ts | 108 ++++++++++++++++++++++++++++++++++++ src/server/execute.ts | 109 +++++++++++++++++++++---------------- 2 files changed, 169 insertions(+), 48 deletions(-) create mode 100644 src/server/execute.test.ts diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts new file mode 100644 index 0000000..24e924a --- /dev/null +++ b/src/server/execute.test.ts @@ -0,0 +1,108 @@ +import { describe, it, expect } from "vitest"; +import { isK8s404, buildPartialRunError } from "./execute.js"; + +describe("isK8s404", () => { + it("returns false for non-Error values", () => { + expect(isK8s404(null)).toBe(false); + expect(isK8s404(undefined)).toBe(false); + expect(isK8s404("string error")).toBe(false); + 
expect(isK8s404(404)).toBe(false); + }); + + it("returns false for unrelated errors", () => { + expect(isK8s404(new Error("something went wrong"))).toBe(false); + expect(isK8s404(new Error("HTTP-Code: 500 Message: Internal Server Error"))).toBe(false); + }); + + it("detects 404 from v1.0+ message format", () => { + const err = new Error("HTTP-Code: 404 Message: Unknown API Status Code! Body: ..."); + expect(isK8s404(err)).toBe(true); + }); + + it("detects 404 from v0.x response.statusCode", () => { + const err = Object.assign(new Error("Not Found"), { + response: { statusCode: 404 }, + }); + expect(isK8s404(err)).toBe(true); + }); + + it("detects 404 from v1.0+ response.status", () => { + const err = Object.assign(new Error("Not Found"), { + response: { status: 404 }, + }); + expect(isK8s404(err)).toBe(true); + }); + + it("detects 404 from direct statusCode property", () => { + const err = Object.assign(new Error("Not Found"), { statusCode: 404 }); + expect(isK8s404(err)).toBe(true); + }); + + it("does not match non-404 status codes on response", () => { + const err = Object.assign(new Error("Forbidden"), { + response: { statusCode: 403 }, + }); + expect(isK8s404(err)).toBe(false); + }); +}); + +describe("buildPartialRunError", () => { + const initLine = JSON.stringify({ + type: "system", + subtype: "init", + model: "claude-sonnet-4-6", + session_id: "sess_abc", + }); + + it("returns parse-failure message when exitCode is 0", () => { + expect(buildPartialRunError(0, "", "")).toBe("Failed to parse Claude JSON output"); + expect(buildPartialRunError(0, "claude-sonnet-4-6", initLine)).toBe( + "Failed to parse Claude JSON output", + ); + }); + + it("returns generic exit message when stdout is empty", () => { + expect(buildPartialRunError(1, "", "")).toBe("Claude exited with code 1"); + expect(buildPartialRunError(null, "", "")).toBe("Claude exited with code -1"); + }); + + it("skips system/init events and returns generic message when only init captured", () => { + 
const msg = buildPartialRunError(1, "claude-sonnet-4-6", initLine); + expect(msg).toBe( + "Claude started but did not produce a result (model: claude-sonnet-4-6) — check API credentials, model support, and adapter config", + ); + }); + + it("includes model from parsedStream when stdout is init-only", () => { + const msg = buildPartialRunError(null, "MiniMax-M2.7", initLine); + expect(msg).toContain("MiniMax-M2.7"); + expect(msg).not.toContain("type"); + expect(msg).not.toContain("system"); + }); + + it("uses first non-system line as content when present", () => { + const stdout = [initLine, "Error: no API key configured"].join("\n"); + const msg = buildPartialRunError(1, "claude-sonnet-4-6", stdout); + expect(msg).toBe("Claude exited with code 1: Error: no API key configured"); + }); + + it("uses first non-system JSON event as content", () => { + const resultLike = JSON.stringify({ type: "result", subtype: "error", result: "rate limit" }); + const stdout = [initLine, resultLike].join("\n"); + const msg = buildPartialRunError(2, "claude-sonnet-4-6", stdout); + expect(msg).toContain("rate limit"); + expect(msg).toContain("code 2"); + }); + + it("null exitCode renders as -1 in message", () => { + const msg = buildPartialRunError(null, "", "Some plain error text"); + expect(msg).toBe("Claude exited with code -1: Some plain error text"); + }); + + it("skips multiple consecutive system events", () => { + const anotherSystem = JSON.stringify({ type: "system", subtype: "other" }); + const stdout = [initLine, anotherSystem, "real error line"].join("\n"); + const msg = buildPartialRunError(1, "model-x", stdout); + expect(msg).toBe("Claude exited with code 1: real error line"); + }); +}); diff --git a/src/server/execute.ts b/src/server/execute.ts index 558a5b7..ea8c282 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -19,8 +19,9 @@ const MAX_LOG_RECONNECT_ATTEMPTS = 50; /** * Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node. 
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message). + * Exported for unit tests. */ -function isK8s404(err: unknown): boolean { +export function isK8s404(err: unknown): boolean { if (!(err instanceof Error)) return false; const e = err as unknown as Record; const resp = e.response as Record | undefined; @@ -29,6 +30,45 @@ function isK8s404(err: unknown): boolean { return /HTTP-Code:\s*404\b/.test(err.message); } +/** + * Build the error message when Claude's stdout contains no result event. + * Skips system/init event lines so the UI doesn't display the raw init JSON. + * Exported for unit tests. + */ +export function buildPartialRunError( + exitCode: number | null, + model: string, + stdout: string, +): string { + if (exitCode === 0) return "Failed to parse Claude JSON output"; + + // Walk stdout lines, skip system events, return the first real content line. + const firstContentLine = stdout.split(/\r?\n/) + .map((l) => l.trim()) + .find((l) => { + if (!l) return false; + try { + const obj = JSON.parse(l); + if (typeof obj === "object" && obj !== null && (obj as Record).type === "system") return false; + } catch { + // not JSON — treat as content + } + return true; + }) ?? ""; + + // If we only have system/init events and nothing else, surface the model + // name so the operator can diagnose missing credentials or unsupported model. + const initOnlyOutput = stdout.trim() !== "" && model !== "" && !firstContentLine; + if (initOnlyOutput) { + const modelHint = model ? ` (model: ${model})` : ""; + return `Claude started but did not produce a result${modelHint} — check API credentials, model support, and adapter config`; + } + + return firstContentLine + ? `Claude exited with code ${exitCode ?? -1}: ${firstContentLine}` + : `Claude exited with code ${exitCode ?? -1}`; +} + /** * Wait for the Job's pod to reach a terminal or running state. * Returns the pod name once logs can be streamed, or throws on failure. 
@@ -640,26 +680,27 @@ export async function execute(ctx: AdapterExecutionContext): Promise stdout.length) { + const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath); + if (!stdout.trim() && oneShotLogs.trim()) { + stdout = oneShotLogs; + await onLog("stdout", stdout); + } else if (oneShotLogs && oneShotLogs.length > stdout.length) { await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`); - stdout = fullLogs; + stdout = oneShotLogs; } } @@ -752,39 +793,11 @@ export async function execute(ctx: AdapterExecutionContext): Promise l.trim()) - .find((l) => { - if (!l) return false; - try { - const obj = JSON.parse(l); - if (typeof obj === "object" && obj !== null && (obj as Record).type === "system") return false; - } catch { - // not JSON — treat as content - } - return true; - }) ?? ""; - - // If we got an init event but nothing else, give a specific message that - // names the model so it is easier to diagnose (e.g. unsupported model, - // missing API credentials). - const initOnlyOutput = stdout.trim() !== "" && parsedStream.model !== "" && !firstContentLine; - const modelHint = parsedStream.model ? ` (model: ${parsedStream.model})` : ""; - return { exitCode, signal: null, timedOut: false, - errorMessage: exitCode === 0 - ? "Failed to parse Claude JSON output" - : initOnlyOutput - ? `Claude started but did not produce a result${modelHint} — check API credentials, model support, and adapter config` - : firstContentLine - ? `Claude exited with code ${exitCode ?? -1}: ${firstContentLine}` - : `Claude exited with code ${exitCode ?? 
-1}`, + errorMessage: buildPartialRunError(exitCode, parsedStream.model, stdout), resultJson: { stdout }, }; } -- 2.52.0 From 5e01ae99b3fb45b38940e40533f9546eb33537e8 Mon Sep 17 00:00:00 2001 From: Test User Date: Wed, 22 Apr 2026 19:49:04 +0000 Subject: [PATCH 3/3] fix: dedup replayed K8s log lines at the streaming UI boundary (FAR-123) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The K8s log follow stream replays the trailing few seconds of output on every reconnect because `sinceSeconds` uses integer-second granularity with a small safety buffer. FAR-105 deduplicated those replays at the final parser (parse.ts), but the streaming UI consumes raw onLog chunks and still showed each replayed assistant/tool event as a fresh entry — which is how the duplicate "Three nits to fix…" blocks in the screenshot appeared between successive tool calls. Fix: add a stateful line-level dedup filter around onLog, shared across reconnects. Claude stream-json events are keyed by their stable structural IDs (message.id, tool_use_id, session_id); non-JSON output (paperclip status lines, shell output) passes through unchanged. - New `src/server/log-dedup.ts` + tests: LogLineDedupFilter handles chunk-to-line buffering, replay dedup, and end-of-stream flush. - `streamPodLogs` instantiates one filter per run so dedup state persists across reconnect attempts. 
Co-Authored-By: Paperclip --- src/server/execute.ts | 19 +++- src/server/log-dedup.test.ts | 173 +++++++++++++++++++++++++++++++++++ src/server/log-dedup.ts | 146 +++++++++++++++++++++++++++++ 3 files changed, 336 insertions(+), 2 deletions(-) create mode 100644 src/server/log-dedup.test.ts create mode 100644 src/server/log-dedup.ts diff --git a/src/server/execute.ts b/src/server/execute.ts index ea8c282..868002a 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -8,6 +8,7 @@ import { } from "./parse.js"; import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js"; import { buildJobManifest } from "./job-manifest.js"; +import { LogLineDedupFilter } from "./log-dedup.js"; import type * as k8s from "@kubernetes/client-node"; import { Writable } from "node:stream"; @@ -190,6 +191,7 @@ async function streamPodLogsOnce( onLog: AdapterExecutionContext["onLog"], kubeconfigPath?: string, sinceSeconds?: number, + dedup?: LogLineDedupFilter, ): Promise { const logApi = getLogApi(kubeconfigPath); const chunks: string[] = []; @@ -198,7 +200,12 @@ async function streamPodLogsOnce( write(chunk: Buffer, _encoding, callback) { const text = chunk.toString("utf-8"); chunks.push(text); - void onLog("stdout", text).then(() => callback(), callback); + const emitted = dedup ? dedup.filter(text) : text; + if (!emitted) { + callback(); + return; + } + void onLog("stdout", emitted).then(() => callback(), callback); }, }); @@ -238,6 +245,9 @@ async function streamPodLogs( // reconnects use a tight window instead of an ever-growing one anchored // at stream start. This is the primary fix for FAR-105 duplicative logs. let lastLogReceivedAt = Math.floor(Date.now() / 1000); + // Shared across reconnects so replayed lines inside the `sinceSeconds` + // overlap window are dropped before they reach the streaming UI (FAR-123). 
+ const dedup = new LogLineDedupFilter(); while (!stopSignal?.stopped) { if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) { @@ -257,7 +267,7 @@ async function streamPodLogs( } const preStreamTs = Math.floor(Date.now() / 1000); - const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds); + const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup); if (result) { allChunks.push(result); // Update last-received timestamp to now (the stream just ended, @@ -277,6 +287,11 @@ async function streamPodLogs( await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS)); } + // Flush any buffered partial line so the final assistant/result chunk + // isn't dropped when the stream ends mid-line. + const tail = dedup.flush(); + if (tail) await onLog("stdout", tail); + return allChunks.join(""); } diff --git a/src/server/log-dedup.test.ts b/src/server/log-dedup.test.ts new file mode 100644 index 0000000..5c497f6 --- /dev/null +++ b/src/server/log-dedup.test.ts @@ -0,0 +1,173 @@ +import { describe, it, expect } from "vitest"; +import { LogLineDedupFilter, eventDedupKey } from "./log-dedup.js"; + +function assistantEvent(id: string, text: string): string { + return JSON.stringify({ + type: "assistant", + session_id: "sess_1", + message: { + id, + content: [{ type: "text", text }], + }, + }); +} + +function userToolResultEvent(toolUseId: string, content: string): string { + return JSON.stringify({ + type: "user", + session_id: "sess_1", + message: { + content: [{ type: "tool_result", tool_use_id: toolUseId, content }], + }, + }); +} + +function systemInitEvent(sessionId: string): string { + return JSON.stringify({ + type: "system", + subtype: "init", + session_id: sessionId, + model: "claude-opus-4-7", + }); +} + +function resultEvent(sessionId: string): string { + return JSON.stringify({ + type: "result", + subtype: "success", + session_id: sessionId, + result: "done", + total_cost_usd: 
0.01, + usage: { input_tokens: 1, output_tokens: 1, cache_read_input_tokens: 0 }, + }); +} + +describe("eventDedupKey", () => { + it("keys assistant events by message.id", () => { + const key = eventDedupKey(JSON.parse(assistantEvent("msg_abc", "hi"))); + expect(key).toBe("assistant:msg_abc"); + }); + + it("keys user tool_result events by tool_use_id", () => { + const key = eventDedupKey(JSON.parse(userToolResultEvent("toolu_1", "ok"))); + expect(key).toBe("user:tool_result:toolu_1"); + }); + + it("keys system init events by session_id", () => { + const key = eventDedupKey(JSON.parse(systemInitEvent("sess_xyz"))); + expect(key).toBe("system:init:sess_xyz"); + }); + + it("keys result events by session_id", () => { + const key = eventDedupKey(JSON.parse(resultEvent("sess_xyz"))); + expect(key).toBe("result:sess_xyz"); + }); + + it("returns null for assistant events missing message.id", () => { + const event = { type: "assistant", message: { content: [] } }; + expect(eventDedupKey(event)).toBeNull(); + }); + + it("returns null for unknown event types", () => { + expect(eventDedupKey({ type: "unknown" })).toBeNull(); + expect(eventDedupKey({})).toBeNull(); + }); +}); + +describe("LogLineDedupFilter", () => { + it("passes unique lines through unchanged", () => { + const filter = new LogLineDedupFilter(); + const a = assistantEvent("msg_1", "hello"); + const b = assistantEvent("msg_2", "world"); + expect(filter.filter(`${a}\n${b}\n`)).toBe(`${a}\n${b}\n`); + }); + + it("drops assistant events replayed with the same message.id", () => { + const filter = new LogLineDedupFilter(); + const a = assistantEvent("msg_1", "Three nits to fix."); + filter.filter(`${a}\n`); + expect(filter.filter(`${a}\n`)).toBe(""); + }); + + it("drops user tool_result events replayed with the same tool_use_id", () => { + const filter = new LogLineDedupFilter(); + const a = userToolResultEvent("toolu_abc", "file contents"); + filter.filter(`${a}\n`); + expect(filter.filter(`${a}\n`)).toBe(""); + 
}); + + it("drops system init and result events on replay", () => { + const filter = new LogLineDedupFilter(); + const init = systemInitEvent("sess_1"); + const result = resultEvent("sess_1"); + filter.filter(`${init}\n${result}\n`); + expect(filter.filter(`${init}\n${result}\n`)).toBe(""); + }); + + it("buffers incomplete trailing lines across chunks", () => { + const filter = new LogLineDedupFilter(); + const line = assistantEvent("msg_1", "hello"); + const mid = Math.floor(line.length / 2); + const out1 = filter.filter(line.slice(0, mid)); + const out2 = filter.filter(line.slice(mid) + "\n"); + expect(out1).toBe(""); + expect(out2).toBe(`${line}\n`); + }); + + it("flush() emits a final incomplete line that was not replayed", () => { + const filter = new LogLineDedupFilter(); + const line = assistantEvent("msg_tail", "no newline"); + filter.filter(line); + expect(filter.flush()).toBe(line); + }); + + it("flush() drops an incomplete line that was already seen with a newline", () => { + const filter = new LogLineDedupFilter(); + const line = assistantEvent("msg_same", "x"); + filter.filter(`${line}\n`); + filter.filter(line); + expect(filter.flush()).toBe(""); + }); + + it("passes non-JSON lines through every time (does not dedup paperclip status)", () => { + const filter = new LogLineDedupFilter(); + const status = "[paperclip] keepalive — job foo running\n"; + expect(filter.filter(status)).toBe(status); + expect(filter.filter(status)).toBe(status); + }); + + it("dedups structurally identical JSON with identical content (raw fallback)", () => { + const filter = new LogLineDedupFilter(); + // No recognized type → raw fallback key. 
+ const line = JSON.stringify({ foo: "bar", baz: 1 }); + filter.filter(`${line}\n`); + expect(filter.filter(`${line}\n`)).toBe(""); + }); + + it("handles multiple complete lines in a single chunk with partial trailing", () => { + const filter = new LogLineDedupFilter(); + const a = assistantEvent("msg_a", "a"); + const b = assistantEvent("msg_b", "b"); + const c = assistantEvent("msg_c", "c"); + // a and b are complete, c is partial (no trailing newline). + const out = filter.filter(`${a}\n${b}\n${c}`); + expect(out).toBe(`${a}\n${b}\n`); + // Completing c later should emit exactly c. + expect(filter.filter("\n")).toBe(`${c}\n`); + }); + + it("drops the classic FAR-123 replay scenario across reconnects", () => { + const filter = new LogLineDedupFilter(); + const assistantNits = assistantEvent("msg_nits", "Three nits to fix. Let me look at an existing test file..."); + const assistantWrite = assistantEvent("msg_write", "Now I need to write unit tests"); + // First stream attempt emits both events. + const out1 = filter.filter(`${assistantNits}\n${assistantWrite}\n`); + expect(out1).toBe(`${assistantNits}\n${assistantWrite}\n`); + // Reconnect replays both within the sinceSeconds overlap — filter should drop them. + const out2 = filter.filter(`${assistantNits}\n${assistantWrite}\n`); + expect(out2).toBe(""); + // And a genuinely new event after the replay should still pass through. + const assistantFresh = assistantEvent("msg_fresh", "next turn"); + expect(filter.filter(`${assistantFresh}\n`)).toBe(`${assistantFresh}\n`); + }); +}); diff --git a/src/server/log-dedup.ts b/src/server/log-dedup.ts new file mode 100644 index 0000000..31a318a --- /dev/null +++ b/src/server/log-dedup.ts @@ -0,0 +1,146 @@ +/** + * Line-level dedup filter for the K8s log stream. + * + * The K8s log follow stream can reconnect with an overlapping `sinceSeconds` + * window (integer-second granularity + a safety buffer), which replays a few + * seconds of recent output on every reconnect. 
Without dedup those replayed + * lines appear as duplicate events in the streaming UI — the same assistant + * text block shows up between every subsequent tool call (FAR-123). + * + * The filter operates at the chunk → line level: chunks are split on `\n`, + * incomplete trailing content is buffered until the next chunk, and each + * complete JSON-object line is emitted at most once. Claude stream-json events + * are keyed by their stable structural IDs, other JSON objects by their raw + * text; non-JSON lines always pass through so repeated status lines are kept. + */ + +type Parsed = Record; + +function asString(value: unknown): string { + return typeof value === "string" ? value : ""; +} + +function asRecord(value: unknown): Parsed | null { + if (typeof value !== "object" || value === null || Array.isArray(value)) return null; + return value as Parsed; +} + +/** + * Build a stable dedup key for a Claude stream-json event. Returns `null` + * when the event has no recognized type or lacks its stable ID — the caller + * then falls back to keying on the raw line text. Non-JSON output (paperclip + * status lines, shell output) never reaches this function and is not deduped. + */ +export function eventDedupKey(event: Parsed): string | null { + const type = asString(event.type); + + if (type === "system") { + const subtype = asString(event.subtype); + const sessionId = asString(event.session_id); + if (subtype === "init" && sessionId) return `system:init:${sessionId}`; + return null; + } + + if (type === "assistant") { + const message = asRecord(event.message); + const id = message ? asString(message.id) : ""; + if (id) return `assistant:${id}`; + return null; + } + + if (type === "user") { + const message = asRecord(event.message); + const content = message && Array.isArray(message.content) ? 
message.content : []; + const toolUseIds: string[] = []; + for (const entry of content) { + const block = asRecord(entry); + if (!block) continue; + const toolUseId = asString(block.tool_use_id); + if (toolUseId) toolUseIds.push(toolUseId); + } + if (toolUseIds.length > 0) return `user:tool_result:${toolUseIds.join(",")}`; + return null; + } + + if (type === "result") { + const sessionId = asString(event.session_id); + return sessionId ? `result:${sessionId}` : "result:unknown"; + } + + return null; +} + +/** + * Stateful line-level dedup filter. Emits `filter(chunk)` output through + * the caller — preserves original chunk formatting (including trailing + * newlines) for lines that pass the dedup check. + */ +export class LogLineDedupFilter { + private buffer = ""; + private readonly seenKeys = new Set(); + + /** + * Process a chunk and return the subset that should be forwarded. + * Incomplete trailing content (no terminating newline) is buffered and + * emitted on the next chunk that completes the line (or on flush()). + */ + filter(chunk: string): string { + if (!chunk) return ""; + const combined = this.buffer + chunk; + const endsWithNewline = combined.endsWith("\n"); + const parts = combined.split("\n"); + + if (endsWithNewline) { + // Discard the final empty element — last line was complete. + parts.pop(); + this.buffer = ""; + } else { + // Last element is an incomplete line — hold it for the next chunk. + this.buffer = parts.pop() ?? ""; + } + + const out: string[] = []; + for (const line of parts) { + if (this.shouldEmit(line)) out.push(line); + } + if (out.length === 0) return ""; + return out.join("\n") + "\n"; + } + + /** + * Flush any incomplete trailing content. Called when the stream ends + * without a terminating newline so the final partial line isn't lost. + */ + flush(): string { + const pending = this.buffer; + this.buffer = ""; + if (!pending) return ""; + return this.shouldEmit(pending) ? 
pending : ""; + } + + private shouldEmit(line: string): boolean { + const trimmed = line.trim(); + if (!trimmed) return true; + + // Only attempt dedup on JSON-shaped lines; pass shell/text output through. + if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true; + + let parsed: unknown; + try { + parsed = JSON.parse(trimmed); + } catch { + return true; + } + + const event = asRecord(parsed); + if (!event) return true; + + // Recognized Claude stream-json event → structural key. + const structuralKey = eventDedupKey(event); + const key = structuralKey ?? `raw:${trimmed}`; + + if (this.seenKeys.has(key)) return false; + this.seenKeys.add(key); + return true; + } +} -- 2.52.0