Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 357f035418 | |||
| f340ce52ee | |||
| ecc477d0be | |||
| f9ba77527a | |||
| f304c70899 | |||
| 727d9494da | |||
| b60765785b | |||
| 28d6451265 | |||
| cabdc3df98 | |||
| f9ff04a354 | |||
| e611f26d32 | |||
| f097440f3c |
Generated
+2
-2
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.38",
|
||||
"version": "0.1.42",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.38",
|
||||
"version": "0.1.42",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@kubernetes/client-node": "^1.0.0",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.38",
|
||||
"version": "0.1.43",
|
||||
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
|
||||
"license": "MIT",
|
||||
"repository": {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { describe, it, expect, vi, beforeEach } from "vitest";
|
||||
import { printClaudeStreamEvent } from "./format-event.js";
|
||||
import { printClaudeStreamEvent, formatClaudeStreamLine } from "./format-event.js";
|
||||
|
||||
// Mock console methods to capture output
|
||||
const consoleMock = {
|
||||
@@ -138,6 +138,39 @@ describe("printClaudeStreamEvent", () => {
|
||||
expect(output()).toBe("some output text");
|
||||
});
|
||||
|
||||
it("prints rate_limit_event with type, status, and reset time", () => {
|
||||
printClaudeStreamEvent(JSON.stringify({
|
||||
type: "rate_limit_event",
|
||||
rate_limit_info: {
|
||||
status: "allowed",
|
||||
resetsAt: 1777056000,
|
||||
rateLimitType: "five_hour",
|
||||
overageStatus: "allowed",
|
||||
isUsingOverage: false,
|
||||
},
|
||||
uuid: "3ab8f9eb-b9d6-4bf6-9c39-4608427717fc",
|
||||
session_id: "ad5f3e11-3c0c-4144-b53d-d4b959e57cee",
|
||||
}), false);
|
||||
expect(output()).toContain("rate_limit:");
|
||||
expect(output()).toContain("five_hour");
|
||||
expect(output()).toContain("allowed");
|
||||
expect(output()).toContain("resets=");
|
||||
// Raw JSON must not be surfaced verbatim
|
||||
expect(output()).not.toContain("3ab8f9eb-b9d6-4bf6-9c39-4608427717fc");
|
||||
});
|
||||
|
||||
it("prints rate_limit_event with unknown fields gracefully", () => {
|
||||
printClaudeStreamEvent(JSON.stringify({
|
||||
type: "rate_limit_event",
|
||||
rate_limit_info: {},
|
||||
}), false);
|
||||
expect(output()).toContain("rate_limit:");
|
||||
expect(output()).toContain("type=unknown");
|
||||
expect(output()).toContain("status=unknown");
|
||||
// No resetsAt present — reset clause omitted
|
||||
expect(output()).not.toContain("resets=");
|
||||
});
|
||||
|
||||
it("does not print unknown types in non-debug mode", () => {
|
||||
printClaudeStreamEvent(JSON.stringify({ type: "unknown", data: "stuff" }), false);
|
||||
expect(output()).toBe("");
|
||||
@@ -148,3 +181,103 @@ describe("printClaudeStreamEvent", () => {
|
||||
expect(output()).toContain("stuff");
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatClaudeStreamLine", () => {
|
||||
it("returns null for empty/blank lines", () => {
|
||||
expect(formatClaudeStreamLine("")).toBeNull();
|
||||
expect(formatClaudeStreamLine(" ")).toBeNull();
|
||||
});
|
||||
|
||||
it("returns raw text for non-JSON lines (adapter status messages pass through)", () => {
|
||||
expect(formatClaudeStreamLine("[paperclip] Pod running: pod-abc")).toBe("[paperclip] Pod running: pod-abc");
|
||||
expect(formatClaudeStreamLine("Error: disk full")).toBe("Error: disk full");
|
||||
});
|
||||
|
||||
it("formats system/init event", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "system", subtype: "init", model: "claude-opus-4-7", session_id: "sess_abc",
|
||||
}));
|
||||
expect(result).toContain("Claude initialized");
|
||||
expect(result).toContain("claude-opus-4-7");
|
||||
expect(result).toContain("sess_abc");
|
||||
expect(result).not.toContain("{");
|
||||
});
|
||||
|
||||
it("formats assistant text block", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "assistant",
|
||||
message: { content: [{ type: "text", text: "Hello world" }] },
|
||||
}));
|
||||
expect(result).toBe("assistant: Hello world");
|
||||
});
|
||||
|
||||
it("formats assistant thinking block", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "assistant",
|
||||
message: { content: [{ type: "thinking", thinking: "Let me think..." }] },
|
||||
}));
|
||||
expect(result).toBe("thinking: Let me think...");
|
||||
});
|
||||
|
||||
it("formats assistant tool_use block", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "assistant",
|
||||
message: { content: [{ type: "tool_use", name: "Bash", input: { command: "ls" } }] },
|
||||
}));
|
||||
expect(result).toContain("tool_call: Bash");
|
||||
expect(result).toContain("ls");
|
||||
});
|
||||
|
||||
it("returns null for assistant with no printable content (thinking-only with empty text)", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "assistant",
|
||||
message: { content: [{ type: "thinking", thinking: "" }] },
|
||||
}));
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
it("formats user tool_result", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "user",
|
||||
message: { content: [{ type: "tool_result", content: "file1.txt\nfile2.txt" }] },
|
||||
}));
|
||||
expect(result).toContain("tool_result");
|
||||
expect(result).toContain("file1.txt");
|
||||
});
|
||||
|
||||
it("formats user tool_result error", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "user",
|
||||
message: { content: [{ type: "tool_result", is_error: true, content: "Permission denied" }] },
|
||||
}));
|
||||
expect(result).toContain("tool_result (error)");
|
||||
expect(result).toContain("Permission denied");
|
||||
});
|
||||
|
||||
it("formats result event with tokens and cost", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "result", result: "Done", subtype: "stop", total_cost_usd: 0.005,
|
||||
usage: { input_tokens: 100, output_tokens: 200, cache_read_input_tokens: 50 },
|
||||
}));
|
||||
expect(result).toContain("result:");
|
||||
expect(result).toContain("Done");
|
||||
expect(result).toContain("in=100");
|
||||
expect(result).toContain("out=200");
|
||||
expect(result).toContain("cached=50");
|
||||
});
|
||||
|
||||
it("formats rate_limit_event (FAR-32 repro)", () => {
|
||||
const result = formatClaudeStreamLine(JSON.stringify({
|
||||
type: "rate_limit_event",
|
||||
rate_limit_info: { status: "allowed", resetsAt: 1777056000, rateLimitType: "five_hour" },
|
||||
}));
|
||||
expect(result).toContain("rate_limit:");
|
||||
expect(result).toContain("five_hour");
|
||||
expect(result).toContain("allowed");
|
||||
expect(result).not.toContain("{");
|
||||
});
|
||||
|
||||
it("returns null for unknown event types", () => {
|
||||
expect(formatClaudeStreamLine(JSON.stringify({ type: "unknown_event", data: "x" }))).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
+146
-7
@@ -17,27 +17,150 @@ function asErrorText(value: unknown): string {
|
||||
}
|
||||
}
|
||||
|
||||
function printToolResult(block: Record<string, unknown>): void {
|
||||
const isError = block.is_error === true;
|
||||
let text = "";
|
||||
if (typeof block.content === "string") {
|
||||
text = block.content;
|
||||
} else if (Array.isArray(block.content)) {
|
||||
function extractToolResultText(block: Record<string, unknown>): string {
|
||||
if (typeof block.content === "string") return block.content;
|
||||
if (Array.isArray(block.content)) {
|
||||
const parts: string[] = [];
|
||||
for (const part of block.content) {
|
||||
if (typeof part !== "object" || part === null || Array.isArray(part)) continue;
|
||||
const record = part as Record<string, unknown>;
|
||||
if (typeof record.text === "string") parts.push(record.text);
|
||||
}
|
||||
text = parts.join("\n");
|
||||
return parts.join("\n");
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
function printToolResult(block: Record<string, unknown>): void {
|
||||
const isError = block.is_error === true;
|
||||
const text = extractToolResultText(block);
|
||||
console.log((isError ? pc.red : pc.cyan)(`tool_result${isError ? " (error)" : ""}`));
|
||||
if (text) {
|
||||
console.log((isError ? pc.red : pc.gray)(text));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Format a single raw Claude stream-json line into a plain-text human-readable
|
||||
* string (no ANSI colour codes) suitable for forwarding to the Paperclip server
|
||||
* via onLog. Returns null for lines that should be suppressed (empty,
|
||||
* assistant events with no printable content, etc.). Non-JSON lines are
|
||||
* returned as-is so plain-text adapter status messages pass through unchanged.
|
||||
*
|
||||
* Mirrors the event coverage of printClaudeStreamEvent so the K8s server
|
||||
* streaming path and the CLI display path produce consistent output.
|
||||
*/
|
||||
export function formatClaudeStreamLine(raw: string): string | null {
|
||||
const line = raw.trim();
|
||||
if (!line) return null;
|
||||
|
||||
let parsed: Record<string, unknown> | null = null;
|
||||
try {
|
||||
parsed = JSON.parse(line) as Record<string, unknown>;
|
||||
} catch {
|
||||
return line;
|
||||
}
|
||||
|
||||
const type = typeof parsed.type === "string" ? parsed.type : "";
|
||||
|
||||
if (type === "system" && parsed.subtype === "init") {
|
||||
const model = typeof parsed.model === "string" ? parsed.model : "unknown";
|
||||
const sessionId = typeof parsed.session_id === "string" ? parsed.session_id : "";
|
||||
return `Claude initialized (model: ${model}${sessionId ? `, session: ${sessionId}` : ""})`;
|
||||
}
|
||||
|
||||
if (type === "assistant") {
|
||||
const message =
|
||||
typeof parsed.message === "object" && parsed.message !== null && !Array.isArray(parsed.message)
|
||||
? (parsed.message as Record<string, unknown>)
|
||||
: {};
|
||||
const content = Array.isArray(message.content) ? message.content : [];
|
||||
const lines: string[] = [];
|
||||
for (const blockRaw of content) {
|
||||
if (typeof blockRaw !== "object" || blockRaw === null || Array.isArray(blockRaw)) continue;
|
||||
const block = blockRaw as Record<string, unknown>;
|
||||
const blockType = typeof block.type === "string" ? block.type : "";
|
||||
if (blockType === "text") {
|
||||
const text = typeof block.text === "string" ? block.text : "";
|
||||
if (text) lines.push(`assistant: ${text}`);
|
||||
} else if (blockType === "thinking") {
|
||||
const text = typeof block.thinking === "string" ? block.thinking : "";
|
||||
if (text) lines.push(`thinking: ${text}`);
|
||||
} else if (blockType === "tool_use") {
|
||||
const name = typeof block.name === "string" ? block.name : "unknown";
|
||||
lines.push(`tool_call: ${name}`);
|
||||
if (block.input !== undefined) {
|
||||
lines.push(JSON.stringify(block.input, null, 2));
|
||||
}
|
||||
}
|
||||
}
|
||||
return lines.length > 0 ? lines.join("\n") : null;
|
||||
}
|
||||
|
||||
if (type === "user") {
|
||||
const message =
|
||||
typeof parsed.message === "object" && parsed.message !== null && !Array.isArray(parsed.message)
|
||||
? (parsed.message as Record<string, unknown>)
|
||||
: {};
|
||||
const content = Array.isArray(message.content) ? message.content : [];
|
||||
const lines: string[] = [];
|
||||
for (const blockRaw of content) {
|
||||
if (typeof blockRaw !== "object" || blockRaw === null || Array.isArray(blockRaw)) continue;
|
||||
const block = blockRaw as Record<string, unknown>;
|
||||
if (typeof block.type === "string" && block.type === "tool_result") {
|
||||
const isError = block.is_error === true;
|
||||
const text = extractToolResultText(block);
|
||||
lines.push(`tool_result${isError ? " (error)" : ""}`);
|
||||
if (text) lines.push(text);
|
||||
}
|
||||
}
|
||||
return lines.length > 0 ? lines.join("\n") : null;
|
||||
}
|
||||
|
||||
if (type === "result") {
|
||||
const usage =
|
||||
typeof parsed.usage === "object" && parsed.usage !== null && !Array.isArray(parsed.usage)
|
||||
? (parsed.usage as Record<string, unknown>)
|
||||
: {};
|
||||
const input = Number(usage.input_tokens ?? 0);
|
||||
const output = Number(usage.output_tokens ?? 0);
|
||||
const cached = Number(usage.cache_read_input_tokens ?? 0);
|
||||
const cost = Number(parsed.total_cost_usd ?? 0);
|
||||
const subtype = typeof parsed.subtype === "string" ? parsed.subtype : "";
|
||||
const isError = parsed.is_error === true;
|
||||
const resultText = typeof parsed.result === "string" ? parsed.result : "";
|
||||
const errors = Array.isArray(parsed.errors) ? parsed.errors.map(asErrorText).filter(Boolean) : [];
|
||||
const lines: string[] = [];
|
||||
if (resultText) {
|
||||
lines.push("result:");
|
||||
lines.push(resultText);
|
||||
}
|
||||
if (subtype.startsWith("error") || isError || errors.length > 0) {
|
||||
lines.push(`claude_result: subtype=${subtype || "unknown"} is_error=${isError ? "true" : "false"}`);
|
||||
if (errors.length > 0) lines.push(`claude_errors: ${errors.join(" | ")}`);
|
||||
}
|
||||
lines.push(`tokens: in=${Number.isFinite(input) ? input : 0} out=${Number.isFinite(output) ? output : 0} cached=${Number.isFinite(cached) ? cached : 0} cost=$${Number.isFinite(cost) ? cost.toFixed(6) : "0.000000"}`);
|
||||
return lines.join("\n");
|
||||
}
|
||||
|
||||
if (type === "rate_limit_event") {
|
||||
const info =
|
||||
typeof parsed.rate_limit_info === "object" && parsed.rate_limit_info !== null
|
||||
? (parsed.rate_limit_info as Record<string, unknown>)
|
||||
: {};
|
||||
const limitType = typeof info.rateLimitType === "string" ? info.rateLimitType : "unknown";
|
||||
const status = typeof info.status === "string" ? info.status : "unknown";
|
||||
const resetsAt = typeof info.resetsAt === "number"
|
||||
? new Date(info.resetsAt * 1000).toISOString()
|
||||
: "";
|
||||
const parts = [`rate_limit: type=${limitType} status=${status}`];
|
||||
if (resetsAt) parts.push(`resets=${resetsAt}`);
|
||||
return parts.join(" ");
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
export function printClaudeStreamEvent(raw: string, debug: boolean): void {
|
||||
const line = raw.trim();
|
||||
if (!line) return;
|
||||
@@ -133,6 +256,22 @@ export function printClaudeStreamEvent(raw: string, debug: boolean): void {
|
||||
return;
|
||||
}
|
||||
|
||||
if (type === "rate_limit_event") {
|
||||
const info =
|
||||
typeof parsed.rate_limit_info === "object" && parsed.rate_limit_info !== null
|
||||
? (parsed.rate_limit_info as Record<string, unknown>)
|
||||
: {};
|
||||
const limitType = typeof info.rateLimitType === "string" ? info.rateLimitType : "unknown";
|
||||
const status = typeof info.status === "string" ? info.status : "unknown";
|
||||
const resetsAt = typeof info.resetsAt === "number"
|
||||
? new Date(info.resetsAt * 1000).toISOString()
|
||||
: "";
|
||||
const parts = [`rate_limit: type=${limitType} status=${status}`];
|
||||
if (resetsAt) parts.push(`resets=${resetsAt}`);
|
||||
console.log(pc.yellow(parts.join(" ")));
|
||||
return;
|
||||
}
|
||||
|
||||
if (debug) {
|
||||
console.log(pc.gray(line));
|
||||
}
|
||||
|
||||
+319
-4
@@ -60,7 +60,7 @@ vi.mock("@paperclipai/adapter-utils/server-utils", async (importOriginal) => {
|
||||
});
|
||||
});
|
||||
|
||||
const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, streamPodLogsOnce, execute } = await import("./execute.js");
|
||||
const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, streamPodLogsOnce, shouldAbortForCancellation, execute } = await import("./execute.js");
|
||||
|
||||
function makeJob(opts: {
|
||||
runId?: string;
|
||||
@@ -170,12 +170,68 @@ describe("buildPartialRunError", () => {
|
||||
expect(msg).toBe("Claude exited with code 1: Error: no API key configured");
|
||||
});
|
||||
|
||||
it("uses first non-system JSON event as content", () => {
|
||||
it("skips result events (structured protocol artefact — not surfaced verbatim)", () => {
|
||||
// In production, buildPartialRunError is only called when parseClaudeStreamJson
|
||||
// returns null (no result event). If somehow a result event appears here, the
|
||||
// raw JSON blob must not be shown — the "did not produce a result" message is
|
||||
// cleaner and avoids leaking protocol internals to the UI.
|
||||
const resultLike = JSON.stringify({ type: "result", subtype: "error", result: "rate limit" });
|
||||
const stdout = [initLine, resultLike].join("\n");
|
||||
const msg = buildPartialRunError(2, "claude-sonnet-4-6", stdout);
|
||||
expect(msg).toContain("rate limit");
|
||||
expect(msg).toContain("code 2");
|
||||
expect(msg).toContain("did not produce a result");
|
||||
expect(msg).toContain("claude-sonnet-4-6");
|
||||
expect(msg).not.toMatch(/\{.*type.*result/);
|
||||
});
|
||||
|
||||
it("skips rate_limit_event and surfaces model hint (FAR-32 Anthropic/Nancy repro)", () => {
|
||||
// Reproduces the second variant from FAR-32: init event + rate_limit_event +
|
||||
// assistant event (thinking only, no result). The rate_limit_event JSON blob
|
||||
// must not appear verbatim in the error message.
|
||||
const rateLimitEvent = JSON.stringify({
|
||||
type: "rate_limit_event",
|
||||
rate_limit_info: { status: "allowed", resetsAt: 1777056000, rateLimitType: "five_hour" },
|
||||
uuid: "3ab8f9eb-b9d6-4bf6-9c39-4608427717fc",
|
||||
session_id: "ad5f3e11-3c0c-4144-b53d-d4b959e57cee",
|
||||
});
|
||||
const stdout = [initLine, rateLimitEvent].join("\n");
|
||||
const msg = buildPartialRunError(null, "claude-opus-4-7", stdout);
|
||||
expect(msg).toContain("claude-opus-4-7");
|
||||
expect(msg).toContain("did not produce a result");
|
||||
expect(msg).not.toContain("rate_limit_event");
|
||||
expect(msg).not.toContain("rateLimitType");
|
||||
});
|
||||
|
||||
it("skips assistant events and surfaces model hint (FAR-32: MiniMax-M2.7 output_tokens=0)", () => {
|
||||
// Reproduces the exact failure: init event + assistant event with only a
|
||||
// thinking block and output_tokens=0, no result event. The assistant JSON
|
||||
// blob must not be surfaced verbatim as the error message.
|
||||
const assistantEvent = JSON.stringify({
|
||||
type: "assistant",
|
||||
message: {
|
||||
id: "063ad6038e4c889faa7c95168e007d73",
|
||||
type: "message",
|
||||
role: "assistant",
|
||||
content: [{ type: "thinking", thinking: "Let me start…", signature: "abc123" }],
|
||||
model: "MiniMax-M2.7",
|
||||
stop_reason: null,
|
||||
stop_sequence: null,
|
||||
usage: { input_tokens: 11013, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 },
|
||||
},
|
||||
});
|
||||
const stdout = [initLine, assistantEvent].join("\n");
|
||||
const msg = buildPartialRunError(null, "MiniMax-M2.7", stdout);
|
||||
expect(msg).toContain("MiniMax-M2.7");
|
||||
expect(msg).toContain("did not produce a result");
|
||||
expect(msg).not.toContain("063ad6038e4c889faa7c95168e007d73");
|
||||
expect(msg).not.toContain("output_tokens");
|
||||
expect(msg).not.toContain("thinking");
|
||||
});
|
||||
|
||||
it("skips user events alongside system events", () => {
|
||||
const userEvent = JSON.stringify({ type: "user", message: { role: "user", content: [] } });
|
||||
const stdout = [initLine, userEvent, "Error: API quota exceeded"].join("\n");
|
||||
const msg = buildPartialRunError(1, "claude-sonnet-4-6", stdout);
|
||||
expect(msg).toBe("Claude exited with code 1: Error: API quota exceeded");
|
||||
});
|
||||
|
||||
it("null exitCode renders as -1 in message", () => {
|
||||
@@ -1220,3 +1276,262 @@ describe("execute: concurrency guard — multiple orphans", () => {
|
||||
expect(result.errorMessage).toContain("different task");
|
||||
});
|
||||
});
|
||||
|
||||
// ─── shouldAbortForCancellation ──────────────────────────────────────────────
|
||||
|
||||
describe("shouldAbortForCancellation", () => {
|
||||
it("returns false for undefined", () => {
|
||||
expect(shouldAbortForCancellation(undefined)).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false for empty string", () => {
|
||||
expect(shouldAbortForCancellation("")).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false when status is 'running'", () => {
|
||||
expect(shouldAbortForCancellation("running")).toBe(false);
|
||||
});
|
||||
|
||||
it("returns true when status is 'cancelled'", () => {
|
||||
expect(shouldAbortForCancellation("cancelled")).toBe(true);
|
||||
});
|
||||
|
||||
it("returns true when status is 'failed'", () => {
|
||||
expect(shouldAbortForCancellation("failed")).toBe(true);
|
||||
});
|
||||
|
||||
it("returns true when status is 'completed'", () => {
|
||||
expect(shouldAbortForCancellation("completed")).toBe(true);
|
||||
});
|
||||
|
||||
it("returns true for any non-running non-empty string", () => {
|
||||
expect(shouldAbortForCancellation("unknown")).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
// ─── execute: cancel-polling path ────────────────────────────────────────────
|
||||
|
||||
describe("execute: cancel-polling via keepalive tick", () => {
|
||||
const mockFetch = vi.fn();
|
||||
|
||||
beforeEach(() => {
|
||||
vi.resetAllMocks();
|
||||
vi.useFakeTimers();
|
||||
// Replace global fetch for this suite
|
||||
vi.stubGlobal("fetch", mockFetch);
|
||||
process.env.PAPERCLIP_API_URL = "http://paperclip-test.local";
|
||||
|
||||
mockReadSkillEntries.mockResolvedValue([]);
|
||||
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
|
||||
mockBatchListJobs.mockResolvedValue({ items: [] });
|
||||
mockPrepareBundle.mockResolvedValue(makeBundle());
|
||||
mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } });
|
||||
mockBatchPatchJob.mockResolvedValue({});
|
||||
mockBatchDeleteJob.mockResolvedValue({});
|
||||
mockCoreDeleteSecret.mockResolvedValue({});
|
||||
|
||||
mockCoreListPods
|
||||
.mockResolvedValueOnce({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
|
||||
}],
|
||||
})
|
||||
.mockResolvedValue({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] },
|
||||
}],
|
||||
});
|
||||
|
||||
// Job never reaches terminal on its own (cancel kicks in first)
|
||||
mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } });
|
||||
|
||||
// Log stream never ends (hung — simulates long-running Claude)
|
||||
mockLogFn.mockImplementation(() => new Promise(() => { /* never resolves */ }));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
vi.unstubAllGlobals();
|
||||
delete process.env.PAPERCLIP_API_URL;
|
||||
});
|
||||
|
||||
it("returns errorCode=cancelled when poll detects non-running status within one keepalive tick", async () => {
|
||||
// Use a flag so readJob throws 404 only AFTER deleteJob is called (simulating
|
||||
// K8s state where the job disappears after deletion).
|
||||
let jobDeleted = false;
|
||||
mockBatchDeleteJob.mockImplementation(async () => { jobDeleted = true; return {}; });
|
||||
mockBatchReadJob.mockImplementation(async () => {
|
||||
if (jobDeleted) {
|
||||
throw Object.assign(new Error("Not Found"), { response: { statusCode: 404 } });
|
||||
}
|
||||
return { status: { conditions: [] } };
|
||||
});
|
||||
|
||||
// Cancel poll returns "cancelled" status.
|
||||
mockFetch.mockResolvedValue({
|
||||
ok: true,
|
||||
json: async () => ({ status: "cancelled" }),
|
||||
});
|
||||
|
||||
const executePromise = execute(
|
||||
makeCtx({ authToken: "tok-abc" } as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
|
||||
// Timer sequence:
|
||||
// t=15100: keepalive fires → pre-check non-terminal → fetch → cancelled →
|
||||
// deleteJob (jobDeleted=true) → stop signal set
|
||||
// t=15300: stop poller fires (200ms) → destroys writable → starts 3s bail timer
|
||||
// t=17100: completion watcher polls → 404 (jobDeleted=true) → jobGone → settles
|
||||
// t=18300: bail timer fires → streamPodLogsOnce returns → streamPodLogs exits →
|
||||
// trackedLogStream settles → Promise.allSettled resolves
|
||||
await vi.advanceTimersByTimeAsync(15_100); // keepalive fires → cancel detected
|
||||
await vi.advanceTimersByTimeAsync(2_100); // completion watcher polls → 404 → settles
|
||||
await vi.advanceTimersByTimeAsync(3_100); // bail timer fires → log stream settles
|
||||
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.errorCode).toBe("cancelled");
|
||||
expect(result.errorMessage).toBe("Run cancelled");
|
||||
expect(result.timedOut).toBe(false);
|
||||
expect(mockBatchDeleteJob).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("treats HTTP 500 on cancel poll as transient and does not cancel", async () => {
|
||||
// Cancel poll returns 500 → transient, should not cancel.
|
||||
// After a while the job completes normally.
|
||||
mockFetch.mockResolvedValue({ ok: false, status: 500 });
|
||||
|
||||
// Override: job completes after keepalive tick fires
|
||||
mockBatchReadJob
|
||||
.mockResolvedValueOnce({ status: { conditions: [] } }) // first keepalive check: non-terminal
|
||||
.mockResolvedValue({ status: { conditions: [{ type: "Complete", status: "True" }] } });
|
||||
mockLogFn.mockImplementation(
|
||||
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
|
||||
writable.write(CLAUDE_HAPPY_OUTPUT);
|
||||
},
|
||||
);
|
||||
|
||||
const executePromise = execute(
|
||||
makeCtx({ authToken: "tok-abc" } as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(15_100); // keepalive fires: 500 → transient, no cancel
|
||||
await vi.advanceTimersByTimeAsync(3_100); // log reconnect sleep → stopSignal already true
|
||||
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.errorCode).toBeUndefined();
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(result.sessionId).toBe("sess_test123");
|
||||
});
|
||||
|
||||
it("skips cancel poll when authToken is absent", async () => {
|
||||
// No authToken → cancel poll must not be attempted → job completes normally
|
||||
mockBatchReadJob.mockResolvedValue({
|
||||
status: { conditions: [{ type: "Complete", status: "True" }] },
|
||||
});
|
||||
mockLogFn.mockImplementation(
|
||||
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
|
||||
writable.write(CLAUDE_HAPPY_OUTPUT);
|
||||
},
|
||||
);
|
||||
|
||||
const executePromise = execute(makeCtx()); // no authToken
|
||||
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(mockFetch).not.toHaveBeenCalled();
|
||||
expect(result.exitCode).toBe(0);
|
||||
});
|
||||
|
||||
it("skips cancel poll when PAPERCLIP_API_URL is not set", async () => {
|
||||
delete process.env.PAPERCLIP_API_URL;
|
||||
|
||||
mockBatchReadJob.mockResolvedValue({
|
||||
status: { conditions: [{ type: "Complete", status: "True" }] },
|
||||
});
|
||||
mockLogFn.mockImplementation(
|
||||
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
|
||||
writable.write(CLAUDE_HAPPY_OUTPUT);
|
||||
},
|
||||
);
|
||||
|
||||
const executePromise = execute(
|
||||
makeCtx({ authToken: "tok-abc" } as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(mockFetch).not.toHaveBeenCalled();
|
||||
expect(result.exitCode).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
// ─── execute: SIGTERM handler ─────────────────────────────────────────────────
|
||||
|
||||
describe("execute: SIGTERM handler best-effort cleanup", () => {
|
||||
beforeEach(() => {
|
||||
vi.resetAllMocks();
|
||||
vi.useFakeTimers();
|
||||
mockReadSkillEntries.mockResolvedValue([]);
|
||||
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
|
||||
mockBatchListJobs.mockResolvedValue({ items: [] });
|
||||
mockPrepareBundle.mockResolvedValue(makeBundle());
|
||||
mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } });
|
||||
mockBatchPatchJob.mockResolvedValue({});
|
||||
mockBatchDeleteJob.mockResolvedValue({});
|
||||
mockCoreDeleteSecret.mockResolvedValue({});
|
||||
mockCoreListPods
|
||||
.mockResolvedValueOnce({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
|
||||
}],
|
||||
})
|
||||
.mockResolvedValue({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] },
|
||||
}],
|
||||
});
|
||||
mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } });
|
||||
mockLogFn.mockImplementation(() => new Promise(() => { /* never resolves */ }));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("deletes the active Job when SIGTERM fires during execution", async () => {
|
||||
// Mock process.kill to prevent the test process from actually being killed.
|
||||
const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true);
|
||||
|
||||
// Start execute() and suppress unhandled rejection (we won't await it).
|
||||
const executePromise = execute(makeCtx());
|
||||
executePromise.catch(() => {});
|
||||
|
||||
// Flush microtasks through the async setup chain: getSelfPodInfo, listJobs,
|
||||
// readSkillEntries, prepareBundle, createJob, onLog, activeJobs.add(), and
|
||||
// ensureSigtermHandler() all complete before the try block enters streaming.
|
||||
// 30 rounds is more than enough for the ~7 sequential await points.
|
||||
for (let i = 0; i < 30; i++) await Promise.resolve();
|
||||
|
||||
// Emit SIGTERM — the process.once handler fires synchronously and kicks off
|
||||
// async cleanup (deleteNamespacedJob). The mock resolves immediately.
|
||||
process.emit("SIGTERM");
|
||||
|
||||
// Flush microtasks for deleteJob to resolve and the .then(process.kill) to run.
|
||||
for (let i = 0; i < 10; i++) await Promise.resolve();
|
||||
|
||||
expect(mockBatchDeleteJob).toHaveBeenCalled();
|
||||
expect(killSpy).toHaveBeenCalledWith(process.pid, "SIGTERM");
|
||||
|
||||
killSpy.mockRestore();
|
||||
// afterEach calls vi.useRealTimers() which clears all pending fake timers,
|
||||
// so we do not need to settle executePromise.
|
||||
});
|
||||
});
|
||||
|
||||
+140
-13
@@ -39,6 +39,48 @@ const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
|
||||
// minutes, causing stale "running" status in the UI (FAR-23).
|
||||
const LOG_EXIT_COMPLETION_GRACE_MS = 30_000;
|
||||
|
||||
// Module-level tracking of active Jobs for SIGTERM best-effort cleanup.
|
||||
interface ActiveJobRef {
|
||||
namespace: string;
|
||||
jobName: string;
|
||||
promptSecretName?: string;
|
||||
promptSecretNamespace?: string;
|
||||
kubeconfigPath?: string;
|
||||
}
|
||||
const activeJobs = new Set<ActiveJobRef>();
|
||||
let sigtermHandlerRegistered = false;
|
||||
|
||||
function ensureSigtermHandler(): void {
|
||||
if (sigtermHandlerRegistered) return;
|
||||
sigtermHandlerRegistered = true;
|
||||
process.once("SIGTERM", () => {
|
||||
const jobs = [...activeJobs];
|
||||
void Promise.allSettled(
|
||||
jobs.map(async (ref) => {
|
||||
try {
|
||||
const batchApi = getBatchApi(ref.kubeconfigPath);
|
||||
await batchApi.deleteNamespacedJob({
|
||||
name: ref.jobName,
|
||||
namespace: ref.namespace,
|
||||
body: { propagationPolicy: "Background" },
|
||||
});
|
||||
} catch { /* best-effort */ }
|
||||
if (ref.promptSecretName && ref.promptSecretNamespace) {
|
||||
try {
|
||||
const coreApi = getCoreApi(ref.kubeconfigPath);
|
||||
await coreApi.deleteNamespacedSecret({
|
||||
name: ref.promptSecretName,
|
||||
namespace: ref.promptSecretNamespace,
|
||||
});
|
||||
} catch { /* best-effort */ }
|
||||
}
|
||||
}),
|
||||
).then(() => {
|
||||
process.kill(process.pid, "SIGTERM");
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
|
||||
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
|
||||
@@ -53,6 +95,16 @@ export function isK8s404(err: unknown): boolean {
|
||||
return /HTTP-Code:\s*404\b/.test(err.message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true when the heartbeat-run status indicates the run is no longer
|
||||
* active and the K8s Job should be cancelled.
|
||||
* Exported for unit tests.
|
||||
*/
|
||||
export function shouldAbortForCancellation(runStatus: string | undefined): boolean {
|
||||
if (!runStatus) return false;
|
||||
return runStatus !== "running";
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the error message when Claude's stdout contains no result event.
|
||||
* Skips system/init event lines so the UI doesn't display the raw init JSON.
|
||||
@@ -65,22 +117,31 @@ export function buildPartialRunError(
|
||||
): string {
|
||||
if (exitCode === 0) return "Failed to parse Claude JSON output";
|
||||
|
||||
// Walk stdout lines, skip system events, return the first real content line.
|
||||
// Walk stdout lines and skip every structured streaming event (any JSON
|
||||
// object that carries a non-empty "type" field: system, assistant, user,
|
||||
// rate_limit_event, result, …). All of these are protocol artefacts and
|
||||
// produce confusing raw-JSON blobs when surfaced verbatim as an error
|
||||
// message. Only plain-text lines (non-JSON, or JSON without a type field)
|
||||
// are treated as human-readable content worth including in the error.
|
||||
const firstContentLine = stdout.split(/\r?\n/)
|
||||
.map((l) => l.trim())
|
||||
.find((l) => {
|
||||
if (!l) return false;
|
||||
try {
|
||||
const obj = JSON.parse(l);
|
||||
if (typeof obj === "object" && obj !== null && (obj as Record<string, unknown>).type === "system") return false;
|
||||
if (typeof obj === "object" && obj !== null) {
|
||||
const t = (obj as Record<string, unknown>).type;
|
||||
if (typeof t === "string" && t) return false;
|
||||
}
|
||||
} catch {
|
||||
// not JSON — treat as content
|
||||
}
|
||||
return true;
|
||||
}) ?? "";
|
||||
|
||||
// If we only have system/init events and nothing else, surface the model
|
||||
// name so the operator can diagnose missing credentials or unsupported model.
|
||||
// If the stream contained only structured events with no plain-text output,
|
||||
// surface the model name so the operator can diagnose missing credentials
|
||||
// or unsupported/misconfigured model.
|
||||
const initOnlyOutput = stdout.trim() !== "" && model !== "" && !firstContentLine;
|
||||
if (initOnlyOutput) {
|
||||
const modelHint = model ? ` (model: ${model})` : "";
|
||||
@@ -298,6 +359,10 @@ export async function streamPodLogsOnce(
|
||||
callback();
|
||||
return;
|
||||
}
|
||||
// Forward raw stream-json lines unchanged. The Paperclip UI uses the
|
||||
// adapter's ui-parser export (src/ui-parser.ts) to render structured
|
||||
// transcript entries — pre-formatting here would strip that structure
|
||||
// and produce flat plain text that looks nothing like claude_local.
|
||||
void onLog("stdout", emitted).then(() => callback(), callback);
|
||||
},
|
||||
});
|
||||
@@ -546,6 +611,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
const graceSec = asNumber(config.graceSec, 60);
|
||||
const retainJobs = asBoolean(config.retainJobs, false);
|
||||
const kubeconfigPath = asString(config.kubeconfig, "") || undefined;
|
||||
const paperclipApiUrl = process.env.PAPERCLIP_API_URL ?? "";
|
||||
if (!paperclipApiUrl) {
|
||||
await onLog("stderr", "[paperclip] Warning: PAPERCLIP_API_URL not set — cancel polling disabled\n");
|
||||
}
|
||||
|
||||
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
|
||||
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
|
||||
@@ -580,7 +649,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
labelSelector: `paperclip.io/agent-id=${sanitizedAgentId},paperclip.io/adapter-type=claude_k8s`,
|
||||
});
|
||||
const running = existing.items.filter(
|
||||
(j) => !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"),
|
||||
(j) =>
|
||||
!j.metadata?.deletionTimestamp &&
|
||||
!j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"),
|
||||
);
|
||||
if (running.length > 0) {
|
||||
// Separate orphaned jobs (from a previous server-side run) from truly
|
||||
@@ -905,6 +976,15 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// delete a job that is still alive and the UI is waiting on.
|
||||
let skipCleanup = false;
|
||||
|
||||
const activeJobRef: ActiveJobRef = {
|
||||
namespace,
|
||||
jobName,
|
||||
...(promptSecret ? { promptSecretName: promptSecret.name, promptSecretNamespace: promptSecret.namespace } : {}),
|
||||
kubeconfigPath,
|
||||
};
|
||||
activeJobs.add(activeJobRef);
|
||||
ensureSigtermHandler();
|
||||
|
||||
try {
|
||||
// Wait for pod to be ready for log streaming
|
||||
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
|
||||
@@ -953,11 +1033,21 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
let lastLogAt = Date.now();
|
||||
let keepaliveJobTerminal = false;
|
||||
let consecutiveTerminalReadings = 0;
|
||||
// Shared signal: when job completion resolves, tell the log streamer to
|
||||
// stop reconnecting. Declared before keepaliveTimer so the cancel path
|
||||
// inside the timer can set it without temporal dead zone issues.
|
||||
const logStopSignal = { stopped: false };
|
||||
// Shared dedup filter: created here so the one-shot fallback can
|
||||
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
|
||||
const logDedup = new LogLineDedupFilter();
|
||||
// Set when the run is externally cancelled (cancel-poll path).
|
||||
let cancelled = false;
|
||||
|
||||
keepaliveTimer = setInterval(() => {
|
||||
// Fire-and-forget the async work; setInterval callbacks must be
|
||||
// synchronous or the timer will drift.
|
||||
void (async () => {
|
||||
if (keepaliveJobTerminal) return;
|
||||
if (keepaliveJobTerminal || cancelled) return;
|
||||
|
||||
// Verify the Job is still alive before announcing or refreshing.
|
||||
// Require two consecutive terminal readings before latching to
|
||||
@@ -992,6 +1082,37 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
return;
|
||||
}
|
||||
|
||||
// Cancel-polling: check if the Paperclip run was cancelled externally.
|
||||
// Skipped on the reattach path to avoid tearing down an adopted Job.
|
||||
// HTTP non-2xx is treated as transient — never interpret a 5xx as cancel.
|
||||
if (!reattachTarget && paperclipApiUrl && ctx.authToken) {
|
||||
try {
|
||||
const resp = await fetch(`${paperclipApiUrl}/api/heartbeat-runs/${runId}`, {
|
||||
headers: { Authorization: `Bearer ${ctx.authToken}` },
|
||||
});
|
||||
if (resp.ok) {
|
||||
const data = await resp.json() as { status?: string };
|
||||
if (shouldAbortForCancellation(data.status)) {
|
||||
void onLog("stdout", `[paperclip] Run cancelled externally — deleting Job ${jobName}\n`).catch(() => {});
|
||||
cancelled = true;
|
||||
logStopSignal.stopped = true;
|
||||
try {
|
||||
await batchApi.deleteNamespacedJob({
|
||||
name: jobName,
|
||||
namespace,
|
||||
body: { propagationPolicy: "Background" },
|
||||
});
|
||||
} catch { /* best-effort — completion watcher will see 404 and settle */ }
|
||||
return;
|
||||
}
|
||||
} else if (resp.status >= 500) {
|
||||
void onLog("stderr", `[paperclip] keepalive: cancel poll returned HTTP ${resp.status} — transient, ignoring\n`).catch(() => {});
|
||||
}
|
||||
} catch {
|
||||
// network error — transient, skip this tick
|
||||
}
|
||||
}
|
||||
|
||||
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
|
||||
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {});
|
||||
})();
|
||||
@@ -1001,13 +1122,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
return onLog(stream, chunk);
|
||||
};
|
||||
|
||||
// Shared signal: when job completion resolves, tell the log
|
||||
// streamer to stop reconnecting.
|
||||
const logStopSignal = { stopped: false };
|
||||
// Shared dedup filter: created here so the one-shot fallback can
|
||||
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
|
||||
const logDedup = new LogLineDedupFilter();
|
||||
|
||||
// Track when the log stream first exits so the grace-period can fire
|
||||
// if the K8s Job condition lags behind container exit (FAR-23).
|
||||
// Set via onFirstStreamExit callback (called after attempt=0 returns)
|
||||
@@ -1067,6 +1181,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
keepaliveTimer = null;
|
||||
}
|
||||
|
||||
// If the run was externally cancelled, return a clean cancelled result
|
||||
// without processing stdout (the finally block still runs for cleanup).
|
||||
if (cancelled) {
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorCode: "cancelled",
|
||||
errorMessage: "Run cancelled",
|
||||
};
|
||||
}
|
||||
|
||||
if (logResult.status === "fulfilled") {
|
||||
stdout = logResult.value;
|
||||
}
|
||||
@@ -1141,6 +1267,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
exitCode = await getPodExitCode(namespace, jobName, kubeconfigPath);
|
||||
} finally {
|
||||
if (keepaliveTimer) clearInterval(keepaliveTimer);
|
||||
activeJobs.delete(activeJobRef);
|
||||
if (skipCleanup) {
|
||||
await onLog("stdout", `[paperclip] Retaining job ${jobName} (state mismatch — UI is waiting on it)\n`);
|
||||
} else if (!retainJobs) {
|
||||
|
||||
Reference in New Issue
Block a user