Compare commits

...

23 Commits

Author SHA1 Message Date
Chris Farhood 34756f8215 0.1.53 2026-04-27 00:28:45 +00:00
Chris Farhood 07ef106c66 fix: gate grace timer on stream-output silence, not first disconnect (FAR-107)
The 30s grace timer that bounds K8s Job condition propagation lag was
armed by streamPodLogs's onFirstStreamExit callback the moment
streamPodLogsOnce returned for the first time.  A transient K8s log-API
disconnect mid-run also returns from streamPodLogsOnce — so the grace
timer fired 30s later regardless of whether streamPodLogs had already
reconnected and the container was still producing output.

Nancy / Privileged Escalation reproduced this on long Opus-4-6 runs:
the prod paperclip pod was stable, the cancel-poll guard was already
narrowed in 0.1.51, but every long run truncated with claude_truncated
+ "container terminated state not yet observable (pod phase=Running)"
because the run was being abandoned mid-output.

Replace the boolean onFirstStreamExit signal with a streamActivity ref
carrying lastActiveAt + streamHasExited.  streamPodLogs refreshes
lastActiveAt every time a streamPodLogsOnce attempt returns non-empty
output, so reconnects that resume real output keep the grace clock
reset.  The grace timer fires only once the stream has exited at least
once AND no chunk has arrived for the full grace window — which
preserves the original FAR-23 behaviour (container truly exited but
Job condition lags) while ending the false-truncation of healthy
streams.  Adds a regression test that asserts a stream drop + reconnect
+ deferred Job completion does not surface as truncated.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-27 00:28:44 +00:00
Chris Farhood fd7dce7239 0.1.52 2026-04-27 00:00:57 +00:00
Chris Farhood b1878c684e fix: retry-aware pod state lookup + honest truncation cause messages (FAR-107)
The single-shot getPodTerminatedState query lost a real race against
kubelet's containerStatus update: when Claude exited cleanly but quickly,
listNamespacedPod often returned the pod with phase=Succeeded/Failed but
without a populated state.terminated, so describeTruncationCause fell into
the catch-all "pod state unavailable — likely deleted before exit could
be read" branch.  That message is doubly wrong: the pod was not deleted
and the exit cause was readable a few hundred ms later.  Operators
chasing claude_truncated runs (Nancy/Privileged Escalation) had no
visibility into the actual exit code, OOMKilled flag, or reason.

Two changes:

1. Introduce lookupPodState + getPodLookupWithRetry — the lookup result
   carries the pod phase and a podMissing flag, and retries up to 4×500ms
   when the pod is in a terminal phase but containerStatuses lag.  When
   the pod is in a non-terminal phase or genuinely gone we bail
   immediately without burning the retry budget.

2. describeTruncationCause now distinguishes three states:
   - "pod is gone" (eviction, preemption, external delete)
   - "container terminated state not yet observable (pod phase=…)"
   - the existing populated-state path with exit code / reason / signal

The truncation error path re-queries with the retry-aware lookup right
before producing the message, so subsequent claude_truncated errors
surface the actual exit cause (137=OOMKilled, 143=SIGTERM, kubelet
reason text) instead of a misleading deletion claim.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-27 00:00:56 +00:00
Chris Farhood 83e105393c 0.1.51 2026-04-26 21:24:15 +00:00
Chris Farhood 49288fa5c7 fix: scope cancel-polling to explicit cancellation states only (FAR-107)
shouldAbortForCancellation previously treated any non-`running` runStatus
as a cancellation signal — which made the keepalive's cancel-poll delete
the K8s Job whenever the heartbeat-runs API briefly returned a transient
or stale status (e.g. queued, pending, succeeded, failed, completed,
unknown) for an in-flight run.  The follow-up `waitForJobCompletion`
poll then observed the 404 and surfaced a spurious
`k8s_job_deleted_externally` error to the user, even though no human
or external system deleted the Job.

Privileged Escalation's "null-pointer-nancy" agent reproduced this on
runs that were never cancelled and were not adjacent to a paperclip
restart, ruling out the SIGTERM path that 0.1.50 already addressed.

Tighten the guard to fire only on `cancelled` / `cancelling`.  Other
terminal statuses are unreachable while the adapter is still executing
(the adapter's own return is what flips them) and even if observed
mid-run, they do not justify deleting a Job that may still be doing
real work — the natural completion path will tear it down.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-26 21:24:11 +00:00
Chris Farhood dae9e18659 0.1.50 2026-04-26 21:19:03 +00:00
Chris Farhood 6923597b31 fix: do not delete active Jobs on SIGTERM — leave for orphan reattach (FAR-107)
Root cause of Nancy's k8s_job_deleted_externally false positive: the
paperclip server itself receives SIGTERM during rolling deploys,
evictions, scale-down, etc.  The previous SIGTERM handler iterated
activeJobs and deleted every Job before exiting, which surfaced in the
in-flight heartbeat as "K8s Job was deleted externally" — even though
nothing external touched it.

With reattachOrphanedJobs=true (default), this is exactly the wrong
behaviour: leaving the Jobs alive lets the next paperclip process
discover them via the orphan-classification path and reattach their
log streams.  With reattachOrphanedJobs=false the operator opted into
manual cleanup, so we still must not auto-delete.

The Job's ownerReference (FAR-15) keeps the prompt Secret tied to the
Job, so both survive together and TTL handles cleanup on natural
completion.  Test rewritten to assert the new contract: SIGTERM must
not touch K8s Jobs.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-26 21:19:02 +00:00
Chris Farhood d184a1732b 0.1.49 2026-04-26 21:06:19 +00:00
Chris Farhood be84428226 fix: enrich k8s_job_deleted_externally error with forensics + verify Job presence on grace fire (FAR-107)
The error previously fired with no diagnostic context, making it impossible
to distinguish (a) self-delete by our SIGTERM/cancel path, (b) TTL after a
missed Complete condition, or (c) actual external deletion without cluster
shell access.  Two changes:

1. Grace-period verification: when the log stream exits and the 30s grace
   timer fires, do a one-shot readNamespacedJob before declaring the Job
   gone.  If it's still there, settle as gracePeriodFired (not jobGone) so
   we don't mis-classify K8s condition propagation lag as deletion.

2. Forensic capture: track which of the three detection paths
   (completion-poll-404, grace-period-verify-404, recheck-poll-404)
   first observed the 404, the last successful Job conditions read, the
   poll count, elapsed time since pod-running, and stdout size.  Append
   all of it to the errorMessage so the next occurrence is self-diagnosing.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-26 21:05:04 +00:00
Chris Farhood d9928030d6 0.1.48 2026-04-26 14:48:22 +00:00
Chris Farhood 76fc6fcdfc fix: surface pod terminated reason/message in adapter_failed errors (FAR-100)
The init-only and partial-run error paths now embed the K8s container
terminated state (reason, message, signal, OOM hint) directly in the
errorMessage. This eliminates the kubectl round-trip when diagnosing
adapter_failed runs — the surfaced error self-explains.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-26 14:48:12 +00:00
Chris Farhood 3169f49f23 0.1.47 2026-04-26 13:04:54 +00:00
Chris Farhood e0b35d230f fix: distinguish init-only non-zero exits in buildPartialRunError (FAR-100)
Init-only runs that exit with a non-zero code now surface a more actionable
message naming the exit code and the likely cause (unsupported model or
rejected session) instead of the generic "did not produce a result" text.
Helps operators diagnose model-id / billing-tier failures (e.g. opus 4.6).

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-26 13:04:43 +00:00
Chris Farhood 4e2c36319d 0.1.46 2026-04-26 01:57:43 +00:00
Chris Farhood 8474f78fe1 fix: include pod terminated reason/message in claude_truncated error (FAR-95)
Capture the claude container's terminated state (exit code, reason, message,
signal) and surface it in the truncation error so operators see *why* the run
was cut short — e.g. "exit code 137, SIGKILL (commonly OOMKilled),
reason=OOMKilled, message=Memory cgroup out of memory" instead of just a
"truncated" label with no diagnostic context.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-26 01:57:43 +00:00
Chris Farhood 88896eddcf 0.1.45 2026-04-26 01:54:48 +00:00
Chris Farhood a2874c0426 fix: detect mid-stream truncation and emit claude_truncated error code (FAR-95)
When Claude produces assistant content (output_tokens > 0) but the stream ends
without a result event, classify the run as truncated mid-stream rather than
falling through to the generic "did not produce a result — check API
credentials" message. The misleading hint pointed operators at auth/model
config when the real cause was pod termination, OOMKill, or CLI crash.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-26 01:54:35 +00:00
Chris Farhood 818aa0f1d6 feat: log bundled skill names and add skills to onMeta commandNotes (FAR-36)
Adds a diagnostic log line after skill resolution so operators can see exactly
which skills were bundled into each run, making it straightforward to diagnose
skill availability issues. Also surfaces the skill list in the onMeta
commandNotes for run metadata visibility.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 20:41:01 +00:00
Chris Farhood 55fd3021fb fix: add per-agent mutex to eliminate TOCTOU race in K8s concurrency guard (FAR-29)
Two concurrent execute() calls for the same agent can both pass the
list-then-create guard before either job appears in the other's query.
The new module-level agentCreationMutex serializes the guard+create phase
within the process so only one call enters listNamespacedJob at a time.

The mutex is acquired after sanitizing the agent ID and released in a
finally block that wraps the entire guard+create section, so all early
return paths (guard blocks, create failures) cleanly release it. Variables
used in both the guard+create and log-streaming phases are hoisted to
before the try block. Cross-agent calls use separate mutex slots and are
unaffected.

Added two vitest cases verifying same-agent serialization and that
different-agent calls are not serialized.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 20:10:01 +00:00
Chris Farhood 83b58f9207 fix: detect stop_reason:null + output_tokens:0 and emit llm_api_error (FAR-30)
parseClaudeStreamJson now tracks assistant events with stop_reason:null and
output_tokens:0 (the MiniMax degraded-response pattern). When no result event
follows, execute() returns errorCode:"llm_api_error" with a descriptive message
instead of the generic adapter_failed.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 20:00:42 +00:00
Chris Farhood 602afa9b84 fix: return k8s_job_deleted_externally error code when job deleted mid-run (FAR-31)
When a K8s Job is deleted externally (kubectl delete job or TTL before
terminal condition observed) and stdout has no result event, the adapter
now returns errorCode "k8s_job_deleted_externally" with the message
"K8s Job was deleted externally before Claude could complete" instead of
the misleading "Claude exited with code -1".

Tracks a jobDeletedExternally flag in execute() on the jobGone path and
checks it in the !parsed branch before falling through to buildPartialRunError.
Only applies when exitCode is null (pod gone alongside the job).

Adds regression test: FAR-31 scenario where job 404s mid-run with partial
stdout and missing pod produces the new error code.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 19:58:46 +00:00
Chris Farhood 986f2fc7fa test: add coverage for deletionTimestamp concurrency guard bypass (FAR-34)
Verifies that a terminating K8s job (deletionTimestamp set, no
Complete/Failed condition) is skipped by the concurrency guard so
subsequent heartbeat runs are not incorrectly blocked.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 19:57:10 +00:00
6 changed files with 1021 additions and 111 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.42",
"version": "0.1.53",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.42",
"version": "0.1.53",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.43",
"version": "0.1.53",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+430 -19
View File
@@ -60,7 +60,7 @@ vi.mock("@paperclipai/adapter-utils/server-utils", async (importOriginal) => {
});
});
const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, streamPodLogsOnce, shouldAbortForCancellation, execute } = await import("./execute.js");
const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, describeTruncationCause, streamPodLogsOnce, shouldAbortForCancellation, execute } = await import("./execute.js");
function makeJob(opts: {
runId?: string;
@@ -150,10 +150,10 @@ describe("buildPartialRunError", () => {
expect(buildPartialRunError(null, "", "")).toBe("Claude exited with code -1");
});
it("skips system/init events and returns generic message when only init captured", () => {
it("returns init-only message when stdout is init-only with non-zero exit code (FAR-101)", () => {
const msg = buildPartialRunError(1, "claude-sonnet-4-6", initLine);
expect(msg).toBe(
"Claude started but did not produce a result (model: claude-sonnet-4-6) — check API credentials, model support, and adapter config",
"Claude exited immediately after init (model: claude-sonnet-4-6) (exit code 1) — the model may be unsupported or the session may have been rejected before producing output",
);
});
@@ -170,15 +170,15 @@ describe("buildPartialRunError", () => {
expect(msg).toBe("Claude exited with code 1: Error: no API key configured");
});
it("skips result events (structured protocol artefact not surfaced verbatim)", () => {
it("returns init-only message when stdout has init + result event but no plain content (structured artefact, not surfaced verbatim)", () => {
// In production, buildPartialRunError is only called when parseClaudeStreamJson
// returns null (no result event). If somehow a result event appears here, the
// raw JSON blob must not be shown — the "did not produce a result" message is
// cleaner and avoids leaking protocol internals to the UI.
// raw JSON blob must not be shown — the init-only message is cleaner and avoids
// leaking protocol internals to the UI.
const resultLike = JSON.stringify({ type: "result", subtype: "error", result: "rate limit" });
const stdout = [initLine, resultLike].join("\n");
const msg = buildPartialRunError(2, "claude-sonnet-4-6", stdout);
expect(msg).toContain("did not produce a result");
expect(msg).toContain("Claude exited immediately after init");
expect(msg).toContain("claude-sonnet-4-6");
expect(msg).not.toMatch(/\{.*type.*result/);
});
@@ -245,6 +245,44 @@ describe("buildPartialRunError", () => {
const msg = buildPartialRunError(1, "model-x", stdout);
expect(msg).toBe("Claude exited with code 1: real error line");
});
it("appends pod terminated reason/message when state is provided (FAR-100)", () => {
const msg = buildPartialRunError(1, "claude-sonnet-4-6", initLine, {
exitCode: 1,
reason: "Error",
message: "model not supported",
signal: null,
});
expect(msg).toContain("Claude exited immediately after init");
expect(msg).toContain("claude-sonnet-4-6");
expect(msg).toContain("[pod: reason=Error, message=model not supported]");
});
it("flags exit 137 as OOMKilled in pod cause", () => {
const msg = buildPartialRunError(137, "claude-sonnet-4-6", initLine, {
exitCode: 137,
reason: "OOMKilled",
message: null,
signal: null,
});
expect(msg).toContain("[pod: reason=OOMKilled, SIGKILL (commonly OOMKilled)]");
});
it("appends pod cause to content-line message", () => {
const stdout = [initLine, "Error: bad request"].join("\n");
const msg = buildPartialRunError(1, "claude-sonnet-4-6", stdout, {
exitCode: 1,
reason: "Error",
message: null,
signal: null,
});
expect(msg).toBe("Claude exited with code 1: Error: bad request [pod: reason=Error]");
});
it("does not append anything when podState is null (back-compat)", () => {
const msg = buildPartialRunError(1, "claude-sonnet-4-6", initLine, null);
expect(msg).not.toContain("[pod:");
});
});
describe("classifyOrphan", () => {
@@ -362,6 +400,33 @@ describe("describePodTerminatedError", () => {
});
});
describe("describeTruncationCause", () => {
it("annotates exit code 137 as SIGKILL/OOM", () => {
const msg = describeTruncationCause({ exitCode: 137, reason: "OOMKilled", message: "Memory cgroup out of memory", signal: null });
expect(msg).toContain("exit code 137");
expect(msg).toContain("SIGKILL");
expect(msg).toContain("OOMKilled");
expect(msg).toContain("Memory cgroup out of memory");
});
it("annotates exit code 143 as SIGTERM", () => {
const msg = describeTruncationCause({ exitCode: 143, reason: null, message: null, signal: null });
expect(msg).toContain("exit code 143");
expect(msg).toContain("SIGTERM");
});
it("falls back to 'pod state unavailable' when state is null", () => {
const msg = describeTruncationCause(null);
expect(msg).toContain("pod state unavailable");
});
it("emits 'no exit code' when exitCode is null but state exists", () => {
const msg = describeTruncationCause({ exitCode: null, reason: "Error", message: null, signal: null });
expect(msg).toContain("no exit code");
expect(msg).toContain("reason=Error");
});
});
describe("execute: all-invalid agent.id (N4)", () => {
it("returns hard error without creating a Job when agent.id sanitizes to null", async () => {
const logs: string[] = [];
@@ -577,6 +642,28 @@ describe("execute: concurrency guard", () => {
expect(result.errorMessage).toContain("still running for this agent");
});
it("ignores terminating jobs (deletionTimestamp set) and proceeds past the concurrency guard", async () => {
// A job being force-deleted has deletionTimestamp set but no Complete/Failed condition.
// The guard must treat it as terminal so subsequent runs are not blocked.
const terminating: k8s.V1Job = {
metadata: {
name: "terminating-job",
namespace: "paperclip",
labels: { "paperclip.io/agent-id": "agent-abc", "paperclip.io/adapter-type": "claude_k8s" },
deletionTimestamp: new Date(),
},
status: { conditions: [] },
};
mockBatchListJobs.mockResolvedValue({ items: [terminating] });
// Guard passes → next failure is job creation (no further mocks set up)
mockBatchCreateJob.mockRejectedValue(new Error("quota exceeded"));
mockPrepareBundle.mockResolvedValue(makeBundle());
const result = await execute(makeCtx());
// Must NOT be a concurrency error — the guard let us through
expect(result.errorCode).not.toBe("k8s_concurrent_run_blocked");
expect(result.errorCode).toBe("k8s_job_create_failed");
});
it("reattaches to a matching orphan and returns k8s_pod_reattach_failed when pod is missing", async () => {
// Orphan with matching taskId and sessionId → reattach classification → reattachTarget is set
const orphan = makeJob({
@@ -892,6 +979,128 @@ describe("execute: happy path", () => {
expect(result.sessionId).toBe("sess_test123");
});
it("returns k8s_job_deleted_externally when job 404s mid-run and stdout has no result event (FAR-31)", async () => {
// Reproduces the observed scenario: kubectl delete job while Claude is mid-run.
// The log stream captures only partial output (no result event), and the pod
// is also gone so getPodExitCode returns null. The adapter must emit a
// descriptive error instead of the misleading "Claude exited with code -1".
// Log stream writes only the init line — no result event (mid-run deletion)
const partialOutput = JSON.stringify({
type: "system",
subtype: "init",
model: "claude-sonnet-4-6",
session_id: "sess_x",
}) + "\n";
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: Writable) => {
writable.write(partialOutput);
},
);
// Job is gone (404) — matches the kubectl-delete-job-mid-run scenario
mockBatchReadJob.mockRejectedValue(
Object.assign(new Error("Not Found"), { response: { statusCode: 404 } }),
);
// Pod is also gone — getPodExitCode returns null (no pod found)
mockCoreListPods.mockReset();
mockCoreListPods
.mockResolvedValueOnce({
items: [{
metadata: { name: "pod-abc" },
status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
}],
})
.mockResolvedValue({ items: [] }); // pod gone → exitCode null
const executePromise = execute(makeCtx());
await vi.advanceTimersByTimeAsync(3_100);
const result = await executePromise;
expect(result.errorCode).toBe("k8s_job_deleted_externally");
expect(result.errorMessage).toMatch(/^K8s Job was deleted externally before Claude could complete \[/);
expect(result.errorMessage).toContain("detected_via=");
expect(result.exitCode).toBeNull();
});
it("returns llm_api_error when assistant event has stop_reason:null and output_tokens:0 (FAR-30)", async () => {
// Reproduces the MiniMax degradation pattern: init event + assistant event with
// stop_reason:null and output_tokens:0, no result event, Claude exits -1.
const emptyResponseOutput = [
JSON.stringify({ type: "system", subtype: "init", model: "MiniMax-M2.7", session_id: "sess_mm" }),
JSON.stringify({
type: "assistant",
session_id: "sess_mm",
message: {
id: "msg_empty",
stop_reason: null,
usage: { input_tokens: 500, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 },
content: [],
},
}),
].join("\n") + "\n";
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: Writable) => {
writable.write(emptyResponseOutput);
},
);
// getPodExitCode: exit code -1 (as reported in the issue)
mockCoreListPods.mockResolvedValue({
items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: -1 } } }] } }],
});
const executePromise = execute(makeCtx());
await vi.advanceTimersByTimeAsync(3_100);
const result = await executePromise;
expect(result.errorCode).toBe("llm_api_error");
expect(result.errorMessage).toContain("stop_reason: null");
expect(result.errorMessage).toContain("output_tokens: 0");
});
it("returns claude_truncated when assistant produced content but no result event arrived (FAR-95)", async () => {
const truncatedOutput = [
JSON.stringify({ type: "system", subtype: "init", model: "claude-opus-4-7", session_id: "sess_trunc" }),
JSON.stringify({
type: "assistant",
session_id: "sess_trunc",
message: {
id: "msg_trunc",
stop_reason: null,
usage: { input_tokens: 1, output_tokens: 35, cache_creation_input_tokens: 523, cache_read_input_tokens: 46295 },
content: [{ type: "tool_use", id: "tool_1", name: "Bash", input: { command: "echo hi" } }],
},
}),
JSON.stringify({
type: "user",
message: { role: "user", content: [{ tool_use_id: "tool_1", type: "tool_result", content: "hi", is_error: false }] },
}),
].join("\n") + "\n";
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: Writable) => {
writable.write(truncatedOutput);
},
);
mockCoreListPods.mockResolvedValue({
items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 137, reason: "OOMKilled", message: "Memory cgroup out of memory" } } }] } }],
});
const executePromise = execute(makeCtx());
await vi.advanceTimersByTimeAsync(3_100);
const result = await executePromise;
expect(result.errorCode).toBe("claude_truncated");
expect(result.errorMessage).toContain("truncated mid-stream");
expect(result.errorMessage).toContain("claude-opus-4-7");
expect(result.errorMessage).toContain("exit code 137");
expect(result.errorMessage).toContain("SIGKILL");
expect(result.errorMessage).toContain("OOMKilled");
expect(result.errorMessage).toContain("Memory cgroup out of memory");
});
it("reconnects log stream and logs status when job completion takes > 3s", async () => {
// Make waitForJobCompletion take 4s so the 3s stream reconnect fires first.
// timeoutSec=4, graceSec=0 → completionTimeoutMs=4000.
@@ -1065,6 +1274,62 @@ describe("execute: happy path", () => {
expect(result.exitCode).toBe(0);
});
it("logs bundled skill names and count (FAR-36 diagnostic)", async () => {
const skills = [
{ key: "safety--abc123", runtimeName: "safety--abc123", desired: true, managed: true, required: true, state: "configured" as const },
{ key: "sdlc--def456", runtimeName: "sdlc--def456", desired: true, managed: true, required: true, state: "configured" as const },
];
mockReadSkillEntries.mockResolvedValue(skills);
const logs: Array<{ stream: string; msg: string }> = [];
const onLog = vi.fn().mockImplementation(async (stream: string, msg: string) => { logs.push({ stream, msg }); });
const executePromise = execute(makeCtx({ onLog } as Partial<AdapterExecutionContext>));
await vi.advanceTimersByTimeAsync(3_100);
await executePromise;
const skillLine = logs.find((l) => l.msg.includes("Skills bundled"));
expect(skillLine).toBeDefined();
expect(skillLine?.stream).toBe("stdout");
expect(skillLine?.msg).toContain("(2):");
expect(skillLine?.msg).toContain("safety--abc123");
expect(skillLine?.msg).toContain("sdlc--def456");
});
it("logs Skills bundled (0): none when no skills are configured (FAR-36 diagnostic)", async () => {
mockReadSkillEntries.mockResolvedValue([]);
const logs: Array<{ stream: string; msg: string }> = [];
const onLog = vi.fn().mockImplementation(async (stream: string, msg: string) => { logs.push({ stream, msg }); });
const executePromise = execute(makeCtx({ onLog } as Partial<AdapterExecutionContext>));
await vi.advanceTimersByTimeAsync(3_100);
await executePromise;
const skillLine = logs.find((l) => l.msg.includes("Skills bundled"));
expect(skillLine).toBeDefined();
expect(skillLine?.msg).toContain("(0): none");
});
it("includes skill count in onMeta commandNotes (FAR-36 diagnostic)", async () => {
const skills = [
{ key: "safety--abc123", runtimeName: "safety--abc123", desired: true, managed: true, required: true, state: "configured" as const },
];
mockReadSkillEntries.mockResolvedValue(skills);
const onMeta = vi.fn().mockResolvedValue(undefined);
const executePromise = execute(makeCtx({ onMeta } as Partial<AdapterExecutionContext>));
await vi.advanceTimersByTimeAsync(3_100);
await executePromise;
expect(onMeta).toHaveBeenCalled();
const notes: string[] = onMeta.mock.calls[0][0].commandNotes;
const skillsNote = notes.find((n: string) => n.startsWith("Skills"));
expect(skillsNote).toBeDefined();
expect(skillsNote).toContain("(1):");
expect(skillsNote).toContain("safety--abc123");
});
});
// ─── execute: waitForPod edge cases ──────────────────────────────────────────
@@ -1247,6 +1512,54 @@ describe("execute: log-stream-exit grace period (FAR-23)", () => {
// (grace did not fire, real completion arrived)
expect(result.errorMessage).toBeNull();
});
it("does NOT fire grace when stream drops mid-output and reconnects with more output (FAR-107)", async () => {
// Reproduces Nancy / Privileged Escalation symptom: the K8s log API drops
// the streaming connection mid-run; streamPodLogs reconnects and the
// container is still producing. Before the fix, the grace timer was
// armed on first stream exit and fired 30s later regardless of whether
// output had resumed, surfacing claude_truncated even though the pod was
// still phase=Running.
let attemptIndex = 0;
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
if (attemptIndex === 0) {
// Stream a partial init line then "drop" the connection without a
// result event — this is the transient API disconnect.
writable.write(JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_test123" }) + "\n");
attemptIndex++;
return;
}
// Reconnect produces the rest of the stream including the result event.
writable.write(CLAUDE_HAPPY_OUTPUT);
},
);
// Job condition arrives only after the reconnect produces output, well
// beyond the 30s grace window; the old code would have grace-fired at
// ~30s and treated the run as truncated.
let readJobCalls = 0;
mockBatchReadJob.mockImplementation(async () => {
readJobCalls++;
// Stay non-terminal until the reconnect has had time to run and the
// grace window has fully elapsed since the FIRST disconnect.
if (readJobCalls < 25) return { status: { conditions: [] } };
return { status: { conditions: [{ type: "Complete", status: "True" }] } };
});
const executePromise = execute(makeCtx());
// t=3000: first reconnect sleep fires → second streamPodLogsOnce attempt
await vi.advanceTimersByTimeAsync(3_100);
// Drive past the old (buggy) 30s grace boundary without firing real completion
await vi.advanceTimersByTimeAsync(35_000);
// Then let the Job's Complete condition land
await vi.advanceTimersByTimeAsync(20_000);
const result = await executePromise;
// Run completed normally — grace must not have falsely truncated it.
expect(result.exitCode).toBe(0);
expect(result.errorCode).toBeUndefined();
expect(result.sessionId).toBe("sess_test123");
}, 80_000);
});
// ─── execute: concurrency guard — multiple orphan sorting ────────────────────
@@ -1296,16 +1609,24 @@ describe("shouldAbortForCancellation", () => {
expect(shouldAbortForCancellation("cancelled")).toBe(true);
});
it("returns true when status is 'failed'", () => {
expect(shouldAbortForCancellation("failed")).toBe(true);
it("returns true when status is 'cancelling'", () => {
expect(shouldAbortForCancellation("cancelling")).toBe(true);
});
it("returns true when status is 'completed'", () => {
expect(shouldAbortForCancellation("completed")).toBe(true);
// FAR-107: terminal-but-not-cancelled statuses MUST NOT trigger Job deletion.
// The previous "anything but running" guard caused k8s_job_deleted_externally
// false positives for in-flight runs whenever the API briefly reported a
// transient/stale status.
it("returns false for non-cancellation terminal statuses (FAR-107)", () => {
expect(shouldAbortForCancellation("succeeded")).toBe(false);
expect(shouldAbortForCancellation("failed")).toBe(false);
expect(shouldAbortForCancellation("completed")).toBe(false);
});
it("returns true for any non-running non-empty string", () => {
expect(shouldAbortForCancellation("unknown")).toBe(true);
it("returns false for unknown statuses (FAR-107)", () => {
expect(shouldAbortForCancellation("unknown")).toBe(false);
expect(shouldAbortForCancellation("queued")).toBe(false);
expect(shouldAbortForCancellation("pending")).toBe(false);
});
});
@@ -1506,7 +1827,7 @@ describe("execute: SIGTERM handler best-effort cleanup", () => {
vi.useRealTimers();
});
it("deletes the active Job when SIGTERM fires during execution", async () => {
it("does NOT delete active Jobs on SIGTERM — leaves them for orphan reattach (FAR-107)", async () => {
// Mock process.kill to prevent the test process from actually being killed.
const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true);
@@ -1517,17 +1838,19 @@ describe("execute: SIGTERM handler best-effort cleanup", () => {
// Flush microtasks through the async setup chain: getSelfPodInfo, listJobs,
// readSkillEntries, prepareBundle, createJob, onLog, activeJobs.add(), and
// ensureSigtermHandler() all complete before the try block enters streaming.
// 30 rounds is more than enough for the ~7 sequential await points.
for (let i = 0; i < 30; i++) await Promise.resolve();
// Emit SIGTERM — the process.once handler fires synchronously and kicks off
// async cleanup (deleteNamespacedJob). The mock resolves immediately.
// Reset deleteJob spy after setup so we can detect any SIGTERM-driven calls.
mockBatchDeleteJob.mockClear();
// Emit SIGTERM — the handler must re-raise to the default handler without
// touching the K8s Job. Deleting the Job here would surface as
// k8s_job_deleted_externally in the in-flight run (FAR-107).
process.emit("SIGTERM");
// Flush microtasks for deleteJob to resolve and the .then(process.kill) to run.
for (let i = 0; i < 10; i++) await Promise.resolve();
expect(mockBatchDeleteJob).toHaveBeenCalled();
expect(mockBatchDeleteJob).not.toHaveBeenCalled();
expect(killSpy).toHaveBeenCalledWith(process.pid, "SIGTERM");
killSpy.mockRestore();
@@ -1535,3 +1858,91 @@ describe("execute: SIGTERM handler best-effort cleanup", () => {
// so we do not need to settle executePromise.
});
});
// ─── execute: per-agent creation mutex (FAR-29 TOCTOU fix) ───────────────────
//
// Verifies that two concurrent execute() calls for the same agent cannot both
// enter the listNamespacedJob → createNamespacedJob sequence simultaneously.
// Without the per-agent mutex, both would pass the concurrency guard before
// either job appears in the other's list query.
describe("execute: per-agent creation mutex prevents TOCTOU race", () => {
beforeEach(() => {
vi.resetAllMocks();
mockReadSkillEntries.mockResolvedValue([]);
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
mockPrepareBundle.mockResolvedValue(makeBundle());
// Make job creation fail so the guard+create phase exits quickly and
// releases the mutex without needing to mock the full streaming path.
mockBatchCreateJob.mockRejectedValue(new Error("mock: create not configured"));
mockBatchDeleteJob.mockResolvedValue({});
mockCoreDeleteSecret.mockResolvedValue({});
});
it("serializes guard phases for the same agent: call-2 waits until call-1 exits guard+create", async () => {
const listCalls: string[] = [];
let resolveFirstList!: (v: { items: [] }) => void;
mockBatchListJobs
.mockImplementationOnce(() => {
listCalls.push("call-1");
return new Promise<{ items: [] }>((resolve) => { resolveFirstList = resolve; });
})
.mockImplementation(() => {
listCalls.push("call-2");
return Promise.resolve({ items: [] });
});
const p1 = execute(makeCtx({ runId: "run-1" }));
const p2 = execute(makeCtx({ runId: "run-2" }));
// Drain microtasks: call-1 should be suspended in listNamespacedJob while
// call-2 waits behind the per-agent mutex, not yet calling list.
for (let i = 0; i < 20; i++) await Promise.resolve();
expect(listCalls).toEqual(["call-1"]);
// Let call-1's guard resolve (no running jobs). It will proceed to job
// creation, fail (mock rejects), and release the mutex in finally.
resolveFirstList({ items: [] });
await Promise.allSettled([p1, p2]);
// call-2 must have listed, and only AFTER call-1's guard resolved.
// The exact order: call-1 listed → call-1 list resolved → call-2 listed.
expect(listCalls).toEqual(["call-1", "call-2"]);
});
it("does not serialize guard phases for different agents", async () => {
const listCalls: string[] = [];
let resolveAgentAList!: (v: { items: [] }) => void;
// Agent A's list is artificially slow. Agent B (different id) should
// proceed immediately without waiting — the mutex is keyed by agent id.
mockBatchListJobs
.mockImplementationOnce(() => {
listCalls.push("A");
return new Promise<{ items: [] }>((resolve) => { resolveAgentAList = resolve; });
})
.mockImplementation(() => {
listCalls.push("B");
return Promise.resolve({ items: [] });
});
const ctxA = makeCtx({ runId: "run-A" });
const ctxB = makeCtx({
runId: "run-B",
agent: { id: "agent-other", companyId: "co1", name: "Other Agent", adapterType: "claude_k8s", adapterConfig: {} },
} as Partial<AdapterExecutionContext>);
const pA = execute(ctxA);
const pB = execute(ctxB);
// Drain microtasks — B should have called list even though A is still
// suspended, because they use separate mutex slots.
for (let i = 0; i < 20; i++) await Promise.resolve();
expect(listCalls).toContain("B");
// Let A complete so the promises settle cleanly.
resolveAgentAList({ items: [] });
await Promise.allSettled([pA, pB]);
});
});
+434 -89
View File
@@ -48,36 +48,30 @@ interface ActiveJobRef {
kubeconfigPath?: string;
}
const activeJobs = new Set<ActiveJobRef>();
// Per-agent serialization lock: prevents the TOCTOU race (FAR-29) where two
// concurrent execute() calls for the same agent both pass the list-then-create
// guard and create K8s Jobs simultaneously on the shared PVC.
const agentCreationMutex = new Map<string, Promise<void>>();
let sigtermHandlerRegistered = false;
function ensureSigtermHandler(): void {
if (sigtermHandlerRegistered) return;
sigtermHandlerRegistered = true;
process.once("SIGTERM", () => {
const jobs = [...activeJobs];
void Promise.allSettled(
jobs.map(async (ref) => {
try {
const batchApi = getBatchApi(ref.kubeconfigPath);
await batchApi.deleteNamespacedJob({
name: ref.jobName,
namespace: ref.namespace,
body: { propagationPolicy: "Background" },
});
} catch { /* best-effort */ }
if (ref.promptSecretName && ref.promptSecretNamespace) {
try {
const coreApi = getCoreApi(ref.kubeconfigPath);
await coreApi.deleteNamespacedSecret({
name: ref.promptSecretName,
namespace: ref.promptSecretNamespace,
});
} catch { /* best-effort */ }
}
}),
).then(() => {
process.kill(process.pid, "SIGTERM");
});
// Do NOT delete active K8s Jobs on SIGTERM (FAR-107). Paperclip itself
// receives SIGTERM during rolling deploys, evictions, scale-down, etc.
// Deleting the Jobs we own there causes the in-flight heartbeat to surface
// a false-positive `k8s_job_deleted_externally` error and tears down work
// the user expected to keep running.
//
// The correct behaviour with `reattachOrphanedJobs=true` (default) is to
// leave the Jobs alive: the next paperclip process discovers them via the
// orphan-classification path and reattaches their log streams. When
// `reattachOrphanedJobs=false` the operator explicitly opted into manual
// cleanup and should not have us auto-deleting either. The owning Job's
// ownerReference (FAR-15) keeps the prompt Secret tied to the Job, so
// both survive together and TTL cleans them up after natural completion.
process.kill(process.pid, "SIGTERM");
});
}
@@ -96,34 +90,32 @@ export function isK8s404(err: unknown): boolean {
}
/**
* Returns true when the heartbeat-run status indicates the run is no longer
* active and the K8s Job should be cancelled.
* Returns true when the heartbeat-run status indicates the run was explicitly
* cancelled and the K8s Job must be torn down.
*
* Only `cancelled` / `cancelling` qualify. Treating any non-`running` status
* as cancellation (the previous behaviour) produced spurious
* k8s_job_deleted_externally errors for in-flight runs whenever the API
* briefly reported a transient or stale status — Nancy's runs at
* Privileged Escalation hit this without anyone actually cancelling them
* (FAR-107). Other terminal statuses (`succeeded`/`failed`/`completed`)
* are unreachable in practice while the adapter is still executing
* (the adapter's own return is what flips them) and even if observed,
* they do not warrant our deleting a Job that may still be doing work.
* Exported for unit tests.
*/
export function shouldAbortForCancellation(runStatus: string | undefined): boolean {
if (!runStatus) return false;
return runStatus !== "running";
return runStatus === "cancelled" || runStatus === "cancelling";
}
/**
* Build the error message when Claude's stdout contains no result event.
* Skips system/init event lines so the UI doesn't display the raw init JSON.
* Exported for unit tests.
* Returns the first non-JSON/plain-text line in stdout, treating JSON objects
* with a "type" field as protocol artefacts and skipping them.
* Used by buildPartialRunError to detect init-only runs.
*/
export function buildPartialRunError(
exitCode: number | null,
model: string,
stdout: string,
): string {
if (exitCode === 0) return "Failed to parse Claude JSON output";
// Walk stdout lines and skip every structured streaming event (any JSON
// object that carries a non-empty "type" field: system, assistant, user,
// rate_limit_event, result, …). All of these are protocol artefacts and
// produce confusing raw-JSON blobs when surfaced verbatim as an error
// message. Only plain-text lines (non-JSON, or JSON without a type field)
// are treated as human-readable content worth including in the error.
const firstContentLine = stdout.split(/\r?\n/)
function firstContentLine(stdout: string): string {
return stdout.split(/\r?\n/)
.map((l) => l.trim())
.find((l) => {
if (!l) return false;
@@ -138,19 +130,82 @@ export function buildPartialRunError(
}
return true;
}) ?? "";
}
/**
* Returns true when stdout contains only init/system/assistant events from the
* given model with no human-readable content lines. Used to detect init-only
* non-zero-exit runs that should be classified as claude_init_failed rather than
* the generic "Claude exited with code N" message.
*/
function isInitOnlyRun(model: string, stdout: string): boolean {
if (!stdout.trim() || !model) return false;
const content = firstContentLine(stdout);
if (content) return false;
// Check that at least the init event for this model was seen
const hasModelInit = stdout.includes(`"model":"${model}"`) || stdout.includes(`"model":"${model.replace(/-/g, "_")}"`);
return hasModelInit;
}
/**
* Append the pod's terminated-state detail (reason/message/signal) to a
* partial-run error message when available. Exit code is already in the
* caller-supplied message, so we only append fields that add new signal —
* specifically reason (e.g. OOMKilled, Error, ContainerCannotRun), message
* (kubelet diagnostic text), and signal. Saves the operator a kubectl trip.
*/
function appendPodCause(message: string, state: PodTerminatedState | null): string {
if (!state) return message;
const parts: string[] = [];
if (state.reason) parts.push(`reason=${state.reason}`);
if (state.message) parts.push(`message=${state.message}`);
if (state.signal !== null) parts.push(`signal=${state.signal}`);
if (state.exitCode === 137) parts.push("SIGKILL (commonly OOMKilled)");
if (parts.length === 0) return message;
return `${message} [pod: ${parts.join(", ")}]`;
}
/**
* Build the error message when Claude's stdout contains no result event.
* Skips system/init event lines so the UI doesn't display the raw init JSON.
* When `podState` is provided, appends the K8s container terminated reason/
* message so failures self-explain without requiring `kubectl`.
* Exported for unit tests.
*/
export function buildPartialRunError(
exitCode: number | null,
model: string,
stdout: string,
podState: PodTerminatedState | null = null,
): string {
if (exitCode === 0) return "Failed to parse Claude JSON output";
// If the stream contained only structured events with no plain-text output,
// surface the model name so the operator can diagnose missing credentials
// or unsupported/misconfigured model.
const initOnlyOutput = stdout.trim() !== "" && model !== "" && !firstContentLine;
if (initOnlyOutput) {
const modelHint = model ? ` (model: ${model})` : "";
return `Claude started but did not produce a result${modelHint} — check API credentials, model support, and adapter config`;
const contentLine = firstContentLine(stdout);
if (contentLine) {
return appendPodCause(`Claude exited with code ${exitCode ?? -1}: ${contentLine}`, podState);
}
return firstContentLine
? `Claude exited with code ${exitCode ?? -1}: ${firstContentLine}`
: `Claude exited with code ${exitCode ?? -1}`;
if (isInitOnlyRun(model, stdout) && (exitCode ?? 0) !== 0) {
const modelHint = model ? ` (model: ${model})` : "";
return appendPodCause(
`Claude exited immediately after init${modelHint} (exit code ${exitCode ?? -1}) — the model may be unsupported or the session may have been rejected before producing output`,
podState,
);
}
const initOnlyOutput = stdout.trim() !== "" && model !== "";
if (initOnlyOutput) {
const modelHint = model ? ` (model: ${model})` : "";
return appendPodCause(
`Claude started but did not produce a result${modelHint} — check API credentials, model support, and adapter config`,
podState,
);
}
return appendPodCause(`Claude exited with code ${exitCode ?? -1}`, podState);
}
export type OrphanClassification =
@@ -426,10 +481,18 @@ export async function streamPodLogsOnce(
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
* loops during sustained API partitions.
*
* onFirstStreamExit is called the first time streamPodLogsOnce returns
* (container has exited or stream disconnected). Used by execute() to
* start the LOG_EXIT_COMPLETION_GRACE_MS grace timer (FAR-23) without
* waiting for all reconnects to exhaust.
* `activity` tracks stream liveness so execute()'s grace timer can
* distinguish a transient K8s log-API reconnect from a real container
* exit (FAR-107). Two signals:
* - `streamHasExited` becomes true on the first return from
* streamPodLogsOnce. Until then we are still in the warm-up window
* and waitForJobCompletion is the authoritative signal — grace must
* not fire.
* - `lastActiveAt` advances every time a streamPodLogsOnce attempt
* returns non-empty output (the container is still producing).
* The grace timer fires only once GRACE_MS have passed since the
* last chunk, so output that resumes after a transient drop keeps
* the run alive.
*/
async function streamPodLogs(
namespace: string,
@@ -438,7 +501,7 @@ async function streamPodLogs(
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
dedup?: LogLineDedupFilter,
onFirstStreamExit?: () => void,
activity?: { lastActiveAt: number; streamHasExited: boolean },
): Promise<string> {
const allChunks: string[] = [];
let attempt = 0;
@@ -469,14 +532,15 @@ async function streamPodLogs(
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
// Signal first stream exit immediately so the grace-period timer in
// execute() can start without waiting for all reconnects to complete.
if (attempt === 0) onFirstStreamExit?.();
if (activity) activity.streamHasExited = true;
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
// so any log lines in `result` were received up to this moment).
lastLogReceivedAt = Math.floor(Date.now() / 1000);
// Refresh stream liveness so the grace timer in execute() does not
// fire while output is still flowing through reconnects (FAR-107).
if (activity) activity.lastActiveAt = Date.now();
} else if (attempt === 0) {
// First attempt returned nothing — update timestamp so reconnect
// window stays reasonable.
@@ -527,11 +591,14 @@ async function readPodLogs(
* is treated as a soft terminal: succeeded=false, timedOut=false, jobGone=true.
* The caller should log this and fall through to stdout parsing.
*/
type JobConditionSnapshot = { type?: string; status?: string; reason?: string; message?: string };
async function waitForJobCompletion(
namespace: string,
jobName: string,
timeoutMs: number,
kubeconfigPath?: string,
observer?: { lastConditions: JobConditionSnapshot[] | null; pollCount: number },
): Promise<{ succeeded: boolean; timedOut: boolean; jobGone?: boolean }> {
const batchApi = getBatchApi(kubeconfigPath);
const deadline = timeoutMs > 0 ? Date.now() + timeoutMs : 0;
@@ -550,6 +617,12 @@ async function waitForJobCompletion(
throw err;
}
const conditions = job.status?.conditions ?? [];
if (observer) {
observer.pollCount += 1;
observer.lastConditions = conditions.map((c) => ({
type: c.type, status: c.status, reason: c.reason, message: c.message,
}));
}
const complete = conditions.find((c) => c.type === "Complete" && c.status === "True");
if (complete) return { succeeded: true, timedOut: false };
@@ -570,16 +643,130 @@ async function waitForJobCompletion(
* Get the exit code from the Job's pod.
*/
async function getPodExitCode(namespace: string, jobName: string, kubeconfigPath?: string): Promise<number | null> {
const state = await getPodTerminatedState(namespace, jobName, kubeconfigPath);
return state?.exitCode ?? null;
}
/**
* Get the claude container's terminated state (exit code, reason, message,
* signal) from the Job's pod. Returns null if the pod or container is gone.
* Used by the no-result error path to explain *why* a run was truncated.
*/
export interface PodTerminatedState {
exitCode: number | null;
reason: string | null;
message: string | null;
signal: number | null;
}
/**
* Result of a pod-state lookup. `state` is the terminated state when available;
* `phase` and `podMissing` give the caller enough context to render an honest
* truncation-cause message instead of guessing "likely deleted" (FAR-107).
*/
export interface PodLookupResult {
state: PodTerminatedState | null;
phase: string | null;
podMissing: boolean;
}
async function lookupPodState(
namespace: string,
jobName: string,
kubeconfigPath?: string,
): Promise<PodLookupResult> {
const coreApi = getCoreApi(kubeconfigPath);
const podList = await coreApi.listNamespacedPod({
namespace,
labelSelector: `job-name=${jobName}`,
});
const pod = podList.items[0];
if (!pod) return null;
if (!pod) return { state: null, phase: null, podMissing: true };
const phase = pod.status?.phase ?? null;
const containerStatus = pod.status?.containerStatuses?.find((s) => s.name === "claude");
return containerStatus?.state?.terminated?.exitCode ?? null;
const terminated = containerStatus?.state?.terminated;
if (!terminated) return { state: null, phase, podMissing: false };
return {
state: {
exitCode: terminated.exitCode ?? null,
reason: terminated.reason ?? null,
message: (terminated.message ?? "").trim() || null,
signal: terminated.signal ?? null,
},
phase,
podMissing: false,
};
}
/**
* Read the claude container's terminated state, retrying briefly when the pod
* exists in a terminal phase but kubelet has not yet propagated the
* containerStatuses[].state.terminated field. Without this retry, fast
* truncated-stream exits surface as "pod state unavailable" (FAR-107) and
* mask the real exit code / OOMKilled / SIGTERM cause.
*/
async function getPodLookupWithRetry(
namespace: string,
jobName: string,
kubeconfigPath?: string,
attempts = 4,
delayMs = 500,
): Promise<PodLookupResult> {
let last: PodLookupResult = { state: null, phase: null, podMissing: true };
for (let i = 0; i < attempts; i++) {
last = await lookupPodState(namespace, jobName, kubeconfigPath);
if (last.state) return last;
if (last.podMissing) return last;
// Pod exists but no terminated state. If it is in a terminal phase the
// containerStatuses update is in flight — wait briefly and retry. If it
// is still Running/Pending, retrying is unlikely to help, so bail.
if (last.phase !== "Succeeded" && last.phase !== "Failed") return last;
if (i < attempts - 1) await new Promise((r) => setTimeout(r, delayMs));
}
return last;
}
async function getPodTerminatedState(
namespace: string,
jobName: string,
kubeconfigPath?: string,
): Promise<PodTerminatedState | null> {
return (await lookupPodState(namespace, jobName, kubeconfigPath)).state;
}
/**
* Format a human-readable explanation for a truncated run, including the
* pod's claude-container terminated state when available. Exit code 137
* is annotated as SIGKILL/OOM since that is the most common cause.
* Exported for unit tests.
*/
export function describeTruncationCause(
state: PodTerminatedState | null,
lookup?: PodLookupResult,
): string {
if (!state) {
if (lookup?.podMissing) {
return "pod is gone — Job pod was removed (eviction, preemption, or external delete) before exit could be read";
}
if (lookup && !lookup.podMissing) {
const phaseHint = lookup.phase ? `pod phase=${lookup.phase}` : "pod present";
return `container terminated state not yet observable (${phaseHint}) — kubelet status update did not land within retry window; exit cause unknown`;
}
return "pod state unavailable — exit cause unknown";
}
const parts: string[] = [];
if (state.exitCode !== null) {
parts.push(`exit code ${state.exitCode}`);
if (state.exitCode === 137) parts.push("SIGKILL (commonly OOMKilled)");
else if (state.exitCode === 143) parts.push("SIGTERM");
} else {
parts.push("no exit code");
}
if (state.signal !== null) parts.push(`signal ${state.signal}`);
if (state.reason) parts.push(`reason=${state.reason}`);
if (state.message) parts.push(`message=${state.message}`);
return parts.join(", ");
}
/**
@@ -633,17 +820,40 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
errorCode: "k8s_agent_id_invalid",
};
}
// FAR-29: serialize guard+create per agent within this process to prevent the
// TOCTOU race where two concurrent execute() calls both pass the list-then-create
// guard and create K8s Jobs simultaneously on the shared PVC.
const _prevCreation = agentCreationMutex.get(agentId) ?? Promise.resolve();
let _releaseMutex: () => void = () => {};
const _mutexSlot = new Promise<void>((resolve) => { _releaseMutex = resolve; });
// Chain: next caller for this agent waits on _mutexSlot, which resolves in finally.
agentCreationMutex.set(agentId, _prevCreation.then(() => _mutexSlot, () => _mutexSlot));
// Wait for any prior execute() call to finish its guard+create phase.
await _prevCreation.catch(() => {});
// Hoist declarations used in both the guard+create phase and the log-streaming
// section so the mutex try/finally can be added without a large re-indent.
let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null;
// eslint-disable-next-line prefer-const
let jobName!: string;
// eslint-disable-next-line prefer-const
let namespace!: string;
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
// runtimeSessionParams and currentSessionIdRaw are also used after the
// try block (in the result-parsing section) so hoist them here.
const runtimeSessionParams = parseObject(runtime.sessionParams);
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const coreApi = getCoreApi(kubeconfigPath);
const batchApi = getBatchApi(kubeconfigPath);
try {
const selfPod = await getSelfPodInfo(kubeconfigPath);
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true);
const runtimeSessionParams = parseObject(runtime.sessionParams);
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const currentSessionLabel = currentSessionIdRaw ? sanitizeLabelValue(currentSessionIdRaw) : null;
const currentTaskIdRaw = asString(ctx.context.taskId, "") || asString(ctx.context.issueId, "");
const currentTaskLabel = currentTaskIdRaw ? sanitizeLabelValue(currentTaskIdRaw) : null;
let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null;
try {
const batchApi = getBatchApi(kubeconfigPath);
const existing = await batchApi.listNamespacedJob({
namespace: guardNamespace,
labelSelector: `paperclip.io/agent-id=${sanitizedAgentId},paperclip.io/adapter-type=claude_k8s`,
@@ -763,19 +973,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
const coreApi = getCoreApi(kubeconfigPath);
const batchApi = getBatchApi(kubeconfigPath);
let jobName: string;
let namespace: string;
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
// Prepare the prompt bundle (skills + instructions) on the server filesystem.
// The K8s Job pod mounts the same PVC at /paperclip, so bundle paths written
// here are accessible inside the pod at the identical absolute path.
const skillEntries = await readPaperclipRuntimeSkillEntries(config, import.meta.dirname ?? __dirname);
const desiredSkillNames = new Set(resolvePaperclipDesiredSkillNames(config, skillEntries));
const desiredSkills = skillEntries.filter((e) => desiredSkillNames.has(e.key));
const skillSummary = desiredSkills.length > 0 ? desiredSkills.map((s) => s.runtimeName ?? s.key).join(", ") : "none";
await onLog("stdout", `[paperclip] Skills bundled (${desiredSkills.length}): ${skillSummary}\n`);
const instructionsFilePath = asString(config.instructionsFilePath, "").trim();
const instructionsFileDir = instructionsFilePath ? `${path.dirname(instructionsFilePath)}/` : "";
let instructionsContents: string | null = null;
@@ -872,6 +1077,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
`Skills (${desiredSkills.length}): ${skillSummary}`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
@@ -967,14 +1173,34 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
}
} finally {
// Release the per-agent creation mutex so the next queued execute() call
// can proceed with its guard+create phase (FAR-29).
_releaseMutex();
}
let stdout = "";
let exitCode: number | null = null;
let podTerminatedState: PodTerminatedState | null = null;
let jobTimedOut = false;
let keepaliveTimer: ReturnType<typeof setInterval> | null = null;
// Set when we return a mismatch error so the finally block knows not to
// delete a job that is still alive and the UI is waiting on.
let skipCleanup = false;
// Set when the job disappeared (404) or grace-timer fired before we saw a
// terminal condition — used to emit a clearer error when stdout parsing fails.
let jobDeletedExternally = false;
// Forensics for k8s_job_deleted_externally — captures which of the three
// detection paths observed the 404, the last successful Job-condition read
// before deletion, and timing. Surfaced in the error message so the next
// occurrence is self-diagnosing instead of opaque (FAR-107).
let jobGoneDetectionPath: string | null = null;
let jobGoneAt: number | null = null;
const jobObserver: { lastConditions: JobConditionSnapshot[] | null; pollCount: number } = {
lastConditions: null,
pollCount: 0,
};
let podRunningAt: number | null = null;
const activeJobRef: ActiveJobRef = {
namespace,
@@ -1007,6 +1233,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
}
podRunningAt = Date.now();
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
@@ -1122,17 +1349,16 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return onLog(stream, chunk);
};
// Track when the log stream first exits so the grace-period can fire
// if the K8s Job condition lags behind container exit (FAR-23).
// Set via onFirstStreamExit callback (called after attempt=0 returns)
// rather than in .then() of streamPodLogs, which would create a
// deadlock: streamPodLogs only resolves after stopSignal is set, but
// stopSignal is set by the grace timer which needs logExitTime to be
// non-null.
let logExitTime: number | null = null;
// Track stream liveness so the grace timer below only fires when output
// has actually stopped — not on a transient K8s log-API reconnect that
// streamPodLogs heals on its own (FAR-107).
const streamActivity: { lastActiveAt: number; streamHasExited: boolean } = {
lastActiveAt: Date.now(),
streamHasExited: false,
};
const trackedLogStream = streamPodLogs(
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
() => { logExitTime = Date.now(); },
streamActivity,
);
// completionWithGrace races waitForJobCompletion against a grace timer
@@ -1142,7 +1368,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// while streamPodLogs reconnects, holding execute() open for minutes.
// logStopSignal.stopped is set on every settled path (fulfilled, rejected,
// or grace) so streamPodLogs stops reconnecting promptly.
type CompletionResult = { succeeded: boolean; timedOut: boolean; jobGone?: boolean };
type CompletionResult = { succeeded: boolean; timedOut: boolean; jobGone?: boolean; gracePeriodFired?: boolean };
let gracePoller: ReturnType<typeof setInterval> | null = null;
const completionWithGrace = new Promise<CompletionResult>((resolve, reject) => {
let settled = false;
@@ -1160,11 +1386,47 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
logStopSignal.stopped = true;
reject(err);
};
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr);
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(settleOk).catch(settleErr);
gracePoller = setInterval(() => {
if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) {
void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output (FAR-23)\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, jobGone: true });
// Only consider grace once the stream has exited at least once.
// Until then we are still in the warm-up window and
// waitForJobCompletion is the authoritative signal. Once the
// stream has exited, fire only after GRACE_MS of inactivity
// measured against the last received chunk — output that resumes
// through a reconnect resets the clock so transient drops do not
// truncate live runs (FAR-107).
if (
streamActivity.streamHasExited &&
Date.now() - streamActivity.lastActiveAt >= LOG_EXIT_COMPLETION_GRACE_MS
) {
// Stop the grace poller immediately so we don't double-fire while the
// verification read below is in flight.
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
// The log stream exiting only means the container stopped producing
// output — it does NOT prove the Job was deleted. Verify Job
// presence with a one-shot read so we can distinguish:
// (a) Job 404 → truly gone (TTL or external deletion)
// (b) Job still present → K8s condition propagation lag (FAR-23)
// Without this check we mis-classify (b) as "deleted externally" and
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
void (async () => {
try {
await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace });
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update; Job ${jobName} still present — proceeding with captured output (FAR-23)\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
} catch (err: unknown) {
if (isK8s404(err)) {
jobGoneDetectionPath = "grace-period-verify-404";
jobGoneAt = Date.now();
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago and Job ${jobName} is gone (TTL or external deletion) — proceeding with captured output (FAR-23)\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, jobGone: true });
} else {
// K8s API hiccup — bail out without claiming external deletion.
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago; Job state unverifiable (${err instanceof Error ? err.message : String(err)}) — proceeding with captured output (FAR-23)\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
}
}
})();
}
}, 1_000);
});
@@ -1231,6 +1493,11 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// condition. The container must have exited first (TTL only fires after
// completion), so log streaming has captured the full output — continue
// to stdout parsing rather than returning an error.
jobDeletedExternally = true;
if (!jobGoneDetectionPath) {
jobGoneDetectionPath = "completion-poll-404";
jobGoneAt = Date.now();
}
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
}
} else {
@@ -1239,7 +1506,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// (60s) so we don't hang the heartbeat indefinitely if the K8s API is degraded.
jobTimedOut = false;
const RECHECK_TIMEOUT_MS = 60_000;
const actualState = await waitForJobCompletion(namespace, jobName, RECHECK_TIMEOUT_MS, kubeconfigPath);
const actualState = await waitForJobCompletion(namespace, jobName, RECHECK_TIMEOUT_MS, kubeconfigPath, jobObserver);
if (actualState.timedOut) {
// Re-check itself timed out — the job may still be running.
// Return an error so the UI knows the run is not done.
@@ -1247,6 +1514,11 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
} else if (actualState.jobGone) {
// Job was deleted before we could confirm terminal state — same as the
// fulfilled+jobGone case above: proceed with captured output.
jobDeletedExternally = true;
if (!jobGoneDetectionPath) {
jobGoneDetectionPath = "recheck-poll-404";
jobGoneAt = Date.now();
}
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
} else if (!actualState.succeeded) {
// Job still not terminal — the completion error was likely transient.
@@ -1264,7 +1536,8 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
}
exitCode = await getPodExitCode(namespace, jobName, kubeconfigPath);
podTerminatedState = await getPodTerminatedState(namespace, jobName, kubeconfigPath);
exitCode = podTerminatedState?.exitCode ?? null;
} finally {
if (keepaliveTimer) clearInterval(keepaliveTimer);
activeJobs.delete(activeJobRef);
@@ -1314,11 +1587,83 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
if (!parsed) {
if (jobDeletedExternally && exitCode === null) {
// Forensic context (FAR-107): users sometimes see this error when nothing
// actually deleted the Job manually. Surface enough state in the message
// to distinguish self-delete (SIGTERM/cancel), TTL-after-completion, and
// genuine external deletion without needing cluster shell access.
const detailParts: string[] = [];
if (jobGoneDetectionPath) detailParts.push(`detected_via=${jobGoneDetectionPath}`);
detailParts.push(`job=${jobName}`);
detailParts.push(`ns=${namespace}`);
if (podRunningAt !== null && jobGoneAt !== null) {
detailParts.push(`elapsed_since_pod_running=${Math.round((jobGoneAt - podRunningAt) / 1000)}s`);
}
detailParts.push(`completion_polls=${jobObserver.pollCount}`);
const lastConds = jobObserver.lastConditions;
if (lastConds && lastConds.length > 0) {
const summary = lastConds
.map((c) => `${c.type}=${c.status}${c.reason ? `(${c.reason})` : ""}`)
.join(",");
detailParts.push(`last_job_conditions=[${summary}]`);
} else {
detailParts.push("last_job_conditions=none_observed");
}
detailParts.push(`stdout_bytes=${stdout.length}`);
const stdoutLines = stdout.split("\n").filter((l) => l.trim()).length;
detailParts.push(`stdout_nonempty_lines=${stdoutLines}`);
return {
exitCode,
signal: null,
timedOut: false,
errorMessage: `K8s Job was deleted externally before Claude could complete [${detailParts.join(", ")}]`,
errorCode: "k8s_job_deleted_externally",
resultJson: { stdout },
};
}
if (parsedStream.llmApiEmptyResponse) {
return {
exitCode,
signal: null,
timedOut: false,
errorMessage: "LLM API returned an empty response (stop_reason: null, output_tokens: 0) — the upstream model API may be degraded or misconfigured",
errorCode: "llm_api_error",
resultJson: { stdout },
};
}
if (parsedStream.truncatedMidStream) {
// Re-query pod state with retry — the initial single-shot read can lose
// to kubelet propagation lag and surface a useless "pod state unavailable"
// message that hides the real exit cause (OOMKilled, SIGTERM, etc). The
// retry distinguishes pod-genuinely-gone from terminated-state-lag and
// gives the operator the actual exit code/reason where possible (FAR-107).
let lookup: PodLookupResult | undefined;
let refreshedState = podTerminatedState;
try {
lookup = await getPodLookupWithRetry(namespace, jobName, kubeconfigPath);
refreshedState = lookup.state;
if (refreshedState && refreshedState.exitCode !== null) {
exitCode = refreshedState.exitCode;
}
} catch (err) {
await onLog("stderr", `[paperclip] truncation diagnostic: pod re-query failed (${err instanceof Error ? err.message : String(err)})\n`).catch(() => {});
}
const cause = describeTruncationCause(refreshedState, lookup);
const modelHint = parsedStream.model ? ` (model: ${parsedStream.model})` : "";
return {
exitCode,
signal: null,
timedOut: false,
errorMessage: `Claude run was truncated mid-stream${modelHint} — assistant produced content but no result event arrived; ${cause}`,
errorCode: "claude_truncated",
resultJson: { stdout },
};
}
return {
exitCode,
signal: null,
timedOut: false,
errorMessage: buildPartialRunError(exitCode, parsedStream.model, stdout),
errorMessage: buildPartialRunError(exitCode, parsedStream.model, stdout, podTerminatedState),
resultJson: { stdout },
};
}
+125
View File
@@ -154,6 +154,131 @@ more raw output`;
// Should not be "Hello world\n\nHello world"
expect(result.summary.split("Hello world").length).toBe(2);
});
it("sets llmApiEmptyResponse=true when stop_reason:null and usage.output_tokens:0", () => {
const initLine = JSON.stringify({ type: "system", subtype: "init", model: "MiniMax-M2.7", session_id: "sess_1" });
const assistantEvent = JSON.stringify({
type: "assistant",
session_id: "sess_1",
message: {
id: "msg_abc",
stop_reason: null,
usage: { input_tokens: 100, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 },
content: [],
},
});
const result = parseClaudeStreamJson([initLine, assistantEvent].join("\n"));
expect(result.llmApiEmptyResponse).toBe(true);
expect(result.resultJson).toBeNull();
});
it("sets llmApiEmptyResponse=true when stop_reason:null and message-level output_tokens:0", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: { stop_reason: null, output_tokens: 0, content: [] },
});
const result = parseClaudeStreamJson(assistantEvent);
expect(result.llmApiEmptyResponse).toBe(true);
});
it("does not set llmApiEmptyResponse when stop_reason is non-null", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: {
stop_reason: "end_turn",
usage: { output_tokens: 0 },
content: [],
},
});
const result = parseClaudeStreamJson(assistantEvent);
expect(result.llmApiEmptyResponse).toBe(false);
});
it("does not set llmApiEmptyResponse when output_tokens > 0", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: {
stop_reason: null,
usage: { output_tokens: 5 },
content: [{ type: "text", text: "hello" }],
},
});
const result = parseClaudeStreamJson(assistantEvent);
expect(result.llmApiEmptyResponse).toBe(false);
});
it("clears llmApiEmptyResponse when a result event follows the empty assistant event", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: { stop_reason: null, usage: { output_tokens: 0 }, content: [] },
});
const resultEvent = JSON.stringify({
type: "result",
result: "Done",
subtype: "stop",
total_cost_usd: 0.001,
usage: { input_tokens: 10, output_tokens: 5, cache_read_input_tokens: 0 },
});
const result = parseClaudeStreamJson([assistantEvent, resultEvent].join("\n"));
expect(result.llmApiEmptyResponse).toBe(false);
expect(result.resultJson).not.toBeNull();
});
it("sets truncatedMidStream=true when assistant event with output_tokens>0 has no result (FAR-95)", () => {
const initLine = JSON.stringify({ type: "system", subtype: "init", model: "claude-opus-4-7", session_id: "sess_1" });
const assistantEvent = JSON.stringify({
type: "assistant",
session_id: "sess_1",
message: {
id: "msg_abc",
stop_reason: null,
usage: { input_tokens: 1, output_tokens: 35, cache_creation_input_tokens: 523, cache_read_input_tokens: 46295 },
content: [{ type: "tool_use", id: "tool_1", name: "Bash", input: { command: "echo hi" } }],
},
});
const result = parseClaudeStreamJson([initLine, assistantEvent].join("\n"));
expect(result.truncatedMidStream).toBe(true);
expect(result.llmApiEmptyResponse).toBe(false);
expect(result.resultJson).toBeNull();
});
it("clears truncatedMidStream when a result event follows assistant content", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: { stop_reason: null, usage: { output_tokens: 35 }, content: [] },
});
const resultEvent = JSON.stringify({
type: "result",
result: "Done",
subtype: "stop",
total_cost_usd: 0.001,
usage: { input_tokens: 10, output_tokens: 5, cache_read_input_tokens: 0 },
});
const result = parseClaudeStreamJson([assistantEvent, resultEvent].join("\n"));
expect(result.truncatedMidStream).toBe(false);
expect(result.resultJson).not.toBeNull();
});
it("does not set truncatedMidStream when assistant has output_tokens=0", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: { stop_reason: null, usage: { output_tokens: 0 }, content: [] },
});
const result = parseClaudeStreamJson(assistantEvent);
expect(result.truncatedMidStream).toBe(false);
});
it("sets llmApiEmptyResponse=false for normal result", () => {
const resultEvent = JSON.stringify({
type: "result",
result: "Done",
subtype: "stop",
total_cost_usd: 0.005,
usage: { input_tokens: 100, output_tokens: 200, cache_read_input_tokens: 50 },
});
const result = parseClaudeStreamJson(resultEvent);
expect(result.llmApiEmptyResponse).toBe(false);
});
});
describe("extractClaudeLoginUrl", () => {
+29
View File
@@ -15,6 +15,14 @@ export function parseClaudeStreamJson(stdout: string) {
// at the line level; this guard only needs to protect against the same
// message block being parsed twice.
const seenBlocks = new Set<string>();
// Set when we see stop_reason:null + output_tokens:0 on an assistant event
// with no subsequent result event — indicates the upstream LLM API returned
// an empty/malformed response (e.g. MiniMax degraded performance).
let llmApiEmptyResponse = false;
// Set when an assistant event with output_tokens > 0 was seen but no result
// event arrived — indicates the run was truncated mid-stream (pod terminated,
// OOMKill, or claude CLI crash after producing content).
let assistantContentSeen = false;
for (const rawLine of stdout.split(/\r?\n/)) {
const line = rawLine.trim();
@@ -34,6 +42,21 @@ export function parseClaudeStreamJson(stdout: string) {
const message = parseObject(event.message);
const messageId = asString(message.id, "");
const content = Array.isArray(message.content) ? message.content : [];
// Detect empty LLM API response: stop_reason:null with zero output tokens.
// output_tokens may appear directly on message or nested under message.usage.
const stopReason = message.stop_reason;
const usageObj = parseObject(message.usage as Record<string, unknown>);
const outputTokens = typeof message.output_tokens === "number"
? message.output_tokens
: asNumber(usageObj.output_tokens, -1);
if (stopReason === null && outputTokens === 0) {
llmApiEmptyResponse = true;
}
if (outputTokens > 0) {
assistantContentSeen = true;
}
for (let i = 0; i < content.length; i++) {
const entry = content[i];
if (typeof entry !== "object" || entry === null || Array.isArray(entry)) continue;
@@ -55,6 +78,8 @@ export function parseClaudeStreamJson(stdout: string) {
if (type === "result") {
finalResult = event;
llmApiEmptyResponse = false; // result event means Claude completed normally
assistantContentSeen = false; // result event means stream was not truncated
sessionId = asString(event.session_id, sessionId ?? "") || sessionId;
}
}
@@ -67,6 +92,8 @@ export function parseClaudeStreamJson(stdout: string) {
usage: null as UsageSummary | null,
summary: assistantTexts.join("\n\n").trim(),
resultJson: null as Record<string, unknown> | null,
llmApiEmptyResponse,
truncatedMidStream: assistantContentSeen,
};
}
@@ -87,6 +114,8 @@ export function parseClaudeStreamJson(stdout: string) {
usage,
summary,
resultJson: finalResult,
llmApiEmptyResponse: false,
truncatedMidStream: false,
};
}