feat: format Claude stream-json events in K8s streaming path for consistency with claude_local (FAR-32)

All output sent to Paperclip via onLog now passes through formatClaudeStreamLine,
converting raw stream-json blobs into human-readable text consistent with how
the CLI and claude_local adapter format events.

Changes:
- format-event.ts: add formatClaudeStreamLine(raw) -> string | null
  Plain-text equivalent of printClaudeStreamEvent — no ANSI colours, returns
  null for lines to suppress (assistant with no content, unknown events).
  Handles: system/init, assistant (text/thinking/tool_use), user (tool_result),
  result (summary + tokens), rate_limit_event. Non-JSON lines pass through.
- execute.ts: wire formatClaudeStreamLine into streamPodLogsOnce write handler.
  raw chunks still stored in 'chunks[]' for parseClaudeStreamJson; only the
  onLog path receives formatted text.
- 12 new tests for formatClaudeStreamLine covering all event types.
- 352/352 tests pass.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
2026-04-24 17:26:37 +00:00
parent 28d6451265
commit b60765785b
5 changed files with 243 additions and 11 deletions
+101 -1
View File
@@ -1,5 +1,5 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { printClaudeStreamEvent } from "./format-event.js";
import { printClaudeStreamEvent, formatClaudeStreamLine } from "./format-event.js";
// Mock console methods to capture output
const consoleMock = {
@@ -181,3 +181,103 @@ describe("printClaudeStreamEvent", () => {
expect(output()).toContain("stuff");
});
});
describe("formatClaudeStreamLine", () => {
it("returns null for empty/blank lines", () => {
expect(formatClaudeStreamLine("")).toBeNull();
expect(formatClaudeStreamLine(" ")).toBeNull();
});
it("returns raw text for non-JSON lines (adapter status messages pass through)", () => {
expect(formatClaudeStreamLine("[paperclip] Pod running: pod-abc")).toBe("[paperclip] Pod running: pod-abc");
expect(formatClaudeStreamLine("Error: disk full")).toBe("Error: disk full");
});
it("formats system/init event", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "system", subtype: "init", model: "claude-opus-4-7", session_id: "sess_abc",
}));
expect(result).toContain("Claude initialized");
expect(result).toContain("claude-opus-4-7");
expect(result).toContain("sess_abc");
expect(result).not.toContain("{");
});
it("formats assistant text block", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "assistant",
message: { content: [{ type: "text", text: "Hello world" }] },
}));
expect(result).toBe("assistant: Hello world");
});
it("formats assistant thinking block", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "assistant",
message: { content: [{ type: "thinking", thinking: "Let me think..." }] },
}));
expect(result).toBe("thinking: Let me think...");
});
it("formats assistant tool_use block", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "assistant",
message: { content: [{ type: "tool_use", name: "Bash", input: { command: "ls" } }] },
}));
expect(result).toContain("tool_call: Bash");
expect(result).toContain("ls");
});
it("returns null for assistant with no printable content (thinking-only with empty text)", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "assistant",
message: { content: [{ type: "thinking", thinking: "" }] },
}));
expect(result).toBeNull();
});
it("formats user tool_result", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "user",
message: { content: [{ type: "tool_result", content: "file1.txt\nfile2.txt" }] },
}));
expect(result).toContain("tool_result");
expect(result).toContain("file1.txt");
});
it("formats user tool_result error", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "user",
message: { content: [{ type: "tool_result", is_error: true, content: "Permission denied" }] },
}));
expect(result).toContain("tool_result (error)");
expect(result).toContain("Permission denied");
});
it("formats result event with tokens and cost", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "result", result: "Done", subtype: "stop", total_cost_usd: 0.005,
usage: { input_tokens: 100, output_tokens: 200, cache_read_input_tokens: 50 },
}));
expect(result).toContain("result:");
expect(result).toContain("Done");
expect(result).toContain("in=100");
expect(result).toContain("out=200");
expect(result).toContain("cached=50");
});
it("formats rate_limit_event (FAR-32 repro)", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "rate_limit_event",
rate_limit_info: { status: "allowed", resetsAt: 1777056000, rateLimitType: "five_hour" },
}));
expect(result).toContain("rate_limit:");
expect(result).toContain("five_hour");
expect(result).toContain("allowed");
expect(result).not.toContain("{");
});
it("returns null for unknown event types", () => {
expect(formatClaudeStreamLine(JSON.stringify({ type: "unknown_event", data: "x" }))).toBeNull();
});
});
+130 -7
View File
@@ -17,27 +17,150 @@ function asErrorText(value: unknown): string {
}
}
function printToolResult(block: Record<string, unknown>): void {
const isError = block.is_error === true;
let text = "";
if (typeof block.content === "string") {
text = block.content;
} else if (Array.isArray(block.content)) {
function extractToolResultText(block: Record<string, unknown>): string {
if (typeof block.content === "string") return block.content;
if (Array.isArray(block.content)) {
const parts: string[] = [];
for (const part of block.content) {
if (typeof part !== "object" || part === null || Array.isArray(part)) continue;
const record = part as Record<string, unknown>;
if (typeof record.text === "string") parts.push(record.text);
}
text = parts.join("\n");
return parts.join("\n");
}
return "";
}
function printToolResult(block: Record<string, unknown>): void {
const isError = block.is_error === true;
const text = extractToolResultText(block);
console.log((isError ? pc.red : pc.cyan)(`tool_result${isError ? " (error)" : ""}`));
if (text) {
console.log((isError ? pc.red : pc.gray)(text));
}
}
/**
* Format a single raw Claude stream-json line into a plain-text human-readable
* string (no ANSI colour codes) suitable for forwarding to the Paperclip server
* via onLog. Returns null for lines that should be suppressed (empty,
* assistant events with no printable content, etc.). Non-JSON lines are
* returned as-is so plain-text adapter status messages pass through unchanged.
*
* Mirrors the event coverage of printClaudeStreamEvent so the K8s server
* streaming path and the CLI display path produce consistent output.
*/
export function formatClaudeStreamLine(raw: string): string | null {
const line = raw.trim();
if (!line) return null;
let parsed: Record<string, unknown> | null = null;
try {
parsed = JSON.parse(line) as Record<string, unknown>;
} catch {
return line;
}
const type = typeof parsed.type === "string" ? parsed.type : "";
if (type === "system" && parsed.subtype === "init") {
const model = typeof parsed.model === "string" ? parsed.model : "unknown";
const sessionId = typeof parsed.session_id === "string" ? parsed.session_id : "";
return `Claude initialized (model: ${model}${sessionId ? `, session: ${sessionId}` : ""})`;
}
if (type === "assistant") {
const message =
typeof parsed.message === "object" && parsed.message !== null && !Array.isArray(parsed.message)
? (parsed.message as Record<string, unknown>)
: {};
const content = Array.isArray(message.content) ? message.content : [];
const lines: string[] = [];
for (const blockRaw of content) {
if (typeof blockRaw !== "object" || blockRaw === null || Array.isArray(blockRaw)) continue;
const block = blockRaw as Record<string, unknown>;
const blockType = typeof block.type === "string" ? block.type : "";
if (blockType === "text") {
const text = typeof block.text === "string" ? block.text : "";
if (text) lines.push(`assistant: ${text}`);
} else if (blockType === "thinking") {
const text = typeof block.thinking === "string" ? block.thinking : "";
if (text) lines.push(`thinking: ${text}`);
} else if (blockType === "tool_use") {
const name = typeof block.name === "string" ? block.name : "unknown";
lines.push(`tool_call: ${name}`);
if (block.input !== undefined) {
lines.push(JSON.stringify(block.input, null, 2));
}
}
}
return lines.length > 0 ? lines.join("\n") : null;
}
if (type === "user") {
const message =
typeof parsed.message === "object" && parsed.message !== null && !Array.isArray(parsed.message)
? (parsed.message as Record<string, unknown>)
: {};
const content = Array.isArray(message.content) ? message.content : [];
const lines: string[] = [];
for (const blockRaw of content) {
if (typeof blockRaw !== "object" || blockRaw === null || Array.isArray(blockRaw)) continue;
const block = blockRaw as Record<string, unknown>;
if (typeof block.type === "string" && block.type === "tool_result") {
const isError = block.is_error === true;
const text = extractToolResultText(block);
lines.push(`tool_result${isError ? " (error)" : ""}`);
if (text) lines.push(text);
}
}
return lines.length > 0 ? lines.join("\n") : null;
}
if (type === "result") {
const usage =
typeof parsed.usage === "object" && parsed.usage !== null && !Array.isArray(parsed.usage)
? (parsed.usage as Record<string, unknown>)
: {};
const input = Number(usage.input_tokens ?? 0);
const output = Number(usage.output_tokens ?? 0);
const cached = Number(usage.cache_read_input_tokens ?? 0);
const cost = Number(parsed.total_cost_usd ?? 0);
const subtype = typeof parsed.subtype === "string" ? parsed.subtype : "";
const isError = parsed.is_error === true;
const resultText = typeof parsed.result === "string" ? parsed.result : "";
const errors = Array.isArray(parsed.errors) ? parsed.errors.map(asErrorText).filter(Boolean) : [];
const lines: string[] = [];
if (resultText) {
lines.push("result:");
lines.push(resultText);
}
if (subtype.startsWith("error") || isError || errors.length > 0) {
lines.push(`claude_result: subtype=${subtype || "unknown"} is_error=${isError ? "true" : "false"}`);
if (errors.length > 0) lines.push(`claude_errors: ${errors.join(" | ")}`);
}
lines.push(`tokens: in=${Number.isFinite(input) ? input : 0} out=${Number.isFinite(output) ? output : 0} cached=${Number.isFinite(cached) ? cached : 0} cost=$${Number.isFinite(cost) ? cost.toFixed(6) : "0.000000"}`);
return lines.join("\n");
}
if (type === "rate_limit_event") {
const info =
typeof parsed.rate_limit_info === "object" && parsed.rate_limit_info !== null
? (parsed.rate_limit_info as Record<string, unknown>)
: {};
const limitType = typeof info.rateLimitType === "string" ? info.rateLimitType : "unknown";
const status = typeof info.status === "string" ? info.status : "unknown";
const resetsAt = typeof info.resetsAt === "number"
? new Date(info.resetsAt * 1000).toISOString()
: "";
const parts = [`rate_limit: type=${limitType} status=${status}`];
if (resetsAt) parts.push(`resets=${resetsAt}`);
return parts.join(" ");
}
return null;
}
export function printClaudeStreamEvent(raw: string, debug: boolean): void {
const line = raw.trim();
if (!line) return;
+1 -1
View File
@@ -1 +1 @@
export { printClaudeStreamEvent } from "./format-event.js";
export { printClaudeStreamEvent, formatClaudeStreamLine } from "./format-event.js";
+1 -1
View File
@@ -52,4 +52,4 @@ Notes:
`;
export { createServerAdapter } from "./server/index.js";
export { printClaudeStreamEvent } from "./cli/index.js";
export { printClaudeStreamEvent, formatClaudeStreamLine } from "./cli/index.js";
+10 -1
View File
@@ -19,6 +19,7 @@ import {
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
import { LogLineDedupFilter } from "./log-dedup.js";
import { formatClaudeStreamLine } from "../cli/format-event.js";
import type * as k8s from "@kubernetes/client-node";
import { Writable } from "node:stream";
@@ -353,13 +354,21 @@ export async function streamPodLogsOnce(
const writable = new Writable({
write(chunk: Buffer, _encoding, callback) {
const text = chunk.toString("utf-8");
// Always store raw text — parseClaudeStreamJson needs the original
// stream-json lines to extract session IDs, usage, and result events.
chunks.push(text);
const emitted = dedup ? dedup.filter(text) : text;
if (!emitted) {
callback();
return;
}
void onLog("stdout", emitted).then(() => callback(), callback);
// Format each stream-json event into human-readable text before the
// Paperclip server sees it, matching claude_local output style.
// Non-JSON lines (adapter status messages, plain errors) pass through.
const formatted = emitted.split("\n")
.map((line) => formatClaudeStreamLine(line) ?? "")
.join("\n");
void onLog("stdout", formatted).then(() => callback(), callback);
},
});