Compare commits

...

12 Commits

Author SHA1 Message Date
Chris Farhood 357f035418 fix: skip K8s jobs with deletionTimestamp in concurrency guard (FAR-34)
Jobs being deleted via kubectl enter a Terminating state where
deletionTimestamp is set but no Complete/Failed condition is added.
The concurrency guard previously treated these as running, blocking
all subsequent heartbeat runs for the agent until the job fully
disappeared from the K8s API.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 18:36:19 +00:00
Chris Farhood f340ce52ee 0.1.42 2026-04-24 17:56:14 +00:00
Chris Farhood ecc477d0be fix: stream raw stream-json to onLog so Paperclip UI renders structured transcript entries (FAR-32)
The prior approach (commit b607657) converted Claude's stream-json into
flat plain text before calling onLog.  This stripped the structure the
Paperclip UI needs — its adapter ui-parser (src/ui-parser.ts, exported
via the package's ./ui-parser entry) expects raw stream-json lines and
emits structured transcript entries (assistant / thinking / tool_call /
tool_result / init / result) that the UI renders as rich blocks, just
like claude_local.

claude_local passes stdout through unchanged to onLog for the same
reason — the server persists raw lines and the UI parser turns them
into rendered transcript entries.  Mirror that here.

formatClaudeStreamLine stays as an internal helper for future CLI use,
but is no longer applied in the K8s streaming path.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 17:56:10 +00:00
Chris Farhood f9ba77527a 0.1.41 2026-04-24 17:43:16 +00:00
Chris Farhood f304c70899 fix: keep formatClaudeStreamLine internal to avoid ESM hot-reload link failure (FAR-32)
Exposing formatClaudeStreamLine at the package root caused Paperclip reinstalls
to fail with "'./cli/index.js' does not provide an export named
'formatClaudeStreamLine'".  The host process caches child ESM module records
across reinstalls; linking the new dist/index.js re-export against the cached
old dist/cli/index.js fails.

The symbol is only used internally by server/execute.ts (which imports from
./cli/format-event.js directly), so drop the public re-export.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 17:43:16 +00:00
Chris Farhood 727d9494da 0.1.40 2026-04-24 17:35:08 +00:00
Chris Farhood b60765785b feat: format Claude stream-json events in K8s streaming path for consistency with claude_local (FAR-32)
All output sent to Paperclip via onLog now passes through formatClaudeStreamLine,
converting raw stream-json blobs into human-readable text consistent with how
the CLI and claude_local adapter format events.

Changes:
- format-event.ts: add formatClaudeStreamLine(raw) -> string | null
  Plain-text equivalent of printClaudeStreamEvent — no ANSI colours, returns
  null for lines to suppress (assistant with no content, unknown events).
  Handles: system/init, assistant (text/thinking/tool_use), user (tool_result),
  result (summary + tokens), rate_limit_event. Non-JSON lines pass through.
- execute.ts: wire formatClaudeStreamLine into streamPodLogsOnce write handler.
  raw chunks still stored in 'chunks[]' for parseClaudeStreamJson; only the
  onLog path receives formatted text.
- 12 new tests for formatClaudeStreamLine covering all event types.
- 352/352 tests pass.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 17:26:37 +00:00
Chris Farhood 28d6451265 feat: add rate_limit_event formatting to printClaudeStreamEvent (FAR-32)
rate_limit_event was previously falling through to the debug-only branch
and silently dropped in non-debug mode.  Now it surfaces a concise,
human-readable line for CLI consumers:

  rate_limit: type=five_hour status=allowed resets=2026-04-22T06:00:00.000Z

Two tests cover the exact FAR-32 repro payload and graceful handling of
missing rate_limit_info fields.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 17:22:15 +00:00
Chris Farhood cabdc3df98 fix: skip all structured streaming events in buildPartialRunError (FAR-32 followup)
Extends the previous fix (which only covered assistant/user) to skip every
JSON object with a non-empty "type" field — system, assistant, user,
rate_limit_event, result, and any future event types.  This prevents all
structured protocol artefacts from being surfaced verbatim as error messages.

Root cause of the new repro: when Claude emits a rate_limit_event before
producing output and then exits without a result event, the rate_limit_event
JSON blob was becoming the "first content line" and appearing in the error:

  Claude exited with code -1: {"type":"rate_limit_event","rate_limit_info":{...}}

With this fix, all typed events are filtered and the initOnlyOutput branch
fires, producing the clean diagnostic:

  Claude started but did not produce a result (model: claude-opus-4-7)
  — check API credentials, model support, and adapter config

Updated the "result event as content" test to match the new (correct) behaviour:
in production buildPartialRunError is only called when parseClaudeStreamJson
returns null (no result event), so the prior test was exercising a degenerate
state that cannot occur through execute().

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 17:17:48 +00:00
Chris Farhood f9ff04a354 fix: skip assistant/user events in buildPartialRunError to avoid raw JSON blobs in error messages (FAR-32)
When a model produces assistant events with output_tokens=0 but no result
event (e.g. MiniMax-M2.7 thinking-only output), the partial-run error
previously surfaced the raw assistant JSON blob verbatim, producing an
unreadable message like "Claude exited with code -1: {\"type\":\"assistant\",...}".

Fix: extend the content-line filter in buildPartialRunError to also skip
assistant and user event types (intermediate streaming events), in addition
to system events. result events are still retained since they may carry
useful terminal error details. When all stdout lines are filtered, the
existing initOnlyOutput branch triggers and surfaces a clean diagnostic:
"Claude started but did not produce a result (model: MiniMax-M2.7) — check
API credentials, model support, and adapter config".

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 17:11:20 +00:00
Chris Farhood e611f26d32 0.1.39 2026-04-24 15:20:59 +00:00
Chris Farhood f097440f3c feat: implement cancel support via keepalive poll and SIGTERM handler (FAR-26)
- Poll GET /api/heartbeat-runs/:runId on every keepalive tick (15s); when
  status != 'running', delete the K8s Job, set logStopSignal, and return
  errorCode='cancelled' — Job gone within ~15s of external cancellation.
- SIGTERM handler best-effort deletes all active Jobs/Secrets and re-emits
  the signal to let the process exit naturally.
- Export shouldAbortForCancellation() helper; add tests for helper, cancel
  poll path, and SIGTERM cleanup.
- Guard: PAPERCLIP_API_URL missing logs a warning and skips cancel polling;
  HTTP 5xx from poll treated as transient; reattach path skips cancel poll.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 15:20:45 +00:00
6 changed files with 742 additions and 28 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.38",
"version": "0.1.42",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.38",
"version": "0.1.42",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.38",
"version": "0.1.43",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+134 -1
View File
@@ -1,5 +1,5 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { printClaudeStreamEvent } from "./format-event.js";
import { printClaudeStreamEvent, formatClaudeStreamLine } from "./format-event.js";
// Mock console methods to capture output
const consoleMock = {
@@ -138,6 +138,39 @@ describe("printClaudeStreamEvent", () => {
expect(output()).toBe("some output text");
});
it("prints rate_limit_event with type, status, and reset time", () => {
printClaudeStreamEvent(JSON.stringify({
type: "rate_limit_event",
rate_limit_info: {
status: "allowed",
resetsAt: 1777056000,
rateLimitType: "five_hour",
overageStatus: "allowed",
isUsingOverage: false,
},
uuid: "3ab8f9eb-b9d6-4bf6-9c39-4608427717fc",
session_id: "ad5f3e11-3c0c-4144-b53d-d4b959e57cee",
}), false);
expect(output()).toContain("rate_limit:");
expect(output()).toContain("five_hour");
expect(output()).toContain("allowed");
expect(output()).toContain("resets=");
// Raw JSON must not be surfaced verbatim
expect(output()).not.toContain("3ab8f9eb-b9d6-4bf6-9c39-4608427717fc");
});
it("prints rate_limit_event with unknown fields gracefully", () => {
printClaudeStreamEvent(JSON.stringify({
type: "rate_limit_event",
rate_limit_info: {},
}), false);
expect(output()).toContain("rate_limit:");
expect(output()).toContain("type=unknown");
expect(output()).toContain("status=unknown");
// No resetsAt present — reset clause omitted
expect(output()).not.toContain("resets=");
});
it("does not print unknown types in non-debug mode", () => {
printClaudeStreamEvent(JSON.stringify({ type: "unknown", data: "stuff" }), false);
expect(output()).toBe("");
@@ -148,3 +181,103 @@ describe("printClaudeStreamEvent", () => {
expect(output()).toContain("stuff");
});
});
describe("formatClaudeStreamLine", () => {
it("returns null for empty/blank lines", () => {
expect(formatClaudeStreamLine("")).toBeNull();
expect(formatClaudeStreamLine(" ")).toBeNull();
});
it("returns raw text for non-JSON lines (adapter status messages pass through)", () => {
expect(formatClaudeStreamLine("[paperclip] Pod running: pod-abc")).toBe("[paperclip] Pod running: pod-abc");
expect(formatClaudeStreamLine("Error: disk full")).toBe("Error: disk full");
});
it("formats system/init event", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "system", subtype: "init", model: "claude-opus-4-7", session_id: "sess_abc",
}));
expect(result).toContain("Claude initialized");
expect(result).toContain("claude-opus-4-7");
expect(result).toContain("sess_abc");
expect(result).not.toContain("{");
});
it("formats assistant text block", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "assistant",
message: { content: [{ type: "text", text: "Hello world" }] },
}));
expect(result).toBe("assistant: Hello world");
});
it("formats assistant thinking block", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "assistant",
message: { content: [{ type: "thinking", thinking: "Let me think..." }] },
}));
expect(result).toBe("thinking: Let me think...");
});
it("formats assistant tool_use block", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "assistant",
message: { content: [{ type: "tool_use", name: "Bash", input: { command: "ls" } }] },
}));
expect(result).toContain("tool_call: Bash");
expect(result).toContain("ls");
});
it("returns null for assistant with no printable content (thinking-only with empty text)", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "assistant",
message: { content: [{ type: "thinking", thinking: "" }] },
}));
expect(result).toBeNull();
});
it("formats user tool_result", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "user",
message: { content: [{ type: "tool_result", content: "file1.txt\nfile2.txt" }] },
}));
expect(result).toContain("tool_result");
expect(result).toContain("file1.txt");
});
it("formats user tool_result error", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "user",
message: { content: [{ type: "tool_result", is_error: true, content: "Permission denied" }] },
}));
expect(result).toContain("tool_result (error)");
expect(result).toContain("Permission denied");
});
it("formats result event with tokens and cost", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "result", result: "Done", subtype: "stop", total_cost_usd: 0.005,
usage: { input_tokens: 100, output_tokens: 200, cache_read_input_tokens: 50 },
}));
expect(result).toContain("result:");
expect(result).toContain("Done");
expect(result).toContain("in=100");
expect(result).toContain("out=200");
expect(result).toContain("cached=50");
});
it("formats rate_limit_event (FAR-32 repro)", () => {
const result = formatClaudeStreamLine(JSON.stringify({
type: "rate_limit_event",
rate_limit_info: { status: "allowed", resetsAt: 1777056000, rateLimitType: "five_hour" },
}));
expect(result).toContain("rate_limit:");
expect(result).toContain("five_hour");
expect(result).toContain("allowed");
expect(result).not.toContain("{");
});
it("returns null for unknown event types", () => {
expect(formatClaudeStreamLine(JSON.stringify({ type: "unknown_event", data: "x" }))).toBeNull();
});
});
+146 -7
View File
@@ -17,27 +17,150 @@ function asErrorText(value: unknown): string {
}
}
function printToolResult(block: Record<string, unknown>): void {
const isError = block.is_error === true;
let text = "";
if (typeof block.content === "string") {
text = block.content;
} else if (Array.isArray(block.content)) {
function extractToolResultText(block: Record<string, unknown>): string {
if (typeof block.content === "string") return block.content;
if (Array.isArray(block.content)) {
const parts: string[] = [];
for (const part of block.content) {
if (typeof part !== "object" || part === null || Array.isArray(part)) continue;
const record = part as Record<string, unknown>;
if (typeof record.text === "string") parts.push(record.text);
}
text = parts.join("\n");
return parts.join("\n");
}
return "";
}
function printToolResult(block: Record<string, unknown>): void {
const isError = block.is_error === true;
const text = extractToolResultText(block);
console.log((isError ? pc.red : pc.cyan)(`tool_result${isError ? " (error)" : ""}`));
if (text) {
console.log((isError ? pc.red : pc.gray)(text));
}
}
/**
* Format a single raw Claude stream-json line into a plain-text human-readable
* string (no ANSI colour codes) suitable for forwarding to the Paperclip server
* via onLog. Returns null for lines that should be suppressed (empty,
* assistant events with no printable content, etc.). Non-JSON lines are
* returned as-is so plain-text adapter status messages pass through unchanged.
*
* Mirrors the event coverage of printClaudeStreamEvent so the K8s server
* streaming path and the CLI display path produce consistent output.
*/
export function formatClaudeStreamLine(raw: string): string | null {
const line = raw.trim();
if (!line) return null;
let parsed: Record<string, unknown> | null = null;
try {
parsed = JSON.parse(line) as Record<string, unknown>;
} catch {
return line;
}
const type = typeof parsed.type === "string" ? parsed.type : "";
if (type === "system" && parsed.subtype === "init") {
const model = typeof parsed.model === "string" ? parsed.model : "unknown";
const sessionId = typeof parsed.session_id === "string" ? parsed.session_id : "";
return `Claude initialized (model: ${model}${sessionId ? `, session: ${sessionId}` : ""})`;
}
if (type === "assistant") {
const message =
typeof parsed.message === "object" && parsed.message !== null && !Array.isArray(parsed.message)
? (parsed.message as Record<string, unknown>)
: {};
const content = Array.isArray(message.content) ? message.content : [];
const lines: string[] = [];
for (const blockRaw of content) {
if (typeof blockRaw !== "object" || blockRaw === null || Array.isArray(blockRaw)) continue;
const block = blockRaw as Record<string, unknown>;
const blockType = typeof block.type === "string" ? block.type : "";
if (blockType === "text") {
const text = typeof block.text === "string" ? block.text : "";
if (text) lines.push(`assistant: ${text}`);
} else if (blockType === "thinking") {
const text = typeof block.thinking === "string" ? block.thinking : "";
if (text) lines.push(`thinking: ${text}`);
} else if (blockType === "tool_use") {
const name = typeof block.name === "string" ? block.name : "unknown";
lines.push(`tool_call: ${name}`);
if (block.input !== undefined) {
lines.push(JSON.stringify(block.input, null, 2));
}
}
}
return lines.length > 0 ? lines.join("\n") : null;
}
if (type === "user") {
const message =
typeof parsed.message === "object" && parsed.message !== null && !Array.isArray(parsed.message)
? (parsed.message as Record<string, unknown>)
: {};
const content = Array.isArray(message.content) ? message.content : [];
const lines: string[] = [];
for (const blockRaw of content) {
if (typeof blockRaw !== "object" || blockRaw === null || Array.isArray(blockRaw)) continue;
const block = blockRaw as Record<string, unknown>;
if (typeof block.type === "string" && block.type === "tool_result") {
const isError = block.is_error === true;
const text = extractToolResultText(block);
lines.push(`tool_result${isError ? " (error)" : ""}`);
if (text) lines.push(text);
}
}
return lines.length > 0 ? lines.join("\n") : null;
}
if (type === "result") {
const usage =
typeof parsed.usage === "object" && parsed.usage !== null && !Array.isArray(parsed.usage)
? (parsed.usage as Record<string, unknown>)
: {};
const input = Number(usage.input_tokens ?? 0);
const output = Number(usage.output_tokens ?? 0);
const cached = Number(usage.cache_read_input_tokens ?? 0);
const cost = Number(parsed.total_cost_usd ?? 0);
const subtype = typeof parsed.subtype === "string" ? parsed.subtype : "";
const isError = parsed.is_error === true;
const resultText = typeof parsed.result === "string" ? parsed.result : "";
const errors = Array.isArray(parsed.errors) ? parsed.errors.map(asErrorText).filter(Boolean) : [];
const lines: string[] = [];
if (resultText) {
lines.push("result:");
lines.push(resultText);
}
if (subtype.startsWith("error") || isError || errors.length > 0) {
lines.push(`claude_result: subtype=${subtype || "unknown"} is_error=${isError ? "true" : "false"}`);
if (errors.length > 0) lines.push(`claude_errors: ${errors.join(" | ")}`);
}
lines.push(`tokens: in=${Number.isFinite(input) ? input : 0} out=${Number.isFinite(output) ? output : 0} cached=${Number.isFinite(cached) ? cached : 0} cost=$${Number.isFinite(cost) ? cost.toFixed(6) : "0.000000"}`);
return lines.join("\n");
}
if (type === "rate_limit_event") {
const info =
typeof parsed.rate_limit_info === "object" && parsed.rate_limit_info !== null
? (parsed.rate_limit_info as Record<string, unknown>)
: {};
const limitType = typeof info.rateLimitType === "string" ? info.rateLimitType : "unknown";
const status = typeof info.status === "string" ? info.status : "unknown";
const resetsAt = typeof info.resetsAt === "number"
? new Date(info.resetsAt * 1000).toISOString()
: "";
const parts = [`rate_limit: type=${limitType} status=${status}`];
if (resetsAt) parts.push(`resets=${resetsAt}`);
return parts.join(" ");
}
return null;
}
export function printClaudeStreamEvent(raw: string, debug: boolean): void {
const line = raw.trim();
if (!line) return;
@@ -133,6 +256,22 @@ export function printClaudeStreamEvent(raw: string, debug: boolean): void {
return;
}
if (type === "rate_limit_event") {
const info =
typeof parsed.rate_limit_info === "object" && parsed.rate_limit_info !== null
? (parsed.rate_limit_info as Record<string, unknown>)
: {};
const limitType = typeof info.rateLimitType === "string" ? info.rateLimitType : "unknown";
const status = typeof info.status === "string" ? info.status : "unknown";
const resetsAt = typeof info.resetsAt === "number"
? new Date(info.resetsAt * 1000).toISOString()
: "";
const parts = [`rate_limit: type=${limitType} status=${status}`];
if (resetsAt) parts.push(`resets=${resetsAt}`);
console.log(pc.yellow(parts.join(" ")));
return;
}
if (debug) {
console.log(pc.gray(line));
}
+319 -4
View File
@@ -60,7 +60,7 @@ vi.mock("@paperclipai/adapter-utils/server-utils", async (importOriginal) => {
});
});
const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, streamPodLogsOnce, execute } = await import("./execute.js");
const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, streamPodLogsOnce, shouldAbortForCancellation, execute } = await import("./execute.js");
function makeJob(opts: {
runId?: string;
@@ -170,12 +170,68 @@ describe("buildPartialRunError", () => {
expect(msg).toBe("Claude exited with code 1: Error: no API key configured");
});
it("uses first non-system JSON event as content", () => {
it("skips result events (structured protocol artefact — not surfaced verbatim)", () => {
// In production, buildPartialRunError is only called when parseClaudeStreamJson
// returns null (no result event). If somehow a result event appears here, the
// raw JSON blob must not be shown — the "did not produce a result" message is
// cleaner and avoids leaking protocol internals to the UI.
const resultLike = JSON.stringify({ type: "result", subtype: "error", result: "rate limit" });
const stdout = [initLine, resultLike].join("\n");
const msg = buildPartialRunError(2, "claude-sonnet-4-6", stdout);
expect(msg).toContain("rate limit");
expect(msg).toContain("code 2");
expect(msg).toContain("did not produce a result");
expect(msg).toContain("claude-sonnet-4-6");
expect(msg).not.toMatch(/\{.*type.*result/);
});
it("skips rate_limit_event and surfaces model hint (FAR-32 Anthropic/Nancy repro)", () => {
// Reproduces the second variant from FAR-32: init event + rate_limit_event +
// assistant event (thinking only, no result). The rate_limit_event JSON blob
// must not appear verbatim in the error message.
const rateLimitEvent = JSON.stringify({
type: "rate_limit_event",
rate_limit_info: { status: "allowed", resetsAt: 1777056000, rateLimitType: "five_hour" },
uuid: "3ab8f9eb-b9d6-4bf6-9c39-4608427717fc",
session_id: "ad5f3e11-3c0c-4144-b53d-d4b959e57cee",
});
const stdout = [initLine, rateLimitEvent].join("\n");
const msg = buildPartialRunError(null, "claude-opus-4-7", stdout);
expect(msg).toContain("claude-opus-4-7");
expect(msg).toContain("did not produce a result");
expect(msg).not.toContain("rate_limit_event");
expect(msg).not.toContain("rateLimitType");
});
it("skips assistant events and surfaces model hint (FAR-32: MiniMax-M2.7 output_tokens=0)", () => {
// Reproduces the exact failure: init event + assistant event with only a
// thinking block and output_tokens=0, no result event. The assistant JSON
// blob must not be surfaced verbatim as the error message.
const assistantEvent = JSON.stringify({
type: "assistant",
message: {
id: "063ad6038e4c889faa7c95168e007d73",
type: "message",
role: "assistant",
content: [{ type: "thinking", thinking: "Let me start…", signature: "abc123" }],
model: "MiniMax-M2.7",
stop_reason: null,
stop_sequence: null,
usage: { input_tokens: 11013, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 },
},
});
const stdout = [initLine, assistantEvent].join("\n");
const msg = buildPartialRunError(null, "MiniMax-M2.7", stdout);
expect(msg).toContain("MiniMax-M2.7");
expect(msg).toContain("did not produce a result");
expect(msg).not.toContain("063ad6038e4c889faa7c95168e007d73");
expect(msg).not.toContain("output_tokens");
expect(msg).not.toContain("thinking");
});
it("skips user events alongside system events", () => {
const userEvent = JSON.stringify({ type: "user", message: { role: "user", content: [] } });
const stdout = [initLine, userEvent, "Error: API quota exceeded"].join("\n");
const msg = buildPartialRunError(1, "claude-sonnet-4-6", stdout);
expect(msg).toBe("Claude exited with code 1: Error: API quota exceeded");
});
it("null exitCode renders as -1 in message", () => {
@@ -1220,3 +1276,262 @@ describe("execute: concurrency guard — multiple orphans", () => {
expect(result.errorMessage).toContain("different task");
});
});
// ─── shouldAbortForCancellation ──────────────────────────────────────────────
describe("shouldAbortForCancellation", () => {
it("returns false for undefined", () => {
expect(shouldAbortForCancellation(undefined)).toBe(false);
});
it("returns false for empty string", () => {
expect(shouldAbortForCancellation("")).toBe(false);
});
it("returns false when status is 'running'", () => {
expect(shouldAbortForCancellation("running")).toBe(false);
});
it("returns true when status is 'cancelled'", () => {
expect(shouldAbortForCancellation("cancelled")).toBe(true);
});
it("returns true when status is 'failed'", () => {
expect(shouldAbortForCancellation("failed")).toBe(true);
});
it("returns true when status is 'completed'", () => {
expect(shouldAbortForCancellation("completed")).toBe(true);
});
it("returns true for any non-running non-empty string", () => {
expect(shouldAbortForCancellation("unknown")).toBe(true);
});
});
// ─── execute: cancel-polling path ────────────────────────────────────────────
describe("execute: cancel-polling via keepalive tick", () => {
const mockFetch = vi.fn();
beforeEach(() => {
vi.resetAllMocks();
vi.useFakeTimers();
// Replace global fetch for this suite
vi.stubGlobal("fetch", mockFetch);
process.env.PAPERCLIP_API_URL = "http://paperclip-test.local";
mockReadSkillEntries.mockResolvedValue([]);
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
mockBatchListJobs.mockResolvedValue({ items: [] });
mockPrepareBundle.mockResolvedValue(makeBundle());
mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } });
mockBatchPatchJob.mockResolvedValue({});
mockBatchDeleteJob.mockResolvedValue({});
mockCoreDeleteSecret.mockResolvedValue({});
mockCoreListPods
.mockResolvedValueOnce({
items: [{
metadata: { name: "pod-abc" },
status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
}],
})
.mockResolvedValue({
items: [{
metadata: { name: "pod-abc" },
status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] },
}],
});
// Job never reaches terminal on its own (cancel kicks in first)
mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } });
// Log stream never ends (hung — simulates long-running Claude)
mockLogFn.mockImplementation(() => new Promise(() => { /* never resolves */ }));
});
afterEach(() => {
vi.useRealTimers();
vi.unstubAllGlobals();
delete process.env.PAPERCLIP_API_URL;
});
it("returns errorCode=cancelled when poll detects non-running status within one keepalive tick", async () => {
// Use a flag so readJob throws 404 only AFTER deleteJob is called (simulating
// K8s state where the job disappears after deletion).
let jobDeleted = false;
mockBatchDeleteJob.mockImplementation(async () => { jobDeleted = true; return {}; });
mockBatchReadJob.mockImplementation(async () => {
if (jobDeleted) {
throw Object.assign(new Error("Not Found"), { response: { statusCode: 404 } });
}
return { status: { conditions: [] } };
});
// Cancel poll returns "cancelled" status.
mockFetch.mockResolvedValue({
ok: true,
json: async () => ({ status: "cancelled" }),
});
const executePromise = execute(
makeCtx({ authToken: "tok-abc" } as Partial<AdapterExecutionContext>),
);
// Timer sequence:
// t=15100: keepalive fires → pre-check non-terminal → fetch → cancelled →
// deleteJob (jobDeleted=true) → stop signal set
// t=15300: stop poller fires (200ms) → destroys writable → starts 3s bail timer
// t=17100: completion watcher polls → 404 (jobDeleted=true) → jobGone → settles
// t=18300: bail timer fires → streamPodLogsOnce returns → streamPodLogs exits →
// trackedLogStream settles → Promise.allSettled resolves
await vi.advanceTimersByTimeAsync(15_100); // keepalive fires → cancel detected
await vi.advanceTimersByTimeAsync(2_100); // completion watcher polls → 404 → settles
await vi.advanceTimersByTimeAsync(3_100); // bail timer fires → log stream settles
const result = await executePromise;
expect(result.errorCode).toBe("cancelled");
expect(result.errorMessage).toBe("Run cancelled");
expect(result.timedOut).toBe(false);
expect(mockBatchDeleteJob).toHaveBeenCalled();
});
it("treats HTTP 500 on cancel poll as transient and does not cancel", async () => {
// Cancel poll returns 500 → transient, should not cancel.
// After a while the job completes normally.
mockFetch.mockResolvedValue({ ok: false, status: 500 });
// Override: job completes after keepalive tick fires
mockBatchReadJob
.mockResolvedValueOnce({ status: { conditions: [] } }) // first keepalive check: non-terminal
.mockResolvedValue({ status: { conditions: [{ type: "Complete", status: "True" }] } });
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
writable.write(CLAUDE_HAPPY_OUTPUT);
},
);
const executePromise = execute(
makeCtx({ authToken: "tok-abc" } as Partial<AdapterExecutionContext>),
);
await vi.advanceTimersByTimeAsync(15_100); // keepalive fires: 500 → transient, no cancel
await vi.advanceTimersByTimeAsync(3_100); // log reconnect sleep → stopSignal already true
const result = await executePromise;
expect(result.errorCode).toBeUndefined();
expect(result.exitCode).toBe(0);
expect(result.sessionId).toBe("sess_test123");
});
it("skips cancel poll when authToken is absent", async () => {
// No authToken → cancel poll must not be attempted → job completes normally
mockBatchReadJob.mockResolvedValue({
status: { conditions: [{ type: "Complete", status: "True" }] },
});
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
writable.write(CLAUDE_HAPPY_OUTPUT);
},
);
const executePromise = execute(makeCtx()); // no authToken
await vi.advanceTimersByTimeAsync(3_100);
const result = await executePromise;
expect(mockFetch).not.toHaveBeenCalled();
expect(result.exitCode).toBe(0);
});
it("skips cancel poll when PAPERCLIP_API_URL is not set", async () => {
delete process.env.PAPERCLIP_API_URL;
mockBatchReadJob.mockResolvedValue({
status: { conditions: [{ type: "Complete", status: "True" }] },
});
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
writable.write(CLAUDE_HAPPY_OUTPUT);
},
);
const executePromise = execute(
makeCtx({ authToken: "tok-abc" } as Partial<AdapterExecutionContext>),
);
await vi.advanceTimersByTimeAsync(3_100);
const result = await executePromise;
expect(mockFetch).not.toHaveBeenCalled();
expect(result.exitCode).toBe(0);
});
});
// ─── execute: SIGTERM handler ─────────────────────────────────────────────────
describe("execute: SIGTERM handler best-effort cleanup", () => {
beforeEach(() => {
vi.resetAllMocks();
vi.useFakeTimers();
mockReadSkillEntries.mockResolvedValue([]);
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
mockBatchListJobs.mockResolvedValue({ items: [] });
mockPrepareBundle.mockResolvedValue(makeBundle());
mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } });
mockBatchPatchJob.mockResolvedValue({});
mockBatchDeleteJob.mockResolvedValue({});
mockCoreDeleteSecret.mockResolvedValue({});
mockCoreListPods
.mockResolvedValueOnce({
items: [{
metadata: { name: "pod-abc" },
status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
}],
})
.mockResolvedValue({
items: [{
metadata: { name: "pod-abc" },
status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] },
}],
});
mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } });
mockLogFn.mockImplementation(() => new Promise(() => { /* never resolves */ }));
});
afterEach(() => {
vi.useRealTimers();
});
it("deletes the active Job when SIGTERM fires during execution", async () => {
// Mock process.kill to prevent the test process from actually being killed.
const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true);
// Start execute() and suppress unhandled rejection (we won't await it).
const executePromise = execute(makeCtx());
executePromise.catch(() => {});
// Flush microtasks through the async setup chain: getSelfPodInfo, listJobs,
// readSkillEntries, prepareBundle, createJob, onLog, activeJobs.add(), and
// ensureSigtermHandler() all complete before the try block enters streaming.
// 30 rounds is more than enough for the ~7 sequential await points.
for (let i = 0; i < 30; i++) await Promise.resolve();
// Emit SIGTERM — the process.once handler fires synchronously and kicks off
// async cleanup (deleteNamespacedJob). The mock resolves immediately.
process.emit("SIGTERM");
// Flush microtasks for deleteJob to resolve and the .then(process.kill) to run.
for (let i = 0; i < 10; i++) await Promise.resolve();
expect(mockBatchDeleteJob).toHaveBeenCalled();
expect(killSpy).toHaveBeenCalledWith(process.pid, "SIGTERM");
killSpy.mockRestore();
// afterEach calls vi.useRealTimers() which clears all pending fake timers,
// so we do not need to settle executePromise.
});
});
+140 -13
View File
@@ -39,6 +39,48 @@ const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
// minutes, causing stale "running" status in the UI (FAR-23).
const LOG_EXIT_COMPLETION_GRACE_MS = 30_000;
// Module-level tracking of active Jobs for SIGTERM best-effort cleanup.
interface ActiveJobRef {
namespace: string;
jobName: string;
promptSecretName?: string;
promptSecretNamespace?: string;
kubeconfigPath?: string;
}
const activeJobs = new Set<ActiveJobRef>();
let sigtermHandlerRegistered = false;
function ensureSigtermHandler(): void {
if (sigtermHandlerRegistered) return;
sigtermHandlerRegistered = true;
process.once("SIGTERM", () => {
const jobs = [...activeJobs];
void Promise.allSettled(
jobs.map(async (ref) => {
try {
const batchApi = getBatchApi(ref.kubeconfigPath);
await batchApi.deleteNamespacedJob({
name: ref.jobName,
namespace: ref.namespace,
body: { propagationPolicy: "Background" },
});
} catch { /* best-effort */ }
if (ref.promptSecretName && ref.promptSecretNamespace) {
try {
const coreApi = getCoreApi(ref.kubeconfigPath);
await coreApi.deleteNamespacedSecret({
name: ref.promptSecretName,
namespace: ref.promptSecretNamespace,
});
} catch { /* best-effort */ }
}
}),
).then(() => {
process.kill(process.pid, "SIGTERM");
});
});
}
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
@@ -53,6 +95,16 @@ export function isK8s404(err: unknown): boolean {
return /HTTP-Code:\s*404\b/.test(err.message);
}
/**
* Returns true when the heartbeat-run status indicates the run is no longer
* active and the K8s Job should be cancelled.
* Exported for unit tests.
*/
export function shouldAbortForCancellation(runStatus: string | undefined): boolean {
if (!runStatus) return false;
return runStatus !== "running";
}
/**
* Build the error message when Claude's stdout contains no result event.
* Skips system/init event lines so the UI doesn't display the raw init JSON.
@@ -65,22 +117,31 @@ export function buildPartialRunError(
): string {
if (exitCode === 0) return "Failed to parse Claude JSON output";
// Walk stdout lines, skip system events, return the first real content line.
// Walk stdout lines and skip every structured streaming event (any JSON
// object that carries a non-empty "type" field: system, assistant, user,
// rate_limit_event, result, …). All of these are protocol artefacts and
// produce confusing raw-JSON blobs when surfaced verbatim as an error
// message. Only plain-text lines (non-JSON, or JSON without a type field)
// are treated as human-readable content worth including in the error.
const firstContentLine = stdout.split(/\r?\n/)
.map((l) => l.trim())
.find((l) => {
if (!l) return false;
try {
const obj = JSON.parse(l);
if (typeof obj === "object" && obj !== null && (obj as Record<string, unknown>).type === "system") return false;
if (typeof obj === "object" && obj !== null) {
const t = (obj as Record<string, unknown>).type;
if (typeof t === "string" && t) return false;
}
} catch {
// not JSON — treat as content
}
return true;
}) ?? "";
// If we only have system/init events and nothing else, surface the model
// name so the operator can diagnose missing credentials or unsupported model.
// If the stream contained only structured events with no plain-text output,
// surface the model name so the operator can diagnose missing credentials
// or unsupported/misconfigured model.
const initOnlyOutput = stdout.trim() !== "" && model !== "" && !firstContentLine;
if (initOnlyOutput) {
const modelHint = model ? ` (model: ${model})` : "";
@@ -298,6 +359,10 @@ export async function streamPodLogsOnce(
callback();
return;
}
// Forward raw stream-json lines unchanged. The Paperclip UI uses the
// adapter's ui-parser export (src/ui-parser.ts) to render structured
// transcript entries — pre-formatting here would strip that structure
// and produce flat plain text that looks nothing like claude_local.
void onLog("stdout", emitted).then(() => callback(), callback);
},
});
@@ -546,6 +611,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const graceSec = asNumber(config.graceSec, 60);
const retainJobs = asBoolean(config.retainJobs, false);
const kubeconfigPath = asString(config.kubeconfig, "") || undefined;
const paperclipApiUrl = process.env.PAPERCLIP_API_URL ?? "";
if (!paperclipApiUrl) {
await onLog("stderr", "[paperclip] Warning: PAPERCLIP_API_URL not set — cancel polling disabled\n");
}
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
@@ -580,7 +649,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
labelSelector: `paperclip.io/agent-id=${sanitizedAgentId},paperclip.io/adapter-type=claude_k8s`,
});
const running = existing.items.filter(
(j) => !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"),
(j) =>
!j.metadata?.deletionTimestamp &&
!j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"),
);
if (running.length > 0) {
// Separate orphaned jobs (from a previous server-side run) from truly
@@ -905,6 +976,15 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// delete a job that is still alive and the UI is waiting on.
let skipCleanup = false;
const activeJobRef: ActiveJobRef = {
namespace,
jobName,
...(promptSecret ? { promptSecretName: promptSecret.name, promptSecretNamespace: promptSecret.namespace } : {}),
kubeconfigPath,
};
activeJobs.add(activeJobRef);
ensureSigtermHandler();
try {
// Wait for pod to be ready for log streaming
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
@@ -953,11 +1033,21 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let lastLogAt = Date.now();
let keepaliveJobTerminal = false;
let consecutiveTerminalReadings = 0;
// Shared signal: when job completion resolves, tell the log streamer to
// stop reconnecting. Declared before keepaliveTimer so the cancel path
// inside the timer can set it without temporal dead zone issues.
const logStopSignal = { stopped: false };
// Shared dedup filter: created here so the one-shot fallback can
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
const logDedup = new LogLineDedupFilter();
// Set when the run is externally cancelled (cancel-poll path).
let cancelled = false;
keepaliveTimer = setInterval(() => {
// Fire-and-forget the async work; setInterval callbacks must be
// synchronous or the timer will drift.
void (async () => {
if (keepaliveJobTerminal) return;
if (keepaliveJobTerminal || cancelled) return;
// Verify the Job is still alive before announcing or refreshing.
// Require two consecutive terminal readings before latching to
@@ -992,6 +1082,37 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return;
}
// Cancel-polling: check if the Paperclip run was cancelled externally.
// Skipped on the reattach path to avoid tearing down an adopted Job.
// HTTP non-2xx is treated as transient — never interpret a 5xx as cancel.
if (!reattachTarget && paperclipApiUrl && ctx.authToken) {
try {
const resp = await fetch(`${paperclipApiUrl}/api/heartbeat-runs/${runId}`, {
headers: { Authorization: `Bearer ${ctx.authToken}` },
});
if (resp.ok) {
const data = await resp.json() as { status?: string };
if (shouldAbortForCancellation(data.status)) {
void onLog("stdout", `[paperclip] Run cancelled externally — deleting Job ${jobName}\n`).catch(() => {});
cancelled = true;
logStopSignal.stopped = true;
try {
await batchApi.deleteNamespacedJob({
name: jobName,
namespace,
body: { propagationPolicy: "Background" },
});
} catch { /* best-effort — completion watcher will see 404 and settle */ }
return;
}
} else if (resp.status >= 500) {
void onLog("stderr", `[paperclip] keepalive: cancel poll returned HTTP ${resp.status} — transient, ignoring\n`).catch(() => {});
}
} catch {
// network error — transient, skip this tick
}
}
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {});
})();
@@ -1001,13 +1122,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return onLog(stream, chunk);
};
// Shared signal: when job completion resolves, tell the log
// streamer to stop reconnecting.
const logStopSignal = { stopped: false };
// Shared dedup filter: created here so the one-shot fallback can
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
const logDedup = new LogLineDedupFilter();
// Track when the log stream first exits so the grace-period can fire
// if the K8s Job condition lags behind container exit (FAR-23).
// Set via onFirstStreamExit callback (called after attempt=0 returns)
@@ -1067,6 +1181,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
keepaliveTimer = null;
}
// If the run was externally cancelled, return a clean cancelled result
// without processing stdout (the finally block still runs for cleanup).
if (cancelled) {
return {
exitCode: null,
signal: null,
timedOut: false,
errorCode: "cancelled",
errorMessage: "Run cancelled",
};
}
if (logResult.status === "fulfilled") {
stdout = logResult.value;
}
@@ -1141,6 +1267,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
exitCode = await getPodExitCode(namespace, jobName, kubeconfigPath);
} finally {
if (keepaliveTimer) clearInterval(keepaliveTimer);
activeJobs.delete(activeJobRef);
if (skipCleanup) {
await onLog("stdout", `[paperclip] Retaining job ${jobName} (state mismatch — UI is waiting on it)\n`);
} else if (!retainJobs) {