feat: format Claude stream-json events in K8s streaming path for consistency with claude_local (FAR-32)
All output sent to Paperclip via onLog now passes through formatClaudeStreamLine, converting raw stream-json blobs into human-readable text consistent with how the CLI and claude_local adapter format events. Changes: - format-event.ts: add formatClaudeStreamLine(raw) -> string | null Plain-text equivalent of printClaudeStreamEvent — no ANSI colours, returns null for lines to suppress (assistant with no content, unknown events). Handles: system/init, assistant (text/thinking/tool_use), user (tool_result), result (summary + tokens), rate_limit_event. Non-JSON lines pass through. - execute.ts: wire formatClaudeStreamLine into streamPodLogsOnce write handler. raw chunks still stored in 'chunks[]' for parseClaudeStreamJson; only the onLog path receives formatted text. - 12 new tests for formatClaudeStreamLine covering all event types. - 352/352 tests pass. Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import { describe, it, expect, vi, beforeEach } from "vitest";
|
||||
import { printClaudeStreamEvent } from "./format-event.js";
|
||||
import { printClaudeStreamEvent, formatClaudeStreamLine } from "./format-event.js";
|
||||
|
||||
// Mock console methods to capture output
|
||||
const consoleMock = {
|
||||
@@ -181,3 +181,103 @@ describe("printClaudeStreamEvent", () => {
|
||||
expect(output()).toContain("stuff");
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatClaudeStreamLine", () => {
|
||||
it("returns null for empty/blank lines", () => {
|
||||
expect(formatClaudeStreamLine("")).toBeNull();
|
||||
expect(formatClaudeStreamLine(" ")).toBeNull();
|
||||
});
|
||||
|
||||
it("returns raw text for non-JSON lines (adapter status messages pass through)", () => {
|
||||
expect(formatClaudeStreamLine("[paperclip] Pod running: pod-abc")).toBe("[paperclip] Pod running: pod-abc");
|
||||
expect(formatClaudeStreamLine("Error: disk full")).toBe("Error: disk full");
|
||||
});
|
||||
|
||||
it("formats system/init event", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "system", subtype: "init", model: "claude-opus-4-7", session_id: "sess_abc",
|
||||
}));
|
||||
expect(result).toContain("Claude initialized");
|
||||
expect(result).toContain("claude-opus-4-7");
|
||||
expect(result).toContain("sess_abc");
|
||||
expect(result).not.toContain("{");
|
||||
});
|
||||
|
||||
it("formats assistant text block", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "assistant",
|
||||
message: { content: [{ type: "text", text: "Hello world" }] },
|
||||
}));
|
||||
expect(result).toBe("assistant: Hello world");
|
||||
});
|
||||
|
||||
it("formats assistant thinking block", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "assistant",
|
||||
message: { content: [{ type: "thinking", thinking: "Let me think..." }] },
|
||||
}));
|
||||
expect(result).toBe("thinking: Let me think...");
|
||||
});
|
||||
|
||||
it("formats assistant tool_use block", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "assistant",
|
||||
message: { content: [{ type: "tool_use", name: "Bash", input: { command: "ls" } }] },
|
||||
}));
|
||||
expect(result).toContain("tool_call: Bash");
|
||||
expect(result).toContain("ls");
|
||||
});
|
||||
|
||||
it("returns null for assistant with no printable content (thinking-only with empty text)", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "assistant",
|
||||
message: { content: [{ type: "thinking", thinking: "" }] },
|
||||
}));
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it("formats user tool_result", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "user",
|
||||
message: { content: [{ type: "tool_result", content: "file1.txt\nfile2.txt" }] },
|
||||
}));
|
||||
expect(result).toContain("tool_result");
|
||||
expect(result).toContain("file1.txt");
|
||||
});
|
||||
|
||||
it("formats user tool_result error", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "user",
|
||||
message: { content: [{ type: "tool_result", is_error: true, content: "Permission denied" }] },
|
||||
}));
|
||||
expect(result).toContain("tool_result (error)");
|
||||
expect(result).toContain("Permission denied");
|
||||
});
|
||||
|
||||
it("formats result event with tokens and cost", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "result", result: "Done", subtype: "stop", total_cost_usd: 0.005,
|
||||
usage: { input_tokens: 100, output_tokens: 200, cache_read_input_tokens: 50 },
|
||||
}));
|
||||
expect(result).toContain("result:");
|
||||
expect(result).toContain("Done");
|
||||
expect(result).toContain("in=100");
|
||||
expect(result).toContain("out=200");
|
||||
expect(result).toContain("cached=50");
|
||||
});
|
||||
|
||||
it("formats rate_limit_event (FAR-32 repro)", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "rate_limit_event",
|
||||
rate_limit_info: { status: "allowed", resetsAt: 1777056000, rateLimitType: "five_hour" },
|
||||
}));
|
||||
expect(result).toContain("rate_limit:");
|
||||
expect(result).toContain("five_hour");
|
||||
expect(result).toContain("allowed");
|
||||
expect(result).not.toContain("{");
|
||||
});
|
||||
|
||||
it("returns null for unknown event types", () => {
|
||||
expect(formatClaudeStreamLine(JSON.stringify({ type: "unknown_event", data: "x" }))).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
+130
-7
@@ -17,27 +17,150 @@ function asErrorText(value: unknown): string {
|
||||
}
|
||||
}
|
||||
|
||||
function printToolResult(block: Record<string, unknown>): void {
|
||||
const isError = block.is_error === true;
|
||||
let text = "";
|
||||
if (typeof block.content === "string") {
|
||||
text = block.content;
|
||||
} else if (Array.isArray(block.content)) {
|
||||
function extractToolResultText(block: Record<string, unknown>): string {
|
||||
if (typeof block.content === "string") return block.content;
|
||||
if (Array.isArray(block.content)) {
|
||||
const parts: string[] = [];
|
||||
for (const part of block.content) {
|
||||
if (typeof part !== "object" || part === null || Array.isArray(part)) continue;
|
||||
const record = part as Record<string, unknown>;
|
||||
if (typeof record.text === "string") parts.push(record.text);
|
||||
}
|
||||
text = parts.join("\n");
|
||||
return parts.join("\n");
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
function printToolResult(block: Record<string, unknown>): void {
|
||||
const isError = block.is_error === true;
|
||||
const text = extractToolResultText(block);
|
||||
console.log((isError ? pc.red : pc.cyan)(`tool_result${isError ? " (error)" : ""}`));
|
||||
if (text) {
|
||||
console.log((isError ? pc.red : pc.gray)(text));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Format a single raw Claude stream-json line into a plain-text human-readable
|
||||
* string (no ANSI colour codes) suitable for forwarding to the Paperclip server
|
||||
* via onLog. Returns null for lines that should be suppressed (empty,
|
||||
* assistant events with no printable content, etc.). Non-JSON lines are
|
||||
* returned as-is so plain-text adapter status messages pass through unchanged.
|
||||
*
|
||||
* Mirrors the event coverage of printClaudeStreamEvent so the K8s server
|
||||
* streaming path and the CLI display path produce consistent output.
|
||||
*/
|
||||
export function formatClaudeStreamLine(raw: string): string | null {
|
||||
const line = raw.trim();
|
||||
if (!line) return null;
|
||||
|
||||
let parsed: Record<string, unknown> | null = null;
|
||||
try {
|
||||
parsed = JSON.parse(line) as Record<string, unknown>;
|
||||
} catch {
|
||||
return line;
|
||||
}
|
||||
|
||||
const type = typeof parsed.type === "string" ? parsed.type : "";
|
||||
|
||||
if (type === "system" && parsed.subtype === "init") {
|
||||
const model = typeof parsed.model === "string" ? parsed.model : "unknown";
|
||||
const sessionId = typeof parsed.session_id === "string" ? parsed.session_id : "";
|
||||
return `Claude initialized (model: ${model}${sessionId ? `, session: ${sessionId}` : ""})`;
|
||||
}
|
||||
|
||||
if (type === "assistant") {
|
||||
const message =
|
||||
typeof parsed.message === "object" && parsed.message !== null && !Array.isArray(parsed.message)
|
||||
? (parsed.message as Record<string, unknown>)
|
||||
: {};
|
||||
const content = Array.isArray(message.content) ? message.content : [];
|
||||
const lines: string[] = [];
|
||||
for (const blockRaw of content) {
|
||||
if (typeof blockRaw !== "object" || blockRaw === null || Array.isArray(blockRaw)) continue;
|
||||
const block = blockRaw as Record<string, unknown>;
|
||||
const blockType = typeof block.type === "string" ? block.type : "";
|
||||
if (blockType === "text") {
|
||||
const text = typeof block.text === "string" ? block.text : "";
|
||||
if (text) lines.push(`assistant: ${text}`);
|
||||
} else if (blockType === "thinking") {
|
||||
const text = typeof block.thinking === "string" ? block.thinking : "";
|
||||
if (text) lines.push(`thinking: ${text}`);
|
||||
} else if (blockType === "tool_use") {
|
||||
const name = typeof block.name === "string" ? block.name : "unknown";
|
||||
lines.push(`tool_call: ${name}`);
|
||||
if (block.input !== undefined) {
|
||||
lines.push(JSON.stringify(block.input, null, 2));
|
||||
}
|
||||
}
|
||||
}
|
||||
return lines.length > 0 ? lines.join("\n") : null;
|
||||
}
|
||||
|
||||
if (type === "user") {
|
||||
const message =
|
||||
typeof parsed.message === "object" && parsed.message !== null && !Array.isArray(parsed.message)
|
||||
? (parsed.message as Record<string, unknown>)
|
||||
: {};
|
||||
const content = Array.isArray(message.content) ? message.content : [];
|
||||
const lines: string[] = [];
|
||||
for (const blockRaw of content) {
|
||||
if (typeof blockRaw !== "object" || blockRaw === null || Array.isArray(blockRaw)) continue;
|
||||
const block = blockRaw as Record<string, unknown>;
|
||||
if (typeof block.type === "string" && block.type === "tool_result") {
|
||||
const isError = block.is_error === true;
|
||||
const text = extractToolResultText(block);
|
||||
lines.push(`tool_result${isError ? " (error)" : ""}`);
|
||||
if (text) lines.push(text);
|
||||
}
|
||||
}
|
||||
return lines.length > 0 ? lines.join("\n") : null;
|
||||
}
|
||||
|
||||
if (type === "result") {
|
||||
const usage =
|
||||
typeof parsed.usage === "object" && parsed.usage !== null && !Array.isArray(parsed.usage)
|
||||
? (parsed.usage as Record<string, unknown>)
|
||||
: {};
|
||||
const input = Number(usage.input_tokens ?? 0);
|
||||
const output = Number(usage.output_tokens ?? 0);
|
||||
const cached = Number(usage.cache_read_input_tokens ?? 0);
|
||||
const cost = Number(parsed.total_cost_usd ?? 0);
|
||||
const subtype = typeof parsed.subtype === "string" ? parsed.subtype : "";
|
||||
const isError = parsed.is_error === true;
|
||||
const resultText = typeof parsed.result === "string" ? parsed.result : "";
|
||||
const errors = Array.isArray(parsed.errors) ? parsed.errors.map(asErrorText).filter(Boolean) : [];
|
||||
const lines: string[] = [];
|
||||
if (resultText) {
|
||||
lines.push("result:");
|
||||
lines.push(resultText);
|
||||
}
|
||||
if (subtype.startsWith("error") || isError || errors.length > 0) {
|
||||
lines.push(`claude_result: subtype=${subtype || "unknown"} is_error=${isError ? "true" : "false"}`);
|
||||
if (errors.length > 0) lines.push(`claude_errors: ${errors.join(" | ")}`);
|
||||
}
|
||||
lines.push(`tokens: in=${Number.isFinite(input) ? input : 0} out=${Number.isFinite(output) ? output : 0} cached=${Number.isFinite(cached) ? cached : 0} cost=$${Number.isFinite(cost) ? cost.toFixed(6) : "0.000000"}`);
|
||||
return lines.join("\n");
|
||||
}
|
||||
|
||||
if (type === "rate_limit_event") {
|
||||
const info =
|
||||
typeof parsed.rate_limit_info === "object" && parsed.rate_limit_info !== null
|
||||
? (parsed.rate_limit_info as Record<string, unknown>)
|
||||
: {};
|
||||
const limitType = typeof info.rateLimitType === "string" ? info.rateLimitType : "unknown";
|
||||
const status = typeof info.status === "string" ? info.status : "unknown";
|
||||
const resetsAt = typeof info.resetsAt === "number"
|
||||
? new Date(info.resetsAt * 1000).toISOString()
|
||||
: "";
|
||||
const parts = [`rate_limit: type=${limitType} status=${status}`];
|
||||
if (resetsAt) parts.push(`resets=${resetsAt}`);
|
||||
return parts.join(" ");
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
export function printClaudeStreamEvent(raw: string, debug: boolean): void {
|
||||
const line = raw.trim();
|
||||
if (!line) return;
|
||||
|
||||
+1
-1
@@ -1 +1 @@
|
||||
export { printClaudeStreamEvent } from "./format-event.js";
|
||||
export { printClaudeStreamEvent, formatClaudeStreamLine } from "./format-event.js";
|
||||
|
||||
+1
-1
@@ -52,4 +52,4 @@ Notes:
|
||||
`;
|
||||
|
||||
export { createServerAdapter } from "./server/index.js";
|
||||
export { printClaudeStreamEvent } from "./cli/index.js";
|
||||
export { printClaudeStreamEvent, formatClaudeStreamLine } from "./cli/index.js";
|
||||
|
||||
+10
-1
@@ -19,6 +19,7 @@ import {
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
|
||||
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
|
||||
import { LogLineDedupFilter } from "./log-dedup.js";
|
||||
import { formatClaudeStreamLine } from "../cli/format-event.js";
|
||||
import type * as k8s from "@kubernetes/client-node";
|
||||
import { Writable } from "node:stream";
|
||||
|
||||
@@ -353,13 +354,21 @@ export async function streamPodLogsOnce(
|
||||
const writable = new Writable({
|
||||
write(chunk: Buffer, _encoding, callback) {
|
||||
const text = chunk.toString("utf-8");
|
||||
// Always store raw text — parseClaudeStreamJson needs the original
|
||||
// stream-json lines to extract session IDs, usage, and result events.
|
||||
chunks.push(text);
|
||||
const emitted = dedup ? dedup.filter(text) : text;
|
||||
if (!emitted) {
|
||||
callback();
|
||||
return;
|
||||
}
|
||||
void onLog("stdout", emitted).then(() => callback(), callback);
|
||||
// Format each stream-json event into human-readable text before the
|
||||
// Paperclip server sees it, matching claude_local output style.
|
||||
// Non-JSON lines (adapter status messages, plain errors) pass through.
|
||||
const formatted = emitted.split("\n")
|
||||
.map((line) => formatClaudeStreamLine(line) ?? "")
|
||||
.join("\n");
|
||||
void onLog("stdout", formatted).then(() => callback(), callback);
|
||||
},
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user