Compare commits

...

9 Commits

Author SHA1 Message Date
Test User 78fd702ccb 0.1.29 2026-04-23 02:48:58 +00:00
Chris Farhood 0bc1bb1dd1 Merge pull request #8 from farhoodliquor/fix/far-124-reattach-orphan-k8s-jobs
fix: reattach to orphaned K8s Jobs across Paperclip restarts (FAR-124)
2026-04-22 22:33:05 -04:00
Test User c8968598e4 fix: reattach to orphaned K8s Jobs across Paperclip restarts (FAR-124)
When the Paperclip pod restarts mid-run, the in-process setInterval
keepalive dies, `updatedAt` goes stale, and the server's orphan reaper
fails the run with the (misleading) "child pid 1 is no longer running"
message.  Paperclip then dispatches a continuation run, whose execute()
finds the previous run's K8s Job still happily running and deletes it
as an "orphan" — throwing away work and producing the transcript/run
cascade reported on FAR-124.

Changes:

- job-manifest: add `paperclip.io/task-id` and `paperclip.io/session-id`
  labels (sanitized via new `sanitizeLabelValue` helper) so a later
  execute() can identify an orphan as the continuation of the same
  logical unit of work.
- execute: in the concurrency guard, when `reattachOrphanedJobs` is on
  (default) and an orphan matches agent + task + session + is not
  terminal, pick it as the reattach target; delete only the other
  orphans.  Branch the build/create/waitForPod block so the reattach
  path skips manifest building, Secret creation, Job creation, and
  scheduling wait — it jumps straight to streaming logs and waiting
  for the existing pod's completion.
- config-schema: expose `reattachOrphanedJobs` toggle (default true).
- Tests: `sanitizeLabelValue`, `isReattachableOrphan`, new label
  presence/absence, config default.

No server-side changes; the misleading reaper message and lack of a
non-local retry path will be addressed in a follow-up upstream PR.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-22 21:59:25 +00:00
Test User a4631ac756 0.1.28
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-22 19:52:51 +00:00
Chris Farhood 1fc6a9c626 Merge pull request #7 from farhoodliquor/fix/far-123-duplicate-output-logs
fix(FAR-123): dedup replayed K8s log lines at the streaming UI boundary
2026-04-22 15:51:40 -04:00
Chris Farhood d71ff15443 Merge pull request #6 from farhoodliquor/fix/far-122-partial-log-stream-and-error-message
fix: partial log stream + better error messages for fast-exit containers (FAR-122 follow-up)
2026-04-22 15:51:05 -04:00
Test User 5e01ae99b3 fix: dedup replayed K8s log lines at the streaming UI boundary (FAR-123)
The K8s log follow stream replays the trailing few seconds of output on
every reconnect because `sinceSeconds` uses integer-second granularity
with a small safety buffer.  FAR-105 dedupped those replays at the final
parser (parse.ts), but the streaming UI consumes raw onLog chunks and
still showed each replayed assistant/tool event as a fresh entry — which
is how the duplicate "Three nits to fix…" blocks in the screenshot
appeared between successive tool calls.

Fix: add a stateful line-level dedup filter around onLog, shared across
reconnects.  Claude stream-json events are keyed by their stable
structural IDs (message.id, tool_use_id, session_id); non-JSON output
(paperclip status lines, shell output) passes through unchanged.

- New `src/server/log-dedup.ts` + tests: LogLineDedupFilter handles
  chunk-to-line buffering, replay dedup, and end-of-stream flush.
- `streamPodLogs` instantiates one filter per run so dedup state persists
  across reconnect attempts.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-22 19:49:04 +00:00
Test User 8c8c2f2ec0 fix: address review nits — refactor fallbacks, add unit tests (FAR-122)
- Merge both one-shot log fallbacks into a single conditional block using a
  cheap string-scan guard (`stdout.includes('"type":"result"')`) to avoid
  calling parseClaudeStreamJson twice and prevent double readPodLogs calls
  when the first fallback already ran.

- Extract error-message logic into `buildPartialRunError(exitCode, model, stdout)`
  (exported for tests) so the `!parsed` branch is a one-liner and the logic
  is independently testable.

- Export `isK8s404` for tests.

- Add execute.test.ts with 15 unit tests covering:
  - isK8s404: v0.x response.statusCode, v1.0+ response.status, direct
    statusCode, message-based detection, non-404 codes
  - buildPartialRunError: exitCode=0 path, empty stdout, init-only output
    (model surfaced), first non-system content line, null exitCode (-1),
    multiple consecutive system events

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-22 19:42:57 +00:00
Test User b9def0964e fix: improve partial-log handling and error messages for fast-exit containers (FAR-122)
- Add a second log fallback: if the follow stream captured partial output (init
  event present but no result event), attempt a one-shot readPodLogs before the
  pod is cleaned up. Fast-exiting containers (bad model, missing API key, etc.)
  can cause the follow stream to return only the init line before the connection
  drops; the one-shot read is more reliable for already-terminated containers.

- Improve the `!parsed` error message: skip system/init events when searching
  for the first content line, so the error reads "Claude started but did not
  produce a result (model: MiniMax-M2.7) — check API credentials..." instead of
  "Claude exited with code -1: {"type":"system","subtype":"init",...}".

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-22 19:33:15 +00:00
10 changed files with 878 additions and 96 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.27",
"version": "0.1.29",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.27",
"version": "0.1.29",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.27",
"version": "0.1.29",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+8
View File
@@ -42,6 +42,14 @@ describe("getConfigSchema", () => {
expect(field!.default).toBe(true);
});
it("reattachOrphanedJobs defaults to true", () => {
const schema = getConfigSchema();
const field = schema.fields.find((f: ConfigFieldSchema) => f.key === "reattachOrphanedJobs");
expect(field).toBeDefined();
expect(field!.type).toBe("toggle");
expect(field!.default).toBe(true);
});
it("has imagePullPolicy as select with correct options", () => {
const schema = getConfigSchema();
const field = schema.fields.find((f: ConfigFieldSchema) => f.key === "imagePullPolicy");
+7
View File
@@ -89,6 +89,13 @@ export function getConfigSchema(): AdapterConfigSchema {
label: "Retain Jobs",
hint: "Skip cleanup of completed Jobs for debugging purposes.",
},
{
type: "toggle",
key: "reattachOrphanedJobs",
label: "Reattach to Orphaned Jobs",
hint: "If a prior K8s Job for the same agent/task/session is still running (e.g. Paperclip restarted mid-run), attach to it and stream its output instead of deleting it and starting a new pod. Default: on.",
default: true,
},
// Resource Limits
{
type: "text",
+188
View File
@@ -0,0 +1,188 @@
import { describe, it, expect } from "vitest";
import type * as k8s from "@kubernetes/client-node";
import { isK8s404, buildPartialRunError, isReattachableOrphan } from "./execute.js";
function makeJob(opts: {
runId?: string;
agentId?: string;
taskId?: string;
sessionId?: string;
adapterType?: string;
terminal?: boolean;
}): k8s.V1Job {
const labels: Record<string, string> = {
"paperclip.io/adapter-type": opts.adapterType ?? "claude_k8s",
};
if (opts.agentId) labels["paperclip.io/agent-id"] = opts.agentId;
if (opts.runId) labels["paperclip.io/run-id"] = opts.runId;
if (opts.taskId) labels["paperclip.io/task-id"] = opts.taskId;
if (opts.sessionId) labels["paperclip.io/session-id"] = opts.sessionId;
return {
metadata: { name: "ac-job", namespace: "paperclip", labels },
status: opts.terminal
? { conditions: [{ type: "Complete", status: "True" }] }
: { conditions: [] },
} as k8s.V1Job;
}
describe("isK8s404", () => {
it("returns false for non-Error values", () => {
expect(isK8s404(null)).toBe(false);
expect(isK8s404(undefined)).toBe(false);
expect(isK8s404("string error")).toBe(false);
expect(isK8s404(404)).toBe(false);
});
it("returns false for unrelated errors", () => {
expect(isK8s404(new Error("something went wrong"))).toBe(false);
expect(isK8s404(new Error("HTTP-Code: 500 Message: Internal Server Error"))).toBe(false);
});
it("detects 404 from v1.0+ message format", () => {
const err = new Error("HTTP-Code: 404 Message: Unknown API Status Code! Body: ...");
expect(isK8s404(err)).toBe(true);
});
it("detects 404 from v0.x response.statusCode", () => {
const err = Object.assign(new Error("Not Found"), {
response: { statusCode: 404 },
});
expect(isK8s404(err)).toBe(true);
});
it("detects 404 from v1.0+ response.status", () => {
const err = Object.assign(new Error("Not Found"), {
response: { status: 404 },
});
expect(isK8s404(err)).toBe(true);
});
it("detects 404 from direct statusCode property", () => {
const err = Object.assign(new Error("Not Found"), { statusCode: 404 });
expect(isK8s404(err)).toBe(true);
});
it("does not match non-404 status codes on response", () => {
const err = Object.assign(new Error("Forbidden"), {
response: { statusCode: 403 },
});
expect(isK8s404(err)).toBe(false);
});
});
describe("buildPartialRunError", () => {
const initLine = JSON.stringify({
type: "system",
subtype: "init",
model: "claude-sonnet-4-6",
session_id: "sess_abc",
});
it("returns parse-failure message when exitCode is 0", () => {
expect(buildPartialRunError(0, "", "")).toBe("Failed to parse Claude JSON output");
expect(buildPartialRunError(0, "claude-sonnet-4-6", initLine)).toBe(
"Failed to parse Claude JSON output",
);
});
it("returns generic exit message when stdout is empty", () => {
expect(buildPartialRunError(1, "", "")).toBe("Claude exited with code 1");
expect(buildPartialRunError(null, "", "")).toBe("Claude exited with code -1");
});
it("skips system/init events and returns generic message when only init captured", () => {
const msg = buildPartialRunError(1, "claude-sonnet-4-6", initLine);
expect(msg).toBe(
"Claude started but did not produce a result (model: claude-sonnet-4-6) — check API credentials, model support, and adapter config",
);
});
it("includes model from parsedStream when stdout is init-only", () => {
const msg = buildPartialRunError(null, "MiniMax-M2.7", initLine);
expect(msg).toContain("MiniMax-M2.7");
expect(msg).not.toContain("type");
expect(msg).not.toContain("system");
});
it("uses first non-system line as content when present", () => {
const stdout = [initLine, "Error: no API key configured"].join("\n");
const msg = buildPartialRunError(1, "claude-sonnet-4-6", stdout);
expect(msg).toBe("Claude exited with code 1: Error: no API key configured");
});
it("uses first non-system JSON event as content", () => {
const resultLike = JSON.stringify({ type: "result", subtype: "error", result: "rate limit" });
const stdout = [initLine, resultLike].join("\n");
const msg = buildPartialRunError(2, "claude-sonnet-4-6", stdout);
expect(msg).toContain("rate limit");
expect(msg).toContain("code 2");
});
it("null exitCode renders as -1 in message", () => {
const msg = buildPartialRunError(null, "", "Some plain error text");
expect(msg).toBe("Claude exited with code -1: Some plain error text");
});
it("skips multiple consecutive system events", () => {
const anotherSystem = JSON.stringify({ type: "system", subtype: "other" });
const stdout = [initLine, anotherSystem, "real error line"].join("\n");
const msg = buildPartialRunError(1, "model-x", stdout);
expect(msg).toBe("Claude exited with code 1: real error line");
});
});
describe("isReattachableOrphan", () => {
const agentId = "agent-abc";
const taskId = "task-xyz";
const sessionId = "sess-123";
it("returns true when agent/task/session all match and Job is not terminal", () => {
const job = makeJob({ agentId, taskId, sessionId, runId: "old-run" });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(true);
});
it("returns false when the Job is already Complete", () => {
const job = makeJob({ agentId, taskId, sessionId, runId: "old-run", terminal: true });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when expected taskId is null (caller couldn't derive one)", () => {
const job = makeJob({ agentId, taskId, sessionId });
expect(isReattachableOrphan(job, { agentId, taskId: null, sessionId })).toBe(false);
});
it("returns false when expected sessionId is null", () => {
const job = makeJob({ agentId, taskId, sessionId });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId: null })).toBe(false);
});
it("returns false when agent id doesn't match", () => {
const job = makeJob({ agentId: "agent-other", taskId, sessionId });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when task id doesn't match", () => {
const job = makeJob({ agentId, taskId: "task-other", sessionId });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when session id doesn't match", () => {
const job = makeJob({ agentId, taskId, sessionId: "sess-other" });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when the Job is from a different adapter type", () => {
const job = makeJob({ agentId, taskId, sessionId, adapterType: "claude_local" });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when Job has no task-id label (labels were introduced in FAR-124)", () => {
const job = makeJob({ agentId, sessionId });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when Job has no session-id label", () => {
const job = makeJob({ agentId, taskId });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
});
+273 -92
View File
@@ -7,7 +7,8 @@ import {
isClaudeUnknownSessionError,
} from "./parse.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
import { buildJobManifest } from "./job-manifest.js";
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
import { LogLineDedupFilter } from "./log-dedup.js";
import type * as k8s from "@kubernetes/client-node";
import { Writable } from "node:stream";
@@ -19,8 +20,9 @@ const MAX_LOG_RECONNECT_ATTEMPTS = 50;
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
* Exported for unit tests.
*/
function isK8s404(err: unknown): boolean {
export function isK8s404(err: unknown): boolean {
if (!(err instanceof Error)) return false;
const e = err as unknown as Record<string, unknown>;
const resp = e.response as Record<string, unknown> | undefined;
@@ -29,6 +31,71 @@ function isK8s404(err: unknown): boolean {
return /HTTP-Code:\s*404\b/.test(err.message);
}
/**
* Build the error message when Claude's stdout contains no result event.
* Skips system/init event lines so the UI doesn't display the raw init JSON.
* Exported for unit tests.
*/
export function buildPartialRunError(
exitCode: number | null,
model: string,
stdout: string,
): string {
if (exitCode === 0) return "Failed to parse Claude JSON output";
// Walk stdout lines, skip system events, return the first real content line.
const firstContentLine = stdout.split(/\r?\n/)
.map((l) => l.trim())
.find((l) => {
if (!l) return false;
try {
const obj = JSON.parse(l);
if (typeof obj === "object" && obj !== null && (obj as Record<string, unknown>).type === "system") return false;
} catch {
// not JSON — treat as content
}
return true;
}) ?? "";
// If we only have system/init events and nothing else, surface the model
// name so the operator can diagnose missing credentials or unsupported model.
const initOnlyOutput = stdout.trim() !== "" && model !== "" && !firstContentLine;
if (initOnlyOutput) {
const modelHint = model ? ` (model: ${model})` : "";
return `Claude started but did not produce a result${modelHint} — check API credentials, model support, and adapter config`;
}
return firstContentLine
? `Claude exited with code ${exitCode ?? -1}: ${firstContentLine}`
: `Claude exited with code ${exitCode ?? -1}`;
}
/**
* Evaluate an orphaned K8s Job (one whose `paperclip.io/run-id` label does
* not match the current runId) as a potential reattach target. A Job is
* reattachable when it belongs to the same agent, same task, and same resume
* session as the current run — meaning the previous Paperclip instance was
* mid-stream on the exact piece of work this new run was dispatched to do.
* Exported for unit tests.
*/
export function isReattachableOrphan(
job: k8s.V1Job,
expected: { agentId: string; taskId: string | null; sessionId: string | null },
): boolean {
if (!expected.taskId || !expected.sessionId) return false;
const labels = job.metadata?.labels ?? {};
if (labels["paperclip.io/adapter-type"] !== "claude_k8s") return false;
if (labels["paperclip.io/agent-id"] !== expected.agentId) return false;
if (labels["paperclip.io/task-id"] !== expected.taskId) return false;
if (labels["paperclip.io/session-id"] !== expected.sessionId) return false;
const conditions = job.status?.conditions ?? [];
const terminal = conditions.some(
(c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
);
if (terminal) return false;
return true;
}
/**
* Wait for the Job's pod to reach a terminal or running state.
* Returns the pod name once logs can be streamed, or throws on failure.
@@ -150,6 +217,7 @@ async function streamPodLogsOnce(
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
sinceSeconds?: number,
dedup?: LogLineDedupFilter,
): Promise<string> {
const logApi = getLogApi(kubeconfigPath);
const chunks: string[] = [];
@@ -158,7 +226,12 @@ async function streamPodLogsOnce(
write(chunk: Buffer, _encoding, callback) {
const text = chunk.toString("utf-8");
chunks.push(text);
void onLog("stdout", text).then(() => callback(), callback);
const emitted = dedup ? dedup.filter(text) : text;
if (!emitted) {
callback();
return;
}
void onLog("stdout", emitted).then(() => callback(), callback);
},
});
@@ -198,6 +271,9 @@ async function streamPodLogs(
// reconnects use a tight window instead of an ever-growing one anchored
// at stream start. This is the primary fix for FAR-105 duplicative logs.
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
// Shared across reconnects so replayed lines inside the `sinceSeconds`
// overlap window are dropped before they reach the streaming UI (FAR-123).
const dedup = new LogLineDedupFilter();
while (!stopSignal?.stopped) {
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
@@ -217,7 +293,7 @@ async function streamPodLogs(
}
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup);
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
@@ -237,6 +313,11 @@ async function streamPodLogs(
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
}
// Flush any buffered partial line so the final assistant/result chunk
// isn't dropped when the stream ends mid-line.
const tail = dedup.flush();
if (tail) await onLog("stdout", tail);
return allChunks.join("");
}
@@ -356,10 +437,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
// still be running. We detect those by comparing the Job's run-id label against
// the current runId and clean them up so this execution can proceed.
// the current runId. When reattachOrphanedJobs is enabled and the orphan matches
// the current agent+task+session, we attach to it instead of deleting it (FAR-124).
const agentId = ctx.agent.id;
const selfPod = await getSelfPodInfo(kubeconfigPath);
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true);
const runtimeSessionParams = parseObject(runtime.sessionParams);
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const currentSessionLabel = currentSessionIdRaw ? sanitizeLabelValue(currentSessionIdRaw) : null;
const currentTaskIdRaw = asString(ctx.context.taskId, "") || asString(ctx.context.issueId, "");
const currentTaskLabel = currentTaskIdRaw ? sanitizeLabelValue(currentTaskIdRaw) : null;
let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null;
try {
const batchApi = getBatchApi(kubeconfigPath);
const existing = await batchApi.listNamespacedJob({
@@ -379,10 +468,42 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
(j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") === runId,
);
if (orphaned.length > 0) {
const orphanNames = orphaned.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Cleaning up ${orphaned.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
for (const j of orphaned) {
// Pick the most recent reattachable orphan — same agent + task + session,
// not terminal. Only one target is chosen; any other orphans get
// cleaned up as before.
if (reattachOrphanedJobs && orphaned.length > 0) {
const candidates = orphaned
.filter((j) =>
isReattachableOrphan(j, {
agentId,
taskId: currentTaskLabel,
sessionId: currentSessionLabel,
}),
)
.sort((a, b) => {
const at = new Date(a.metadata?.creationTimestamp ?? 0).getTime();
const bt = new Date(b.metadata?.creationTimestamp ?? 0).getTime();
return bt - at;
});
const chosen = candidates[0];
const chosenName = chosen?.metadata?.name;
if (chosen && chosenName) {
reattachTarget = {
jobName: chosenName,
namespace: chosen.metadata?.namespace ?? guardNamespace,
priorRunId: chosen.metadata?.labels?.["paperclip.io/run-id"] ?? "",
image: chosen.spec?.template?.spec?.containers?.[0]?.image ?? "unknown",
};
}
}
const toDelete = orphaned.filter(
(j) => !reattachTarget || j.metadata?.name !== reattachTarget.jobName,
);
if (toDelete.length > 0) {
const orphanNames = toDelete.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Cleaning up ${toDelete.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
for (const j of toDelete) {
const name = j.metadata?.name;
if (name) {
await cleanupJob(guardNamespace, name, onLog, kubeconfigPath);
@@ -420,84 +541,120 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
// Build Job manifest
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret } = buildJobManifest({
ctx,
selfPod,
});
// Report invocation metadata
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: claudeArgs,
commandNotes: [
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
context: ctx.context,
} as Parameters<typeof onMeta>[0]);
}
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
const coreApi = getCoreApi(kubeconfigPath);
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
const batchApi = getBatchApi(kubeconfigPath);
let jobName: string;
let namespace: string;
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
if (reattachTarget) {
jobName = reattachTarget.jobName;
namespace = reattachTarget.namespace;
// Announce reattach metadata. Prompt and args aren't known here — they
// belong to the prior run that created this pod and are already present
// on the running container.
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: [],
commandNotes: [
`Image: ${reattachTarget.image}`,
`Namespace: ${namespace}`,
`Reattached from prior run: ${reattachTarget.priorRunId || "unknown"}`,
`Timeout: ${timeoutSec}s`,
],
prompt: "",
context: ctx.context,
} as Parameters<typeof onMeta>[0]);
}
await onLog("stdout", `[paperclip] Reattaching to in-flight K8s Job ${jobName} in namespace ${namespace} (prior run ${reattachTarget.priorRunId || "unknown"})\n`);
} else {
// Build Job manifest
const built = buildJobManifest({ ctx, selfPod });
const job = built.job;
jobName = built.jobName;
namespace = built.namespace;
const prompt = built.prompt;
const claudeArgs = built.claudeArgs;
const promptMetrics = built.promptMetrics;
promptSecret = built.promptSecret;
// Report invocation metadata
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: claudeArgs,
commandNotes: [
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
context: ctx.context,
} as Parameters<typeof onMeta>[0]);
}
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
},
},
stringData: promptSecret.data,
},
stringData: promptSecret.data,
},
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
};
}
}
// Create the Job
try {
await batchApi.createNamespacedJob({ namespace, body: job });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
errorCode: "k8s_job_create_failed",
};
}
}
// Create the Job
const batchApi = getBatchApi(kubeconfigPath);
try {
await batchApi.createNamespacedJob({ namespace, body: job });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
errorCode: "k8s_job_create_failed",
};
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
}
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
let stdout = "";
let exitCode: number | null = null;
let jobTimedOut = false;
@@ -511,8 +668,23 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
let podName: string;
try {
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
if (reattachTarget) {
// Pod is already running from the prior run — look it up directly.
const podList = await coreApi.listNamespacedPod({
namespace,
labelSelector: `job-name=${jobName}`,
});
const pod = podList.items[0];
const name = pod?.metadata?.name;
if (!name) {
throw new Error(`Reattach target Job ${jobName} has no pod`);
}
podName = name;
await onLog("stdout", `[paperclip] Reattached to pod ${podName}\n`);
} else {
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
}
// Notify the server that execution has started. This sets
// processStartedAt and refreshes updatedAt in the DB, which the
@@ -526,13 +698,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Pod scheduling failed: ${msg}\n`);
const phase = reattachTarget ? "reattach" : "scheduling";
await onLog("stderr", `[paperclip] Pod ${phase} failed: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Pod scheduling failed: ${msg}`,
errorCode: "k8s_pod_schedule_failed",
errorMessage: `Pod ${phase} failed: ${msg}`,
errorCode: reattachTarget ? "k8s_pod_reattach_failed" : "k8s_pod_schedule_failed",
};
}
@@ -640,13 +813,27 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
stdout = logResult.value;
}
// If the follow stream missed output (container exited quickly), do a
// one-shot log read as fallback before the pod is cleaned up.
if (!stdout.trim()) {
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
stdout = await readPodLogs(namespace, podName, kubeconfigPath);
if (stdout.trim()) {
// One-shot log fallback: handles two failure modes with a single read.
// Mode 1 — empty stream: the follow stream returned nothing (fast exit before connection).
// Mode 2 — partial stream: we have some output but no result event (follow stream raced
// with container exit and captured only the init line before the connection dropped).
// A one-shot readPodLogs is more reliable for already-terminated containers and reads
// from the beginning of the log, giving us the full output.
// We use a cheap string scan for the result-event guard (avoids a full JSON parse here;
// the authoritative parse happens once below after all fallbacks complete).
const hasResultEvent = stdout.includes('"type":"result"');
const needsOneShot = !stdout.trim() || (stdout.trim() && !hasResultEvent);
if (needsOneShot) {
if (!stdout.trim()) {
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
}
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
if (!stdout.trim() && oneShotLogs.trim()) {
stdout = oneShotLogs;
await onLog("stdout", stdout);
} else if (oneShotLogs && oneShotLogs.length > stdout.length) {
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
stdout = oneShotLogs;
}
}
@@ -739,16 +926,11 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
if (!parsed) {
const stderrLine = stdout.split(/\r?\n/).map((l) => l.trim()).find(Boolean) ?? "";
return {
exitCode,
signal: null,
timedOut: false,
errorMessage: exitCode === 0
? "Failed to parse Claude JSON output"
: stderrLine
? `Claude exited with code ${exitCode ?? -1}: ${stderrLine}`
: `Claude exited with code ${exitCode ?? -1}`,
errorMessage: buildPartialRunError(exitCode, parsedStream.model, stdout),
resultJson: { stdout },
};
}
@@ -762,8 +944,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
})();
const runtimeSessionParams = parseObject(runtime.sessionParams);
const fallbackSessionId = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const fallbackSessionId = currentSessionIdRaw;
const resolvedSessionId = parsedStream.sessionId
?? (asString(parsed.session_id as string, fallbackSessionId) || fallbackSessionId);
const model = asString(config.model, "");
+60 -1
View File
@@ -1,6 +1,6 @@
import { describe, it, expect, beforeEach } from "vitest";
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
import { buildJobManifest, buildRtkSetupCommands } from "./job-manifest.js";
import { buildJobManifest, buildRtkSetupCommands, sanitizeLabelValue } from "./job-manifest.js";
import type { SelfPodInfo } from "./k8s-client.js";
function makeCtx(overrides: Partial<AdapterExecutionContext> = {}): AdapterExecutionContext {
@@ -136,6 +136,36 @@ describe("buildJobManifest", () => {
expect(job.metadata?.labels?.env).toBe("prod");
expect(job.metadata?.labels?.["paperclip.io/adapter-type"]).toBe("claude_k8s");
});
it("adds task-id label when context provides taskId", () => {
ctx.context = { taskId: "task-xyz-789" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBe("task-xyz-789");
});
it("falls back to issueId when taskId absent", () => {
ctx.context = { issueId: "issue-42" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBe("issue-42");
});
it("adds session-id label when runtime provides sessionId", () => {
ctx.runtime = { ...ctx.runtime, sessionId: "sess-abc-1234" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/session-id"]).toBe("sess-abc-1234");
});
it("reads sessionId from runtime.sessionParams when sessionId prop missing", () => {
ctx.runtime = { ...ctx.runtime, sessionParams: { sessionId: "sess-from-params" } };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/session-id"]).toBe("sess-from-params");
});
it("omits task-id and session-id labels when neither is provided", () => {
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBeUndefined();
expect(job.metadata?.labels?.["paperclip.io/session-id"]).toBeUndefined();
});
});
describe("annotations", () => {
@@ -729,3 +759,32 @@ describe("buildJobManifest", () => {
});
});
});
describe("sanitizeLabelValue", () => {
it("passes through already-valid UUIDs and slugs", () => {
expect(sanitizeLabelValue("abc-123-def")).toBe("abc-123-def");
expect(sanitizeLabelValue("0d8b4472-c42c-4052-aab1-e32897909afa")).toBe("0d8b4472-c42c-4052-aab1-e32897909afa");
});
it("strips characters outside [a-zA-Z0-9._-]", () => {
expect(sanitizeLabelValue("task:xyz/123")).toBe("taskxyz123");
expect(sanitizeLabelValue("abc 123")).toBe("abc123");
});
it("trims leading/trailing non-alphanumeric characters", () => {
expect(sanitizeLabelValue("--abc--")).toBe("abc");
expect(sanitizeLabelValue("...123...")).toBe("123");
});
it("truncates to the configured maxLen", () => {
const long = "a".repeat(200);
const out = sanitizeLabelValue(long, 63);
expect(out?.length).toBe(63);
});
it("returns null when no alphanumeric characters remain", () => {
expect(sanitizeLabelValue("---")).toBeNull();
expect(sanitizeLabelValue("")).toBeNull();
expect(sanitizeLabelValue(" ")).toBeNull();
});
});
+20
View File
@@ -202,6 +202,17 @@ function sanitizeForK8sName(value: string, maxLen = 16): string {
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen);
}
/**
* Sanitize a string for use as a Kubernetes label value (RFC 1123 subset:
* `[a-zA-Z0-9]([-_.a-zA-Z0-9]*[a-zA-Z0-9])?`, max 63 chars). Returns `null`
* when no usable characters remain — the caller should omit the label.
*/
export function sanitizeLabelValue(value: string, maxLen = 63): string | null {
const cleaned = value.replace(/[^a-zA-Z0-9._-]/g, "").slice(0, maxLen);
const trimmed = cleaned.replace(/^[^a-zA-Z0-9]+/, "").replace(/[^a-zA-Z0-9]+$/, "");
return trimmed.length > 0 ? trimmed : null;
}
/**
* Build a short deterministic hash suffix from the raw inputs to avoid
* collisions when sanitized slugs happen to be identical.
@@ -428,6 +439,15 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
"paperclip.io/company-id": agent.companyId,
"paperclip.io/adapter-type": "claude_k8s",
};
// Reattach-target labels: let a future execute() identify this Job as the
// continuation of the same logical unit of work (same task + same resume
// session) so it can attach to the running pod across a Paperclip restart
// instead of deleting it and starting over (FAR-124).
const taskIdRaw = asString(context.taskId, "") || asString(context.issueId, "");
const taskLabel = taskIdRaw ? sanitizeLabelValue(taskIdRaw) : null;
if (taskLabel) labels["paperclip.io/task-id"] = taskLabel;
const sessionLabel = runtimeSessionId ? sanitizeLabelValue(runtimeSessionId) : null;
if (sessionLabel) labels["paperclip.io/session-id"] = sessionLabel;
for (const [key, value] of Object.entries(extraLabels)) {
labels[key] = value;
}
+173
View File
@@ -0,0 +1,173 @@
import { describe, it, expect } from "vitest";
import { LogLineDedupFilter, eventDedupKey } from "./log-dedup.js";
function assistantEvent(id: string, text: string): string {
return JSON.stringify({
type: "assistant",
session_id: "sess_1",
message: {
id,
content: [{ type: "text", text }],
},
});
}
function userToolResultEvent(toolUseId: string, content: string): string {
return JSON.stringify({
type: "user",
session_id: "sess_1",
message: {
content: [{ type: "tool_result", tool_use_id: toolUseId, content }],
},
});
}
function systemInitEvent(sessionId: string): string {
return JSON.stringify({
type: "system",
subtype: "init",
session_id: sessionId,
model: "claude-opus-4-7",
});
}
function resultEvent(sessionId: string): string {
return JSON.stringify({
type: "result",
subtype: "success",
session_id: sessionId,
result: "done",
total_cost_usd: 0.01,
usage: { input_tokens: 1, output_tokens: 1, cache_read_input_tokens: 0 },
});
}
describe("eventDedupKey", () => {
it("keys assistant events by message.id", () => {
const key = eventDedupKey(JSON.parse(assistantEvent("msg_abc", "hi")));
expect(key).toBe("assistant:msg_abc");
});
it("keys user tool_result events by tool_use_id", () => {
const key = eventDedupKey(JSON.parse(userToolResultEvent("toolu_1", "ok")));
expect(key).toBe("user:tool_result:toolu_1");
});
it("keys system init events by session_id", () => {
const key = eventDedupKey(JSON.parse(systemInitEvent("sess_xyz")));
expect(key).toBe("system:init:sess_xyz");
});
it("keys result events by session_id", () => {
const key = eventDedupKey(JSON.parse(resultEvent("sess_xyz")));
expect(key).toBe("result:sess_xyz");
});
it("returns null for assistant events missing message.id", () => {
const event = { type: "assistant", message: { content: [] } };
expect(eventDedupKey(event)).toBeNull();
});
it("returns null for unknown event types", () => {
expect(eventDedupKey({ type: "unknown" })).toBeNull();
expect(eventDedupKey({})).toBeNull();
});
});
describe("LogLineDedupFilter", () => {
it("passes unique lines through unchanged", () => {
const filter = new LogLineDedupFilter();
const a = assistantEvent("msg_1", "hello");
const b = assistantEvent("msg_2", "world");
expect(filter.filter(`${a}\n${b}\n`)).toBe(`${a}\n${b}\n`);
});
it("drops assistant events replayed with the same message.id", () => {
const filter = new LogLineDedupFilter();
const a = assistantEvent("msg_1", "Three nits to fix.");
filter.filter(`${a}\n`);
expect(filter.filter(`${a}\n`)).toBe("");
});
it("drops user tool_result events replayed with the same tool_use_id", () => {
const filter = new LogLineDedupFilter();
const a = userToolResultEvent("toolu_abc", "file contents");
filter.filter(`${a}\n`);
expect(filter.filter(`${a}\n`)).toBe("");
});
it("drops system init and result events on replay", () => {
const filter = new LogLineDedupFilter();
const init = systemInitEvent("sess_1");
const result = resultEvent("sess_1");
filter.filter(`${init}\n${result}\n`);
expect(filter.filter(`${init}\n${result}\n`)).toBe("");
});
it("buffers incomplete trailing lines across chunks", () => {
const filter = new LogLineDedupFilter();
const line = assistantEvent("msg_1", "hello");
const mid = Math.floor(line.length / 2);
const out1 = filter.filter(line.slice(0, mid));
const out2 = filter.filter(line.slice(mid) + "\n");
expect(out1).toBe("");
expect(out2).toBe(`${line}\n`);
});
it("flush() emits a final incomplete line that was not replayed", () => {
const filter = new LogLineDedupFilter();
const line = assistantEvent("msg_tail", "no newline");
filter.filter(line);
expect(filter.flush()).toBe(line);
});
it("flush() drops an incomplete line that was already seen with a newline", () => {
const filter = new LogLineDedupFilter();
const line = assistantEvent("msg_same", "x");
filter.filter(`${line}\n`);
filter.filter(line);
expect(filter.flush()).toBe("");
});
it("passes non-JSON lines through every time (does not dedup paperclip status)", () => {
const filter = new LogLineDedupFilter();
const status = "[paperclip] keepalive — job foo running\n";
expect(filter.filter(status)).toBe(status);
expect(filter.filter(status)).toBe(status);
});
it("dedups structurally identical JSON with identical content (raw fallback)", () => {
const filter = new LogLineDedupFilter();
// No recognized type → raw fallback key.
const line = JSON.stringify({ foo: "bar", baz: 1 });
filter.filter(`${line}\n`);
expect(filter.filter(`${line}\n`)).toBe("");
});
it("handles multiple complete lines in a single chunk with partial trailing", () => {
const filter = new LogLineDedupFilter();
const a = assistantEvent("msg_a", "a");
const b = assistantEvent("msg_b", "b");
const c = assistantEvent("msg_c", "c");
// a and b are complete, c is partial (no trailing newline).
const out = filter.filter(`${a}\n${b}\n${c}`);
expect(out).toBe(`${a}\n${b}\n`);
// Completing c later should emit exactly c.
expect(filter.filter("\n")).toBe(`${c}\n`);
});
it("drops the classic FAR-123 replay scenario across reconnects", () => {
const filter = new LogLineDedupFilter();
const assistantNits = assistantEvent("msg_nits", "Three nits to fix. Let me look at an existing test file...");
const assistantWrite = assistantEvent("msg_write", "Now I need to write unit tests");
// First stream attempt emits both events.
const out1 = filter.filter(`${assistantNits}\n${assistantWrite}\n`);
expect(out1).toBe(`${assistantNits}\n${assistantWrite}\n`);
// Reconnect replays both within the sinceSeconds overlap — filter should drop them.
const out2 = filter.filter(`${assistantNits}\n${assistantWrite}\n`);
expect(out2).toBe("");
// And a genuinely new event after the replay should still pass through.
const assistantFresh = assistantEvent("msg_fresh", "next turn");
expect(filter.filter(`${assistantFresh}\n`)).toBe(`${assistantFresh}\n`);
});
});
+146
View File
@@ -0,0 +1,146 @@
/**
* Line-level dedup filter for the K8s log stream.
*
* The K8s log follow stream can reconnect with an overlapping `sinceSeconds`
* window (integer-second granularity + a safety buffer), which replays a few
* seconds of recent output on every reconnect. Without dedup those replayed
* lines appear as duplicate events in the streaming UI — the same assistant
* text block shows up between every subsequent tool call (FAR-123).
*
* The filter operates at the chunk → line level: chunks are split on `\n`,
* incomplete trailing content is buffered until the next chunk, and each
* complete line is emitted at most once. JSON-shaped Claude stream-json
* events are keyed by their stable structural IDs; non-JSON lines pass
* through unchanged so genuinely-repeated status lines are not swallowed.
*/
type Parsed = Record<string, unknown>;
function asString(value: unknown): string {
return typeof value === "string" ? value : "";
}
function asRecord(value: unknown): Parsed | null {
if (typeof value !== "object" || value === null || Array.isArray(value)) return null;
return value as Parsed;
}
/**
* Build a stable dedup key for a Claude stream-json event. Returns `null`
* when the event is not a recognized Claude event — those lines fall back to
* raw-content hashing so non-JSON output (paperclip status lines, shell
* output) is never deduped by identity.
*/
export function eventDedupKey(event: Parsed): string | null {
const type = asString(event.type);
if (type === "system") {
const subtype = asString(event.subtype);
const sessionId = asString(event.session_id);
if (subtype === "init" && sessionId) return `system:init:${sessionId}`;
return null;
}
if (type === "assistant") {
const message = asRecord(event.message);
const id = message ? asString(message.id) : "";
if (id) return `assistant:${id}`;
return null;
}
if (type === "user") {
const message = asRecord(event.message);
const content = message && Array.isArray(message.content) ? message.content : [];
const toolUseIds: string[] = [];
for (const entry of content) {
const block = asRecord(entry);
if (!block) continue;
const toolUseId = asString(block.tool_use_id);
if (toolUseId) toolUseIds.push(toolUseId);
}
if (toolUseIds.length > 0) return `user:tool_result:${toolUseIds.join(",")}`;
return null;
}
if (type === "result") {
const sessionId = asString(event.session_id);
return sessionId ? `result:${sessionId}` : "result:unknown";
}
return null;
}
/**
* Stateful line-level dedup filter. Emits `filter(chunk)` output through
* the caller — preserves original chunk formatting (including trailing
* newlines) for lines that pass the dedup check.
*/
export class LogLineDedupFilter {
private buffer = "";
private readonly seenKeys = new Set<string>();
/**
* Process a chunk and return the subset that should be forwarded.
* Incomplete trailing content (no terminating newline) is buffered and
* emitted on the next chunk that completes the line (or on flush()).
*/
filter(chunk: string): string {
if (!chunk) return "";
const combined = this.buffer + chunk;
const endsWithNewline = combined.endsWith("\n");
const parts = combined.split("\n");
if (endsWithNewline) {
// Discard the final empty element — last line was complete.
parts.pop();
this.buffer = "";
} else {
// Last element is an incomplete line — hold it for the next chunk.
this.buffer = parts.pop() ?? "";
}
const out: string[] = [];
for (const line of parts) {
if (this.shouldEmit(line)) out.push(line);
}
if (out.length === 0) return "";
return out.join("\n") + "\n";
}
/**
* Flush any incomplete trailing content. Called when the stream ends
* without a terminating newline so the final partial line isn't lost.
*/
flush(): string {
const pending = this.buffer;
this.buffer = "";
if (!pending) return "";
return this.shouldEmit(pending) ? pending : "";
}
private shouldEmit(line: string): boolean {
const trimmed = line.trim();
if (!trimmed) return true;
// Only attempt dedup on JSON-shaped lines; pass shell/text output through.
if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true;
let parsed: unknown;
try {
parsed = JSON.parse(trimmed);
} catch {
return true;
}
const event = asRecord(parsed);
if (!event) return true;
// Recognized Claude stream-json event → structural key.
const structuralKey = eventDedupKey(event);
const key = structuralKey ?? `raw:${trimmed}`;
if (this.seenKeys.has(key)) return false;
this.seenKeys.add(key);
return true;
}
}