From b60765785b36d95afb327941c5f545bbda58c861 Mon Sep 17 00:00:00 2001 From: Chris Farhood Date: Fri, 24 Apr 2026 17:26:37 +0000 Subject: [PATCH] feat: format Claude stream-json events in K8s streaming path for consistency with claude_local (FAR-32) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All output sent to Paperclip via onLog now passes through formatClaudeStreamLine, converting raw stream-json blobs into human-readable text consistent with how the CLI and claude_local adapter format events. Changes: - format-event.ts: add formatClaudeStreamLine(raw) -> string | null Plain-text equivalent of printClaudeStreamEvent — no ANSI colours, returns null for lines to suppress (assistant with no content, unknown events). Handles: system/init, assistant (text/thinking/tool_use), user (tool_result), result (summary + tokens), rate_limit_event. Non-JSON lines pass through. - execute.ts: wire formatClaudeStreamLine into streamPodLogsOnce write handler. raw chunks still stored in 'chunks[]' for parseClaudeStreamJson; only the onLog path receives formatted text. - 12 new tests for formatClaudeStreamLine covering all event types. - 352/352 tests pass. Co-Authored-By: Paperclip --- src/cli/format-event.test.ts | 102 +++++++++++++++++++++++++- src/cli/format-event.ts | 137 +++++++++++++++++++++++++++++++++-- src/cli/index.ts | 2 +- src/index.ts | 2 +- src/server/execute.ts | 11 ++- 5 files changed, 243 insertions(+), 11 deletions(-) diff --git a/src/cli/format-event.test.ts b/src/cli/format-event.test.ts index 902cdeb..821b6ad 100644 --- a/src/cli/format-event.test.ts +++ b/src/cli/format-event.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect, vi, beforeEach } from "vitest"; -import { printClaudeStreamEvent } from "./format-event.js"; +import { printClaudeStreamEvent, formatClaudeStreamLine } from "./format-event.js"; // Mock console methods to capture output const consoleMock = { @@ -181,3 +181,103 @@ describe("printClaudeStreamEvent", () => { expect(output()).toContain("stuff"); }); }); + +describe("formatClaudeStreamLine", () => { + it("returns null for empty/blank lines", () => { + expect(formatClaudeStreamLine("")).toBeNull(); + expect(formatClaudeStreamLine(" ")).toBeNull(); + }); + + it("returns raw text for non-JSON lines (adapter status messages pass through)", () => { + expect(formatClaudeStreamLine("[paperclip] Pod running: pod-abc")).toBe("[paperclip] Pod running: pod-abc"); + expect(formatClaudeStreamLine("Error: disk full")).toBe("Error: disk full"); + }); + + it("formats system/init event", () => { + const result = formatClaudeStreamLine(JSON.stringify({ + type: "system", subtype: "init", model: "claude-opus-4-7", session_id: "sess_abc", + })); + expect(result).toContain("Claude initialized"); + expect(result).toContain("claude-opus-4-7"); + expect(result).toContain("sess_abc"); + expect(result).not.toContain("{"); + }); + + it("formats assistant text block", () => { + const result = formatClaudeStreamLine(JSON.stringify({ + type: "assistant", + message: { content: [{ type: "text", text: "Hello world" }] }, + })); + expect(result).toBe("assistant: Hello world"); + }); + + it("formats assistant thinking block", () => { + const result = formatClaudeStreamLine(JSON.stringify({ + type: "assistant", + message: { content: [{ type: "thinking", thinking: "Let me think..." }] }, + })); + expect(result).toBe("thinking: Let me think..."); + }); + + it("formats assistant tool_use block", () => { + const result = formatClaudeStreamLine(JSON.stringify({ + type: "assistant", + message: { content: [{ type: "tool_use", name: "Bash", input: { command: "ls" } }] }, + })); + expect(result).toContain("tool_call: Bash"); + expect(result).toContain("ls"); + }); + + it("returns null for assistant with no printable content (thinking-only with empty text)", () => { + const result = formatClaudeStreamLine(JSON.stringify({ + type: "assistant", + message: { content: [{ type: "thinking", thinking: "" }] }, + })); + expect(result).toBeNull(); + }); + + it("formats user tool_result", () => { + const result = formatClaudeStreamLine(JSON.stringify({ + type: "user", + message: { content: [{ type: "tool_result", content: "file1.txt\nfile2.txt" }] }, + })); + expect(result).toContain("tool_result"); + expect(result).toContain("file1.txt"); + }); + + it("formats user tool_result error", () => { + const result = formatClaudeStreamLine(JSON.stringify({ + type: "user", + message: { content: [{ type: "tool_result", is_error: true, content: "Permission denied" }] }, + })); + expect(result).toContain("tool_result (error)"); + expect(result).toContain("Permission denied"); + }); + + it("formats result event with tokens and cost", () => { + const result = formatClaudeStreamLine(JSON.stringify({ + type: "result", result: "Done", subtype: "stop", total_cost_usd: 0.005, + usage: { input_tokens: 100, output_tokens: 200, cache_read_input_tokens: 50 }, + })); + expect(result).toContain("result:"); + expect(result).toContain("Done"); + expect(result).toContain("in=100"); + expect(result).toContain("out=200"); + expect(result).toContain("cached=50"); + }); + + it("formats rate_limit_event (FAR-32 repro)", () => { + const result = formatClaudeStreamLine(JSON.stringify({ + type: "rate_limit_event", + rate_limit_info: { status: "allowed", resetsAt: 1777056000, rateLimitType: "five_hour" }, + })); + expect(result).toContain("rate_limit:"); + expect(result).toContain("five_hour"); + expect(result).toContain("allowed"); + expect(result).not.toContain("{"); + }); + + it("returns null for unknown event types", () => { + expect(formatClaudeStreamLine(JSON.stringify({ type: "unknown_event", data: "x" }))).toBeNull(); + }); +}); diff --git a/src/cli/format-event.ts b/src/cli/format-event.ts index 9cbe191..245105d 100644 --- a/src/cli/format-event.ts +++ b/src/cli/format-event.ts @@ -17,27 +17,150 @@ function asErrorText(value: unknown): string { } } -function printToolResult(block: Record): void { - const isError = block.is_error === true; - let text = ""; - if (typeof block.content === "string") { - text = block.content; - } else if (Array.isArray(block.content)) { +function extractToolResultText(block: Record): string { + if (typeof block.content === "string") return block.content; + if (Array.isArray(block.content)) { const parts: string[] = []; for (const part of block.content) { if (typeof part !== "object" || part === null || Array.isArray(part)) continue; const record = part as Record; if (typeof record.text === "string") parts.push(record.text); } - text = parts.join("\n"); + return parts.join("\n"); } + return ""; +} +function printToolResult(block: Record): void { + const isError = block.is_error === true; + const text = extractToolResultText(block); console.log((isError ? pc.red : pc.cyan)(`tool_result${isError ? " (error)" : ""}`)); if (text) { console.log((isError ? pc.red : pc.gray)(text)); } } +/** + * Format a single raw Claude stream-json line into a plain-text human-readable + * string (no ANSI colour codes) suitable for forwarding to the Paperclip server + * via onLog. Returns null for lines that should be suppressed (empty, + * assistant events with no printable content, etc.). Non-JSON lines are + * returned as-is so plain-text adapter status messages pass through unchanged. + * + * Mirrors the event coverage of printClaudeStreamEvent so the K8s server + * streaming path and the CLI display path produce consistent output. + */ +export function formatClaudeStreamLine(raw: string): string | null { + const line = raw.trim(); + if (!line) return null; + + let parsed: Record | null = null; + try { + parsed = JSON.parse(line) as Record; + } catch { + return line; + } + + const type = typeof parsed.type === "string" ? parsed.type : ""; + + if (type === "system" && parsed.subtype === "init") { + const model = typeof parsed.model === "string" ? parsed.model : "unknown"; + const sessionId = typeof parsed.session_id === "string" ? parsed.session_id : ""; + return `Claude initialized (model: ${model}${sessionId ? `, session: ${sessionId}` : ""})`; + } + + if (type === "assistant") { + const message = + typeof parsed.message === "object" && parsed.message !== null && !Array.isArray(parsed.message) + ? (parsed.message as Record) + : {}; + const content = Array.isArray(message.content) ? message.content : []; + const lines: string[] = []; + for (const blockRaw of content) { + if (typeof blockRaw !== "object" || blockRaw === null || Array.isArray(blockRaw)) continue; + const block = blockRaw as Record; + const blockType = typeof block.type === "string" ? block.type : ""; + if (blockType === "text") { + const text = typeof block.text === "string" ? block.text : ""; + if (text) lines.push(`assistant: ${text}`); + } else if (blockType === "thinking") { + const text = typeof block.thinking === "string" ? block.thinking : ""; + if (text) lines.push(`thinking: ${text}`); + } else if (blockType === "tool_use") { + const name = typeof block.name === "string" ? block.name : "unknown"; + lines.push(`tool_call: ${name}`); + if (block.input !== undefined) { + lines.push(JSON.stringify(block.input, null, 2)); + } + } + } + return lines.length > 0 ? lines.join("\n") : null; + } + + if (type === "user") { + const message = + typeof parsed.message === "object" && parsed.message !== null && !Array.isArray(parsed.message) + ? (parsed.message as Record) + : {}; + const content = Array.isArray(message.content) ? message.content : []; + const lines: string[] = []; + for (const blockRaw of content) { + if (typeof blockRaw !== "object" || blockRaw === null || Array.isArray(blockRaw)) continue; + const block = blockRaw as Record; + if (typeof block.type === "string" && block.type === "tool_result") { + const isError = block.is_error === true; + const text = extractToolResultText(block); + lines.push(`tool_result${isError ? " (error)" : ""}`); + if (text) lines.push(text); + } + } + return lines.length > 0 ? lines.join("\n") : null; + } + + if (type === "result") { + const usage = + typeof parsed.usage === "object" && parsed.usage !== null && !Array.isArray(parsed.usage) + ? (parsed.usage as Record) + : {}; + const input = Number(usage.input_tokens ?? 0); + const output = Number(usage.output_tokens ?? 0); + const cached = Number(usage.cache_read_input_tokens ?? 0); + const cost = Number(parsed.total_cost_usd ?? 0); + const subtype = typeof parsed.subtype === "string" ? parsed.subtype : ""; + const isError = parsed.is_error === true; + const resultText = typeof parsed.result === "string" ? parsed.result : ""; + const errors = Array.isArray(parsed.errors) ? parsed.errors.map(asErrorText).filter(Boolean) : []; + const lines: string[] = []; + if (resultText) { + lines.push("result:"); + lines.push(resultText); + } + if (subtype.startsWith("error") || isError || errors.length > 0) { + lines.push(`claude_result: subtype=${subtype || "unknown"} is_error=${isError ? "true" : "false"}`); + if (errors.length > 0) lines.push(`claude_errors: ${errors.join(" | ")}`); + } + lines.push(`tokens: in=${Number.isFinite(input) ? input : 0} out=${Number.isFinite(output) ? output : 0} cached=${Number.isFinite(cached) ? cached : 0} cost=$${Number.isFinite(cost) ? cost.toFixed(6) : "0.000000"}`); + return lines.join("\n"); + } + + if (type === "rate_limit_event") { + const info = + typeof parsed.rate_limit_info === "object" && parsed.rate_limit_info !== null + ? (parsed.rate_limit_info as Record) + : {}; + const limitType = typeof info.rateLimitType === "string" ? info.rateLimitType : "unknown"; + const status = typeof info.status === "string" ? info.status : "unknown"; + const resetsAt = typeof info.resetsAt === "number" + ? new Date(info.resetsAt * 1000).toISOString() + : ""; + const parts = [`rate_limit: type=${limitType} status=${status}`]; + if (resetsAt) parts.push(`resets=${resetsAt}`); + return parts.join(" "); + } + + return null; +} + export function printClaudeStreamEvent(raw: string, debug: boolean): void { const line = raw.trim(); if (!line) return; diff --git a/src/cli/index.ts b/src/cli/index.ts index 7c81ab6..c21e21b 100644 --- a/src/cli/index.ts +++ b/src/cli/index.ts @@ -1 +1 @@ -export { printClaudeStreamEvent } from "./format-event.js"; +export { printClaudeStreamEvent, formatClaudeStreamLine } from "./format-event.js"; diff --git a/src/index.ts b/src/index.ts index 0580420..b8611c1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -52,4 +52,4 @@ Notes: `; export { createServerAdapter } from "./server/index.js"; -export { printClaudeStreamEvent } from "./cli/index.js"; +export { printClaudeStreamEvent, formatClaudeStreamLine } from "./cli/index.js"; diff --git a/src/server/execute.ts b/src/server/execute.ts index 55ce0db..36cc0d1 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -19,6 +19,7 @@ import { import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js"; import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js"; import { LogLineDedupFilter } from "./log-dedup.js"; +import { formatClaudeStreamLine } from "../cli/format-event.js"; import type * as k8s from "@kubernetes/client-node"; import { Writable } from "node:stream"; @@ -353,13 +354,21 @@ export async function streamPodLogsOnce( const writable = new Writable({ write(chunk: Buffer, _encoding, callback) { const text = chunk.toString("utf-8"); + // Always store raw text — parseClaudeStreamJson needs the original + // stream-json lines to extract session IDs, usage, and result events. chunks.push(text); const emitted = dedup ? dedup.filter(text) : text; if (!emitted) { callback(); return; } - void onLog("stdout", emitted).then(() => callback(), callback); + // Format each stream-json event into human-readable text before the + // Paperclip server sees it, matching claude_local output style. + // Non-JSON lines (adapter status messages, plain errors) pass through. + const formatted = emitted.split("\n") + .map((line) => formatClaudeStreamLine(line) ?? "") + .join("\n"); + void onLog("stdout", formatted).then(() => callback(), callback); }, });