Compare commits

...

7 Commits

Author SHA1 Message Date
Chris Farhood 88896eddcf 0.1.45 2026-04-26 01:54:48 +00:00
Chris Farhood a2874c0426 fix: detect mid-stream truncation and emit claude_truncated error code (FAR-95)
When Claude produces assistant content (output_tokens > 0) but the stream ends
without a result event, classify the run as truncated mid-stream rather than
falling through to the generic "did not produce a result — check API
credentials" message. The misleading hint pointed operators at auth/model
config when the real cause was pod termination, OOMKill, or CLI crash.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-26 01:54:35 +00:00
Chris Farhood 818aa0f1d6 feat: log bundled skill names and add skills to onMeta commandNotes (FAR-36)
Adds a diagnostic log line after skill resolution so operators can see exactly
which skills were bundled into each run, making it straightforward to diagnose
skill availability issues. Also surfaces the skill list in the onMeta
commandNotes for run metadata visibility.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 20:41:01 +00:00
Chris Farhood 55fd3021fb fix: add per-agent mutex to eliminate TOCTOU race in K8s concurrency guard (FAR-29)
Two concurrent execute() calls for the same agent can both pass the
list-then-create guard before either job appears in the other's query.
The new module-level agentCreationMutex serializes the guard+create phase
within the process so only one call enters listNamespacedJob at a time.

The mutex is acquired after sanitizing the agent ID and released in a
finally block that wraps the entire guard+create section, so all early
return paths (guard blocks, create failures) cleanly release it. Variables
used in both the guard+create and log-streaming phases are hoisted to
before the try block. Cross-agent calls use separate mutex slots and are
unaffected.

Added two vitest cases verifying same-agent serialization and that
different-agent calls are not serialized.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 20:10:01 +00:00
Chris Farhood 83b58f9207 fix: detect stop_reason:null + output_tokens:0 and emit llm_api_error (FAR-30)
parseClaudeStreamJson now tracks assistant events with stop_reason:null and
output_tokens:0 (the MiniMax degraded-response pattern). When no result event
follows, execute() returns errorCode:"llm_api_error" with a descriptive message
instead of the generic adapter_failed.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 20:00:42 +00:00
Chris Farhood 602afa9b84 fix: return k8s_job_deleted_externally error code when job deleted mid-run (FAR-31)
When a K8s Job is deleted externally (kubectl delete job or TTL before
terminal condition observed) and stdout has no result event, the adapter
now returns errorCode "k8s_job_deleted_externally" with the message
"K8s Job was deleted externally before Claude could complete" instead of
the misleading "Claude exited with code -1".

Tracks a jobDeletedExternally flag in execute() on the jobGone path and
checks it in the !parsed branch before falling through to buildPartialRunError.
Only applies when exitCode is null (pod gone alongside the job).

Adds regression test: FAR-31 scenario where job 404s mid-run with partial
stdout and missing pod produces the new error code.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 19:58:46 +00:00
Chris Farhood 986f2fc7fa test: add coverage for deletionTimestamp concurrency guard bypass (FAR-34)
Verifies that a terminating K8s job (deletionTimestamp set, no
Complete/Failed condition) is skipped by the concurrency guard so
subsequent heartbeat runs are not incorrectly blocked.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 19:57:10 +00:00
6 changed files with 517 additions and 14 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.42",
"version": "0.1.45",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.42",
"version": "0.1.45",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.43",
"version": "0.1.45",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+284
View File
@@ -577,6 +577,28 @@ describe("execute: concurrency guard", () => {
expect(result.errorMessage).toContain("still running for this agent");
});
it("ignores terminating jobs (deletionTimestamp set) and proceeds past the concurrency guard", async () => {
// A job being force-deleted has deletionTimestamp set but no Complete/Failed condition.
// The guard must treat it as terminal so subsequent runs are not blocked.
const terminating: k8s.V1Job = {
metadata: {
name: "terminating-job",
namespace: "paperclip",
labels: { "paperclip.io/agent-id": "agent-abc", "paperclip.io/adapter-type": "claude_k8s" },
deletionTimestamp: new Date(),
},
status: { conditions: [] },
};
mockBatchListJobs.mockResolvedValue({ items: [terminating] });
// Guard passes → next failure is job creation (no further mocks set up)
mockBatchCreateJob.mockRejectedValue(new Error("quota exceeded"));
mockPrepareBundle.mockResolvedValue(makeBundle());
const result = await execute(makeCtx());
// Must NOT be a concurrency error — the guard let us through
expect(result.errorCode).not.toBe("k8s_concurrent_run_blocked");
expect(result.errorCode).toBe("k8s_job_create_failed");
});
it("reattaches to a matching orphan and returns k8s_pod_reattach_failed when pod is missing", async () => {
// Orphan with matching taskId and sessionId → reattach classification → reattachTarget is set
const orphan = makeJob({
@@ -892,6 +914,124 @@ describe("execute: happy path", () => {
expect(result.sessionId).toBe("sess_test123");
});
it("returns k8s_job_deleted_externally when job 404s mid-run and stdout has no result event (FAR-31)", async () => {
// Reproduces the observed scenario: kubectl delete job while Claude is mid-run.
// The log stream captures only partial output (no result event), and the pod
// is also gone so getPodExitCode returns null. The adapter must emit a
// descriptive error instead of the misleading "Claude exited with code -1".
// Log stream writes only the init line — no result event (mid-run deletion)
const partialOutput = JSON.stringify({
type: "system",
subtype: "init",
model: "claude-sonnet-4-6",
session_id: "sess_x",
}) + "\n";
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: Writable) => {
writable.write(partialOutput);
},
);
// Job is gone (404) — matches the kubectl-delete-job-mid-run scenario
mockBatchReadJob.mockRejectedValue(
Object.assign(new Error("Not Found"), { response: { statusCode: 404 } }),
);
// Pod is also gone — getPodExitCode returns null (no pod found)
mockCoreListPods.mockReset();
mockCoreListPods
.mockResolvedValueOnce({
items: [{
metadata: { name: "pod-abc" },
status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
}],
})
.mockResolvedValue({ items: [] }); // pod gone → exitCode null
const executePromise = execute(makeCtx());
await vi.advanceTimersByTimeAsync(3_100);
const result = await executePromise;
expect(result.errorCode).toBe("k8s_job_deleted_externally");
expect(result.errorMessage).toBe("K8s Job was deleted externally before Claude could complete");
expect(result.exitCode).toBeNull();
});
it("returns llm_api_error when assistant event has stop_reason:null and output_tokens:0 (FAR-30)", async () => {
// Reproduces the MiniMax degradation pattern: init event + assistant event with
// stop_reason:null and output_tokens:0, no result event, Claude exits -1.
const emptyResponseOutput = [
JSON.stringify({ type: "system", subtype: "init", model: "MiniMax-M2.7", session_id: "sess_mm" }),
JSON.stringify({
type: "assistant",
session_id: "sess_mm",
message: {
id: "msg_empty",
stop_reason: null,
usage: { input_tokens: 500, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 },
content: [],
},
}),
].join("\n") + "\n";
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: Writable) => {
writable.write(emptyResponseOutput);
},
);
// getPodExitCode: exit code -1 (as reported in the issue)
mockCoreListPods.mockResolvedValue({
items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: -1 } } }] } }],
});
const executePromise = execute(makeCtx());
await vi.advanceTimersByTimeAsync(3_100);
const result = await executePromise;
expect(result.errorCode).toBe("llm_api_error");
expect(result.errorMessage).toContain("stop_reason: null");
expect(result.errorMessage).toContain("output_tokens: 0");
});
it("returns claude_truncated when assistant produced content but no result event arrived (FAR-95)", async () => {
const truncatedOutput = [
JSON.stringify({ type: "system", subtype: "init", model: "claude-opus-4-7", session_id: "sess_trunc" }),
JSON.stringify({
type: "assistant",
session_id: "sess_trunc",
message: {
id: "msg_trunc",
stop_reason: null,
usage: { input_tokens: 1, output_tokens: 35, cache_creation_input_tokens: 523, cache_read_input_tokens: 46295 },
content: [{ type: "tool_use", id: "tool_1", name: "Bash", input: { command: "echo hi" } }],
},
}),
JSON.stringify({
type: "user",
message: { role: "user", content: [{ tool_use_id: "tool_1", type: "tool_result", content: "hi", is_error: false }] },
}),
].join("\n") + "\n";
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: Writable) => {
writable.write(truncatedOutput);
},
);
mockCoreListPods.mockResolvedValue({
items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 137 } } }] } }],
});
const executePromise = execute(makeCtx());
await vi.advanceTimersByTimeAsync(3_100);
const result = await executePromise;
expect(result.errorCode).toBe("claude_truncated");
expect(result.errorMessage).toContain("truncated mid-stream");
expect(result.errorMessage).toContain("claude-opus-4-7");
expect(result.errorMessage).toContain("exit code 137");
});
it("reconnects log stream and logs status when job completion takes > 3s", async () => {
// Make waitForJobCompletion take 4s so the 3s stream reconnect fires first.
// timeoutSec=4, graceSec=0 → completionTimeoutMs=4000.
@@ -1065,6 +1205,62 @@ describe("execute: happy path", () => {
expect(result.exitCode).toBe(0);
});
it("logs bundled skill names and count (FAR-36 diagnostic)", async () => {
const skills = [
{ key: "safety--abc123", runtimeName: "safety--abc123", desired: true, managed: true, required: true, state: "configured" as const },
{ key: "sdlc--def456", runtimeName: "sdlc--def456", desired: true, managed: true, required: true, state: "configured" as const },
];
mockReadSkillEntries.mockResolvedValue(skills);
const logs: Array<{ stream: string; msg: string }> = [];
const onLog = vi.fn().mockImplementation(async (stream: string, msg: string) => { logs.push({ stream, msg }); });
const executePromise = execute(makeCtx({ onLog } as Partial<AdapterExecutionContext>));
await vi.advanceTimersByTimeAsync(3_100);
await executePromise;
const skillLine = logs.find((l) => l.msg.includes("Skills bundled"));
expect(skillLine).toBeDefined();
expect(skillLine?.stream).toBe("stdout");
expect(skillLine?.msg).toContain("(2):");
expect(skillLine?.msg).toContain("safety--abc123");
expect(skillLine?.msg).toContain("sdlc--def456");
});
it("logs Skills bundled (0): none when no skills are configured (FAR-36 diagnostic)", async () => {
mockReadSkillEntries.mockResolvedValue([]);
const logs: Array<{ stream: string; msg: string }> = [];
const onLog = vi.fn().mockImplementation(async (stream: string, msg: string) => { logs.push({ stream, msg }); });
const executePromise = execute(makeCtx({ onLog } as Partial<AdapterExecutionContext>));
await vi.advanceTimersByTimeAsync(3_100);
await executePromise;
const skillLine = logs.find((l) => l.msg.includes("Skills bundled"));
expect(skillLine).toBeDefined();
expect(skillLine?.msg).toContain("(0): none");
});
it("includes skill count in onMeta commandNotes (FAR-36 diagnostic)", async () => {
const skills = [
{ key: "safety--abc123", runtimeName: "safety--abc123", desired: true, managed: true, required: true, state: "configured" as const },
];
mockReadSkillEntries.mockResolvedValue(skills);
const onMeta = vi.fn().mockResolvedValue(undefined);
const executePromise = execute(makeCtx({ onMeta } as Partial<AdapterExecutionContext>));
await vi.advanceTimersByTimeAsync(3_100);
await executePromise;
expect(onMeta).toHaveBeenCalled();
const notes: string[] = onMeta.mock.calls[0][0].commandNotes;
const skillsNote = notes.find((n: string) => n.startsWith("Skills"));
expect(skillsNote).toBeDefined();
expect(skillsNote).toContain("(1):");
expect(skillsNote).toContain("safety--abc123");
});
});
// ─── execute: waitForPod edge cases ──────────────────────────────────────────
@@ -1535,3 +1731,91 @@ describe("execute: SIGTERM handler best-effort cleanup", () => {
// so we do not need to settle executePromise.
});
});
// ─── execute: per-agent creation mutex (FAR-29 TOCTOU fix) ───────────────────
//
// Verifies that two concurrent execute() calls for the same agent cannot both
// enter the listNamespacedJob → createNamespacedJob sequence simultaneously.
// Without the per-agent mutex, both would pass the concurrency guard before
// either job appears in the other's list query.
describe("execute: per-agent creation mutex prevents TOCTOU race", () => {
beforeEach(() => {
vi.resetAllMocks();
mockReadSkillEntries.mockResolvedValue([]);
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
mockPrepareBundle.mockResolvedValue(makeBundle());
// Make job creation fail so the guard+create phase exits quickly and
// releases the mutex without needing to mock the full streaming path.
mockBatchCreateJob.mockRejectedValue(new Error("mock: create not configured"));
mockBatchDeleteJob.mockResolvedValue({});
mockCoreDeleteSecret.mockResolvedValue({});
});
it("serializes guard phases for the same agent: call-2 waits until call-1 exits guard+create", async () => {
const listCalls: string[] = [];
let resolveFirstList!: (v: { items: [] }) => void;
mockBatchListJobs
.mockImplementationOnce(() => {
listCalls.push("call-1");
return new Promise<{ items: [] }>((resolve) => { resolveFirstList = resolve; });
})
.mockImplementation(() => {
listCalls.push("call-2");
return Promise.resolve({ items: [] });
});
const p1 = execute(makeCtx({ runId: "run-1" }));
const p2 = execute(makeCtx({ runId: "run-2" }));
// Drain microtasks: call-1 should be suspended in listNamespacedJob while
// call-2 waits behind the per-agent mutex, not yet calling list.
for (let i = 0; i < 20; i++) await Promise.resolve();
expect(listCalls).toEqual(["call-1"]);
// Let call-1's guard resolve (no running jobs). It will proceed to job
// creation, fail (mock rejects), and release the mutex in finally.
resolveFirstList({ items: [] });
await Promise.allSettled([p1, p2]);
// call-2 must have listed, and only AFTER call-1's guard resolved.
// The exact order: call-1 listed → call-1 list resolved → call-2 listed.
expect(listCalls).toEqual(["call-1", "call-2"]);
});
it("does not serialize guard phases for different agents", async () => {
const listCalls: string[] = [];
let resolveAgentAList!: (v: { items: [] }) => void;
// Agent A's list is artificially slow. Agent B (different id) should
// proceed immediately without waiting — the mutex is keyed by agent id.
mockBatchListJobs
.mockImplementationOnce(() => {
listCalls.push("A");
return new Promise<{ items: [] }>((resolve) => { resolveAgentAList = resolve; });
})
.mockImplementation(() => {
listCalls.push("B");
return Promise.resolve({ items: [] });
});
const ctxA = makeCtx({ runId: "run-A" });
const ctxB = makeCtx({
runId: "run-B",
agent: { id: "agent-other", companyId: "co1", name: "Other Agent", adapterType: "claude_k8s", adapterConfig: {} },
} as Partial<AdapterExecutionContext>);
const pA = execute(ctxA);
const pB = execute(ctxB);
// Drain microtasks — B should have called list even though A is still
// suspended, because they use separate mutex slots.
for (let i = 0; i < 20; i++) await Promise.resolve();
expect(listCalls).toContain("B");
// Let A complete so the promises settle cleanly.
resolveAgentAList({ items: [] });
await Promise.allSettled([pA, pB]);
});
});
+76 -11
View File
@@ -48,6 +48,10 @@ interface ActiveJobRef {
kubeconfigPath?: string;
}
const activeJobs = new Set<ActiveJobRef>();
// Per-agent serialization lock: prevents the TOCTOU race (FAR-29) where two
// concurrent execute() calls for the same agent both pass the list-then-create
// guard and create K8s Jobs simultaneously on the shared PVC.
const agentCreationMutex = new Map<string, Promise<void>>();
let sigtermHandlerRegistered = false;
function ensureSigtermHandler(): void {
@@ -633,17 +637,40 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
errorCode: "k8s_agent_id_invalid",
};
}
// FAR-29: serialize guard+create per agent within this process to prevent the
// TOCTOU race where two concurrent execute() calls both pass the list-then-create
// guard and create K8s Jobs simultaneously on the shared PVC.
const _prevCreation = agentCreationMutex.get(agentId) ?? Promise.resolve();
let _releaseMutex: () => void = () => {};
const _mutexSlot = new Promise<void>((resolve) => { _releaseMutex = resolve; });
// Chain: next caller for this agent waits on _mutexSlot, which resolves in finally.
agentCreationMutex.set(agentId, _prevCreation.then(() => _mutexSlot, () => _mutexSlot));
// Wait for any prior execute() call to finish its guard+create phase.
await _prevCreation.catch(() => {});
// Hoist declarations used in both the guard+create phase and the log-streaming
// section so the mutex try/finally can be added without a large re-indent.
let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null;
// eslint-disable-next-line prefer-const
let jobName!: string;
// eslint-disable-next-line prefer-const
let namespace!: string;
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
// runtimeSessionParams and currentSessionIdRaw are also used after the
// try block (in the result-parsing section) so hoist them here.
const runtimeSessionParams = parseObject(runtime.sessionParams);
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const coreApi = getCoreApi(kubeconfigPath);
const batchApi = getBatchApi(kubeconfigPath);
try {
const selfPod = await getSelfPodInfo(kubeconfigPath);
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true);
const runtimeSessionParams = parseObject(runtime.sessionParams);
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const currentSessionLabel = currentSessionIdRaw ? sanitizeLabelValue(currentSessionIdRaw) : null;
const currentTaskIdRaw = asString(ctx.context.taskId, "") || asString(ctx.context.issueId, "");
const currentTaskLabel = currentTaskIdRaw ? sanitizeLabelValue(currentTaskIdRaw) : null;
let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null;
try {
const batchApi = getBatchApi(kubeconfigPath);
const existing = await batchApi.listNamespacedJob({
namespace: guardNamespace,
labelSelector: `paperclip.io/agent-id=${sanitizedAgentId},paperclip.io/adapter-type=claude_k8s`,
@@ -763,19 +790,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
const coreApi = getCoreApi(kubeconfigPath);
const batchApi = getBatchApi(kubeconfigPath);
let jobName: string;
let namespace: string;
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
// Prepare the prompt bundle (skills + instructions) on the server filesystem.
// The K8s Job pod mounts the same PVC at /paperclip, so bundle paths written
// here are accessible inside the pod at the identical absolute path.
const skillEntries = await readPaperclipRuntimeSkillEntries(config, import.meta.dirname ?? __dirname);
const desiredSkillNames = new Set(resolvePaperclipDesiredSkillNames(config, skillEntries));
const desiredSkills = skillEntries.filter((e) => desiredSkillNames.has(e.key));
const skillSummary = desiredSkills.length > 0 ? desiredSkills.map((s) => s.runtimeName ?? s.key).join(", ") : "none";
await onLog("stdout", `[paperclip] Skills bundled (${desiredSkills.length}): ${skillSummary}\n`);
const instructionsFilePath = asString(config.instructionsFilePath, "").trim();
const instructionsFileDir = instructionsFilePath ? `${path.dirname(instructionsFilePath)}/` : "";
let instructionsContents: string | null = null;
@@ -872,6 +894,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
`Skills (${desiredSkills.length}): ${skillSummary}`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
@@ -967,6 +990,11 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
}
} finally {
// Release the per-agent creation mutex so the next queued execute() call
// can proceed with its guard+create phase (FAR-29).
_releaseMutex();
}
let stdout = "";
let exitCode: number | null = null;
@@ -975,6 +1003,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// Set when we return a mismatch error so the finally block knows not to
// delete a job that is still alive and the UI is waiting on.
let skipCleanup = false;
// Set when the job disappeared (404) or grace-timer fired before we saw a
// terminal condition — used to emit a clearer error when stdout parsing fails.
let jobDeletedExternally = false;
const activeJobRef: ActiveJobRef = {
namespace,
@@ -1231,6 +1262,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// condition. The container must have exited first (TTL only fires after
// completion), so log streaming has captured the full output — continue
// to stdout parsing rather than returning an error.
jobDeletedExternally = true;
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
}
} else {
@@ -1247,6 +1279,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
} else if (actualState.jobGone) {
// Job was deleted before we could confirm terminal state — same as the
// fulfilled+jobGone case above: proceed with captured output.
jobDeletedExternally = true;
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
} else if (!actualState.succeeded) {
// Job still not terminal — the completion error was likely transient.
@@ -1314,6 +1347,38 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
if (!parsed) {
if (jobDeletedExternally && exitCode === null) {
return {
exitCode,
signal: null,
timedOut: false,
errorMessage: "K8s Job was deleted externally before Claude could complete",
errorCode: "k8s_job_deleted_externally",
resultJson: { stdout },
};
}
if (parsedStream.llmApiEmptyResponse) {
return {
exitCode,
signal: null,
timedOut: false,
errorMessage: "LLM API returned an empty response (stop_reason: null, output_tokens: 0) — the upstream model API may be degraded or misconfigured",
errorCode: "llm_api_error",
resultJson: { stdout },
};
}
if (parsedStream.truncatedMidStream) {
const exitHint = exitCode === null ? "no exit code" : `exit code ${exitCode}`;
const modelHint = parsedStream.model ? ` (model: ${parsedStream.model})` : "";
return {
exitCode,
signal: null,
timedOut: false,
errorMessage: `Claude run was truncated mid-stream${modelHint} — assistant produced content but no result event arrived (${exitHint}); pod may have been terminated, OOMKilled, or the CLI crashed`,
errorCode: "claude_truncated",
resultJson: { stdout },
};
}
return {
exitCode,
signal: null,
+125
View File
@@ -154,6 +154,131 @@ more raw output`;
// Should not be "Hello world\n\nHello world"
expect(result.summary.split("Hello world").length).toBe(2);
});
it("sets llmApiEmptyResponse=true when stop_reason:null and usage.output_tokens:0", () => {
const initLine = JSON.stringify({ type: "system", subtype: "init", model: "MiniMax-M2.7", session_id: "sess_1" });
const assistantEvent = JSON.stringify({
type: "assistant",
session_id: "sess_1",
message: {
id: "msg_abc",
stop_reason: null,
usage: { input_tokens: 100, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 },
content: [],
},
});
const result = parseClaudeStreamJson([initLine, assistantEvent].join("\n"));
expect(result.llmApiEmptyResponse).toBe(true);
expect(result.resultJson).toBeNull();
});
it("sets llmApiEmptyResponse=true when stop_reason:null and message-level output_tokens:0", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: { stop_reason: null, output_tokens: 0, content: [] },
});
const result = parseClaudeStreamJson(assistantEvent);
expect(result.llmApiEmptyResponse).toBe(true);
});
it("does not set llmApiEmptyResponse when stop_reason is non-null", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: {
stop_reason: "end_turn",
usage: { output_tokens: 0 },
content: [],
},
});
const result = parseClaudeStreamJson(assistantEvent);
expect(result.llmApiEmptyResponse).toBe(false);
});
it("does not set llmApiEmptyResponse when output_tokens > 0", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: {
stop_reason: null,
usage: { output_tokens: 5 },
content: [{ type: "text", text: "hello" }],
},
});
const result = parseClaudeStreamJson(assistantEvent);
expect(result.llmApiEmptyResponse).toBe(false);
});
it("clears llmApiEmptyResponse when a result event follows the empty assistant event", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: { stop_reason: null, usage: { output_tokens: 0 }, content: [] },
});
const resultEvent = JSON.stringify({
type: "result",
result: "Done",
subtype: "stop",
total_cost_usd: 0.001,
usage: { input_tokens: 10, output_tokens: 5, cache_read_input_tokens: 0 },
});
const result = parseClaudeStreamJson([assistantEvent, resultEvent].join("\n"));
expect(result.llmApiEmptyResponse).toBe(false);
expect(result.resultJson).not.toBeNull();
});
it("sets truncatedMidStream=true when assistant event with output_tokens>0 has no result (FAR-95)", () => {
const initLine = JSON.stringify({ type: "system", subtype: "init", model: "claude-opus-4-7", session_id: "sess_1" });
const assistantEvent = JSON.stringify({
type: "assistant",
session_id: "sess_1",
message: {
id: "msg_abc",
stop_reason: null,
usage: { input_tokens: 1, output_tokens: 35, cache_creation_input_tokens: 523, cache_read_input_tokens: 46295 },
content: [{ type: "tool_use", id: "tool_1", name: "Bash", input: { command: "echo hi" } }],
},
});
const result = parseClaudeStreamJson([initLine, assistantEvent].join("\n"));
expect(result.truncatedMidStream).toBe(true);
expect(result.llmApiEmptyResponse).toBe(false);
expect(result.resultJson).toBeNull();
});
it("clears truncatedMidStream when a result event follows assistant content", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: { stop_reason: null, usage: { output_tokens: 35 }, content: [] },
});
const resultEvent = JSON.stringify({
type: "result",
result: "Done",
subtype: "stop",
total_cost_usd: 0.001,
usage: { input_tokens: 10, output_tokens: 5, cache_read_input_tokens: 0 },
});
const result = parseClaudeStreamJson([assistantEvent, resultEvent].join("\n"));
expect(result.truncatedMidStream).toBe(false);
expect(result.resultJson).not.toBeNull();
});
it("does not set truncatedMidStream when assistant has output_tokens=0", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: { stop_reason: null, usage: { output_tokens: 0 }, content: [] },
});
const result = parseClaudeStreamJson(assistantEvent);
expect(result.truncatedMidStream).toBe(false);
});
it("sets llmApiEmptyResponse=false for normal result", () => {
const resultEvent = JSON.stringify({
type: "result",
result: "Done",
subtype: "stop",
total_cost_usd: 0.005,
usage: { input_tokens: 100, output_tokens: 200, cache_read_input_tokens: 50 },
});
const result = parseClaudeStreamJson(resultEvent);
expect(result.llmApiEmptyResponse).toBe(false);
});
});
describe("extractClaudeLoginUrl", () => {
+29
View File
@@ -15,6 +15,14 @@ export function parseClaudeStreamJson(stdout: string) {
// at the line level; this guard only needs to protect against the same
// message block being parsed twice.
const seenBlocks = new Set<string>();
// Set when we see stop_reason:null + output_tokens:0 on an assistant event
// with no subsequent result event — indicates the upstream LLM API returned
// an empty/malformed response (e.g. MiniMax degraded performance).
let llmApiEmptyResponse = false;
// Set when an assistant event with output_tokens > 0 was seen but no result
// event arrived — indicates the run was truncated mid-stream (pod terminated,
// OOMKill, or claude CLI crash after producing content).
let assistantContentSeen = false;
for (const rawLine of stdout.split(/\r?\n/)) {
const line = rawLine.trim();
@@ -34,6 +42,21 @@ export function parseClaudeStreamJson(stdout: string) {
const message = parseObject(event.message);
const messageId = asString(message.id, "");
const content = Array.isArray(message.content) ? message.content : [];
// Detect empty LLM API response: stop_reason:null with zero output tokens.
// output_tokens may appear directly on message or nested under message.usage.
const stopReason = message.stop_reason;
const usageObj = parseObject(message.usage as Record<string, unknown>);
const outputTokens = typeof message.output_tokens === "number"
? message.output_tokens
: asNumber(usageObj.output_tokens, -1);
if (stopReason === null && outputTokens === 0) {
llmApiEmptyResponse = true;
}
if (outputTokens > 0) {
assistantContentSeen = true;
}
for (let i = 0; i < content.length; i++) {
const entry = content[i];
if (typeof entry !== "object" || entry === null || Array.isArray(entry)) continue;
@@ -55,6 +78,8 @@ export function parseClaudeStreamJson(stdout: string) {
if (type === "result") {
finalResult = event;
llmApiEmptyResponse = false; // result event means Claude completed normally
assistantContentSeen = false; // result event means stream was not truncated
sessionId = asString(event.session_id, sessionId ?? "") || sessionId;
}
}
@@ -67,6 +92,8 @@ export function parseClaudeStreamJson(stdout: string) {
usage: null as UsageSummary | null,
summary: assistantTexts.join("\n\n").trim(),
resultJson: null as Record<string, unknown> | null,
llmApiEmptyResponse,
truncatedMidStream: assistantContentSeen,
};
}
@@ -87,6 +114,8 @@ export function parseClaudeStreamJson(stdout: string) {
usage,
summary,
resultJson: finalResult,
llmApiEmptyResponse: false,
truncatedMidStream: false,
};
}