Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| baf7e2d44d | |||
| 77ed2004f8 | |||
| 69d0f4972f | |||
| c7706d742f | |||
| 8937fb2804 | |||
| 77e9aa9b37 | |||
| 683ea2d8b1 | |||
| dd859c74a8 | |||
| b3c1519cf5 | |||
| 78fd702ccb | |||
| 0bc1bb1dd1 | |||
| c8968598e4 | |||
| a4631ac756 | |||
| 1fc6a9c626 | |||
| d71ff15443 | |||
| 5e01ae99b3 |
@@ -29,24 +29,21 @@ jobs:
|
||||
needs: test
|
||||
runs-on: ubuntu-latest
|
||||
if: github.ref == 'refs/heads/master' && github.event_name == 'push'
|
||||
permissions:
|
||||
id-token: write
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22"
|
||||
registry-url: "https://registry.npmjs.org"
|
||||
cache: "npm"
|
||||
|
||||
- run: npm ci
|
||||
|
||||
- run: npm run build
|
||||
|
||||
- uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22"
|
||||
registry-url: "https://registry.npmjs.org"
|
||||
cache: "npm"
|
||||
|
||||
- name: Publish (skip if version already exists)
|
||||
run: |
|
||||
PKG_NAME=$(node -p "require('./package.json').name")
|
||||
@@ -54,7 +51,7 @@ jobs:
|
||||
if npm view "${PKG_NAME}@${PKG_VERSION}" version 2>/dev/null; then
|
||||
echo "Version ${PKG_VERSION} already published — skipping."
|
||||
else
|
||||
npm publish --access public
|
||||
npm publish --provenance --access public
|
||||
fi
|
||||
env:
|
||||
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
|
||||
|
||||
Generated
+2
-2
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.27",
|
||||
"version": "0.1.31",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.27",
|
||||
"version": "0.1.31",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@kubernetes/client-node": "^1.0.0",
|
||||
|
||||
+4
-4
@@ -1,16 +1,16 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.27",
|
||||
"version": "0.1.32",
|
||||
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
|
||||
"license": "MIT",
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/farhoodliquor/paperclip-adapter-claude-k8s"
|
||||
"url": "https://github.com/farhoodlabs/paperclip-adapter-claude-k8s"
|
||||
},
|
||||
"bugs": {
|
||||
"url": "https://github.com/farhoodliquor/paperclip-adapter-claude-k8s/issues"
|
||||
"url": "https://github.com/farhoodlabs/paperclip-adapter-claude-k8s/issues"
|
||||
},
|
||||
"homepage": "https://github.com/farhoodliquor/paperclip-adapter-claude-k8s#readme",
|
||||
"homepage": "https://github.com/farhoodlabs/paperclip-adapter-claude-k8s#readme",
|
||||
"type": "module",
|
||||
"paperclip": {
|
||||
"adapterUiParser": "1.0.0"
|
||||
|
||||
@@ -42,6 +42,14 @@ describe("getConfigSchema", () => {
|
||||
expect(field!.default).toBe(true);
|
||||
});
|
||||
|
||||
it("reattachOrphanedJobs defaults to true", () => {
|
||||
const schema = getConfigSchema();
|
||||
const field = schema.fields.find((f: ConfigFieldSchema) => f.key === "reattachOrphanedJobs");
|
||||
expect(field).toBeDefined();
|
||||
expect(field!.type).toBe("toggle");
|
||||
expect(field!.default).toBe(true);
|
||||
});
|
||||
|
||||
it("has imagePullPolicy as select with correct options", () => {
|
||||
const schema = getConfigSchema();
|
||||
const field = schema.fields.find((f: ConfigFieldSchema) => f.key === "imagePullPolicy");
|
||||
|
||||
@@ -89,6 +89,13 @@ export function getConfigSchema(): AdapterConfigSchema {
|
||||
label: "Retain Jobs",
|
||||
hint: "Skip cleanup of completed Jobs for debugging purposes.",
|
||||
},
|
||||
{
|
||||
type: "toggle",
|
||||
key: "reattachOrphanedJobs",
|
||||
label: "Reattach to Orphaned Jobs",
|
||||
hint: "If a prior K8s Job for the same agent/task/session is still running (e.g. Paperclip restarted mid-run), attach to it and stream its output instead of deleting it and starting a new pod. Default: on.",
|
||||
default: true,
|
||||
},
|
||||
// Resource Limits
|
||||
{
|
||||
type: "text",
|
||||
|
||||
+226
-2
@@ -1,5 +1,44 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { isK8s404, buildPartialRunError } from "./execute.js";
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
import type * as k8s from "@kubernetes/client-node";
|
||||
import type { Writable } from "node:stream";
|
||||
|
||||
// Mock the K8s client before importing execute so streamPodLogsOnce picks up
|
||||
// the mocked getLogApi. The mock's logApi.log never resolves, simulating the
|
||||
// FAR-10 hang: K8s API drops the connection but the client awaits forever.
|
||||
const mockLogFn = vi.fn();
|
||||
vi.mock("./k8s-client.js", () => ({
|
||||
getLogApi: () => ({ log: mockLogFn }),
|
||||
getBatchApi: () => ({}),
|
||||
getCoreApi: () => ({}),
|
||||
getAuthzApi: () => ({}),
|
||||
getSelfPodInfo: vi.fn(),
|
||||
resetCache: vi.fn(),
|
||||
}));
|
||||
|
||||
const { isK8s404, buildPartialRunError, isReattachableOrphan, describePodTerminatedError, streamPodLogsOnce } = await import("./execute.js");
|
||||
|
||||
function makeJob(opts: {
|
||||
runId?: string;
|
||||
agentId?: string;
|
||||
taskId?: string;
|
||||
sessionId?: string;
|
||||
adapterType?: string;
|
||||
terminal?: boolean;
|
||||
}): k8s.V1Job {
|
||||
const labels: Record<string, string> = {
|
||||
"paperclip.io/adapter-type": opts.adapterType ?? "claude_k8s",
|
||||
};
|
||||
if (opts.agentId) labels["paperclip.io/agent-id"] = opts.agentId;
|
||||
if (opts.runId) labels["paperclip.io/run-id"] = opts.runId;
|
||||
if (opts.taskId) labels["paperclip.io/task-id"] = opts.taskId;
|
||||
if (opts.sessionId) labels["paperclip.io/session-id"] = opts.sessionId;
|
||||
return {
|
||||
metadata: { name: "ac-job", namespace: "paperclip", labels },
|
||||
status: opts.terminal
|
||||
? { conditions: [{ type: "Complete", status: "True" }] }
|
||||
: { conditions: [] },
|
||||
} as k8s.V1Job;
|
||||
}
|
||||
|
||||
describe("isK8s404", () => {
|
||||
it("returns false for non-Error values", () => {
|
||||
@@ -106,3 +145,188 @@ describe("buildPartialRunError", () => {
|
||||
expect(msg).toBe("Claude exited with code 1: real error line");
|
||||
});
|
||||
});
|
||||
|
||||
describe("isReattachableOrphan", () => {
|
||||
const agentId = "agent-abc";
|
||||
const taskId = "task-xyz";
|
||||
const sessionId = "sess-123";
|
||||
|
||||
it("returns true when agent/task/session all match and Job is not terminal", () => {
|
||||
const job = makeJob({ agentId, taskId, sessionId, runId: "old-run" });
|
||||
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(true);
|
||||
});
|
||||
|
||||
it("returns false when the Job is already Complete", () => {
|
||||
const job = makeJob({ agentId, taskId, sessionId, runId: "old-run", terminal: true });
|
||||
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false when expected taskId is null (caller couldn't derive one)", () => {
|
||||
const job = makeJob({ agentId, taskId, sessionId });
|
||||
expect(isReattachableOrphan(job, { agentId, taskId: null, sessionId })).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false when expected sessionId is null", () => {
|
||||
const job = makeJob({ agentId, taskId, sessionId });
|
||||
expect(isReattachableOrphan(job, { agentId, taskId, sessionId: null })).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false when agent id doesn't match", () => {
|
||||
const job = makeJob({ agentId: "agent-other", taskId, sessionId });
|
||||
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false when task id doesn't match", () => {
|
||||
const job = makeJob({ agentId, taskId: "task-other", sessionId });
|
||||
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false when session id doesn't match", () => {
|
||||
const job = makeJob({ agentId, taskId, sessionId: "sess-other" });
|
||||
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false when the Job is from a different adapter type", () => {
|
||||
const job = makeJob({ agentId, taskId, sessionId, adapterType: "claude_local" });
|
||||
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false when Job has no task-id label (labels were introduced in FAR-124)", () => {
|
||||
const job = makeJob({ agentId, sessionId });
|
||||
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false when Job has no session-id label", () => {
|
||||
const job = makeJob({ agentId, taskId });
|
||||
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
// Regression: FAR-10 — waitForPod must throw on phase=Failed, not return the pod name.
|
||||
// These tests cover describePodTerminatedError, the helper that waitForPod uses to build
|
||||
// the error message before throwing. Verifies that phase=Failed with no claude logs
|
||||
// produces a structured, actionable error instead of silently entering the log-stream path.
|
||||
describe("describePodTerminatedError", () => {
|
||||
it("includes exit code and reason when claude container status is available", () => {
|
||||
const cs = [
|
||||
{
|
||||
name: "claude",
|
||||
state: { terminated: { exitCode: 137, reason: "OOMKilled" } },
|
||||
},
|
||||
] as k8s.V1ContainerStatus[];
|
||||
const msg = describePodTerminatedError("mypod", "Failed", cs);
|
||||
expect(msg).toContain("137");
|
||||
expect(msg).toContain("OOMKilled");
|
||||
expect(msg).toContain("phase=Failed");
|
||||
});
|
||||
|
||||
it("falls back to message field when reason is absent", () => {
|
||||
const cs = [
|
||||
{
|
||||
name: "claude",
|
||||
state: { terminated: { exitCode: 1, message: "signal: killed" } },
|
||||
},
|
||||
] as k8s.V1ContainerStatus[];
|
||||
const msg = describePodTerminatedError("mypod", "Failed", cs);
|
||||
expect(msg).toContain("signal: killed");
|
||||
expect(msg).toContain("1");
|
||||
});
|
||||
|
||||
it("returns generic message when no claude container status is present", () => {
|
||||
const msg = describePodTerminatedError("mypod", "Failed", []);
|
||||
expect(msg).toBe("Pod mypod reached phase=Failed");
|
||||
});
|
||||
|
||||
it("ignores non-claude containers", () => {
|
||||
const cs = [
|
||||
{
|
||||
name: "sidecar",
|
||||
state: { terminated: { exitCode: 0, reason: "Completed" } },
|
||||
},
|
||||
] as k8s.V1ContainerStatus[];
|
||||
const msg = describePodTerminatedError("mypod", "Failed", cs);
|
||||
expect(msg).toBe("Pod mypod reached phase=Failed");
|
||||
});
|
||||
|
||||
it("handles null exitCode gracefully", () => {
|
||||
const cs = [
|
||||
{
|
||||
name: "claude",
|
||||
state: { terminated: { exitCode: null, reason: "Error" } },
|
||||
},
|
||||
] as unknown as k8s.V1ContainerStatus[];
|
||||
const msg = describePodTerminatedError("mypod", "Failed", cs);
|
||||
expect(msg).toContain("unknown");
|
||||
expect(msg).toContain("Error");
|
||||
});
|
||||
});
|
||||
|
||||
// Regression: FAR-10 hardening — streamPodLogsOnce must not hang forever when
|
||||
// the K8s client's logApi.log call never resolves. When stopSignal fires, the
|
||||
// bail timer must force-return within LOG_STREAM_BAIL_TIMEOUT_MS (3s in the
|
||||
// implementation) so execute() does not get stuck waiting for a dead stream.
|
||||
describe("streamPodLogsOnce bail timer", () => {
|
||||
beforeEach(() => {
|
||||
mockLogFn.mockReset();
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("returns within the bail window when stopSignal fires during a hung log call", async () => {
|
||||
// logApi.log never resolves — simulates the FAR-10 hang where the K8s
|
||||
// response stream stalls without closing the connection.
|
||||
mockLogFn.mockImplementation((_ns, _pod, _ctr, _writable: Writable) => {
|
||||
return new Promise(() => { /* never resolves */ });
|
||||
});
|
||||
|
||||
const stopSignal = { stopped: false };
|
||||
const onLog = vi.fn().mockResolvedValue(undefined);
|
||||
|
||||
const resultPromise = streamPodLogsOnce(
|
||||
"default",
|
||||
"mypod",
|
||||
onLog,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
stopSignal,
|
||||
);
|
||||
|
||||
// Fire stopSignal; let the 200ms poller tick and start the bail timer.
|
||||
stopSignal.stopped = true;
|
||||
await vi.advanceTimersByTimeAsync(300);
|
||||
|
||||
// Advance past the 3s bail timeout. streamPodLogsOnce must now resolve
|
||||
// with an empty string (no chunks were captured) rather than hanging.
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
|
||||
const result = await resultPromise;
|
||||
expect(result).toBe("");
|
||||
expect(mockLogFn).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it("returns promptly if logApi.log resolves before stopSignal fires (happy path, no bail involved)", async () => {
|
||||
mockLogFn.mockImplementation(async (_ns, _pod, _ctr, _writable: Writable) => {
|
||||
// Resolve immediately — normal log-stream completion.
|
||||
return undefined;
|
||||
});
|
||||
|
||||
const onLog = vi.fn().mockResolvedValue(undefined);
|
||||
|
||||
// No stopSignal → no bail machinery engaged.
|
||||
const result = await streamPodLogsOnce(
|
||||
"default",
|
||||
"mypod",
|
||||
onLog,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
);
|
||||
|
||||
expect(result).toBe("");
|
||||
expect(mockLogFn).toHaveBeenCalledOnce();
|
||||
});
|
||||
});
|
||||
|
||||
+352
-89
@@ -1,5 +1,15 @@
|
||||
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
|
||||
import { asString, asNumber, asBoolean, parseObject } from "@paperclipai/adapter-utils/server-utils";
|
||||
import {
|
||||
asString,
|
||||
asNumber,
|
||||
asBoolean,
|
||||
parseObject,
|
||||
readPaperclipRuntimeSkillEntries,
|
||||
resolvePaperclipDesiredSkillNames,
|
||||
} from "@paperclipai/adapter-utils/server-utils";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { prepareClaudePromptBundle } from "./prompt-cache.js";
|
||||
import {
|
||||
parseClaudeStreamJson,
|
||||
describeClaudeFailure,
|
||||
@@ -7,7 +17,8 @@ import {
|
||||
isClaudeUnknownSessionError,
|
||||
} from "./parse.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
|
||||
import { buildJobManifest } from "./job-manifest.js";
|
||||
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
|
||||
import { LogLineDedupFilter } from "./log-dedup.js";
|
||||
import type * as k8s from "@kubernetes/client-node";
|
||||
import { Writable } from "node:stream";
|
||||
|
||||
@@ -15,6 +26,15 @@ const POLL_INTERVAL_MS = 2000;
|
||||
const KEEPALIVE_INTERVAL_MS = 15_000;
|
||||
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
|
||||
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
|
||||
// How long to keep refreshing onSpawn after the Job reaches a terminal state.
|
||||
// Covers the cleanup path (delete job, parse stdout) so a slow K8s API call
|
||||
// doesn't trip the 5-minute reaper staleness window.
|
||||
const POST_TERMINAL_KEEPALIVE_MS = 90_000;
|
||||
// Upper bound on how long streamPodLogsOnce will wait after stopSignal fires
|
||||
// before force-returning, even if logApi.log has not yet resolved. Defensive
|
||||
// against the K8s client library not propagating writable.destroy() into an
|
||||
// abort of the underlying HTTP request.
|
||||
const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
|
||||
|
||||
/**
|
||||
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
|
||||
@@ -69,6 +89,53 @@ export function buildPartialRunError(
|
||||
: `Claude exited with code ${exitCode ?? -1}`;
|
||||
}
|
||||
|
||||
/**
 * Evaluate an orphaned K8s Job (one whose `paperclip.io/run-id` label does
 * not match the current runId) as a potential reattach target.
 *
 * A Job is reattachable only when ALL of the following hold:
 *  - the caller could derive both a task id and a session id,
 *  - the Job is not already being deleted (`metadata.deletionTimestamp` unset),
 *  - its adapter-type label is `claude_k8s`,
 *  - its agent-id, task-id and session-id labels equal the expected values,
 *  - it carries no `Complete`/`Failed` condition with status `"True"`.
 *
 * Exported for unit tests.
 *
 * @param job - The candidate Job as returned by the K8s batch API.
 * @param expected - Identity of the current run. `taskId` / `sessionId` are
 *   compared verbatim against the label values, so callers must pass them in
 *   the same (sanitized) form that was written to the labels.
 * @returns `true` when the Job may be attached to instead of being deleted.
 */
export function isReattachableOrphan(
  job: k8s.V1Job,
  expected: { agentId: string; taskId: string | null; sessionId: string | null },
): boolean {
  // Without both a task and a session there is no safe identity to match on.
  if (!expected.taskId || !expected.sessionId) return false;

  // A Job that is already being deleted has no terminal condition yet, but
  // its pod is on the way out — attaching would stream from a dying pod.
  if (job.metadata?.deletionTimestamp) return false;

  const labels = job.metadata?.labels ?? {};
  const requiredLabels: ReadonlyArray<readonly [string, string]> = [
    ["paperclip.io/adapter-type", "claude_k8s"],
    ["paperclip.io/agent-id", expected.agentId],
    ["paperclip.io/task-id", expected.taskId],
    ["paperclip.io/session-id", expected.sessionId],
  ];
  // A missing label reads as `undefined` and therefore never matches.
  if (requiredLabels.some(([key, value]) => labels[key] !== value)) return false;

  // Finished Jobs (successfully or not) have nothing left to stream.
  const conditions = job.status?.conditions ?? [];
  const isTerminal = conditions.some(
    (c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
  );
  return !isTerminal;
}
|
||||
|
||||
/**
 * Build an error message for a pod that reached a terminal phase (typically
 * `Failed`) before or instead of streaming logs.
 *
 * When the `claude` container has a `terminated` state, the message includes
 * its exit code and a reason so operators can diagnose crashes without
 * needing kubectl. Other containers are ignored. Exported for unit tests.
 *
 * @param podName - Name of the pod, used verbatim in the message.
 * @param phase - Pod phase to report (e.g. `"Failed"`).
 * @param containerStatuses - Container statuses from the pod status.
 * @returns `Pod <name> reached phase=<phase>` optionally followed by
 *   `: claude exited <code> (<reason>)`.
 */
export function describePodTerminatedError(
  podName: string,
  phase: string,
  containerStatuses: k8s.V1ContainerStatus[],
): string {
  const summary = `Pod ${podName} reached phase=${phase}`;

  const terminated = containerStatuses.find((cs) => cs.name === "claude")?.state?.terminated;
  if (!terminated) return summary;

  const code = terminated.exitCode ?? "unknown";
  // Treat blank strings like missing values so an empty `reason` does not
  // hide a useful `message`, and trim so a trailing newline in the
  // termination message does not leak into the one-line error.
  const reason = terminated.reason?.trim() || terminated.message?.trim() || "no reason";
  return `${summary}: claude exited ${code} (${reason})`;
}
|
||||
|
||||
/**
|
||||
* Wait for the Job's pod to reach a terminal or running state.
|
||||
* Returns the pod name once logs can be streamed, or throws on failure.
|
||||
@@ -120,15 +187,22 @@ async function waitForPod(
|
||||
for (const cs of containerStatuses) {
|
||||
if (cs.state?.waiting) details.push(`${cs.name}: waiting (${cs.state.waiting.reason ?? "unknown"})`);
|
||||
else if (cs.state?.running) details.push(`${cs.name}: running`);
|
||||
else if (cs.state?.terminated) details.push(`${cs.name}: terminated (exit ${cs.state.terminated.exitCode ?? "?"}, ${cs.state.terminated.reason ?? "no reason"})`);
|
||||
}
|
||||
await onLog("stdout", `[paperclip] Pod ${podName}: ${details.join(", ")}\n`);
|
||||
lastStatus = statusKey;
|
||||
}
|
||||
|
||||
// Ready to stream logs
|
||||
if (phase === "Running" || phase === "Succeeded" || phase === "Failed") {
|
||||
if (phase === "Running" || phase === "Succeeded") {
|
||||
return podName;
|
||||
}
|
||||
// phase=Failed means the pod crashed before we could stream logs.
|
||||
// Throwing here routes the caller into the error path with a structured
|
||||
// message instead of entering the log-streaming path with a dead pod.
|
||||
if (phase === "Failed") {
|
||||
throw new Error(describePodTerminatedError(podName, phase, containerStatuses));
|
||||
}
|
||||
|
||||
// Init containers done + main running (phase may still say Pending briefly)
|
||||
const allInitsDone = initStatuses.length > 0 && initStatuses.every(
|
||||
@@ -184,12 +258,14 @@ async function waitForPod(
|
||||
* Stream pod logs once via follow. Returns accumulated stdout when the
|
||||
* stream ends (container exit, API disconnect, or abort signal).
|
||||
*/
|
||||
async function streamPodLogsOnce(
|
||||
export async function streamPodLogsOnce(
|
||||
namespace: string,
|
||||
podName: string,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
sinceSeconds?: number,
|
||||
dedup?: LogLineDedupFilter,
|
||||
stopSignal?: { stopped: boolean },
|
||||
): Promise<string> {
|
||||
const logApi = getLogApi(kubeconfigPath);
|
||||
const chunks: string[] = [];
|
||||
@@ -198,19 +274,57 @@ async function streamPodLogsOnce(
|
||||
write(chunk: Buffer, _encoding, callback) {
|
||||
const text = chunk.toString("utf-8");
|
||||
chunks.push(text);
|
||||
void onLog("stdout", text).then(() => callback(), callback);
|
||||
const emitted = dedup ? dedup.filter(text) : text;
|
||||
if (!emitted) {
|
||||
callback();
|
||||
return;
|
||||
}
|
||||
void onLog("stdout", emitted).then(() => callback(), callback);
|
||||
},
|
||||
});
|
||||
|
||||
// When the job completion signal fires, destroy the writable to abort the
|
||||
// in-flight follow stream. Without this, logApi.log can hang indefinitely
|
||||
// when the pod terminates without closing the HTTP connection cleanly.
|
||||
let stopPoller: ReturnType<typeof setInterval> | null = null;
|
||||
let bailTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let bailResolve: (() => void) | null = null;
|
||||
// Bail promise resolves LOG_STREAM_BAIL_TIMEOUT_MS after stopSignal fires,
|
||||
// even if logApi.log has not resolved by then. This is a safety net for the
|
||||
// case where writable.destroy() fails to propagate to an abort of the HTTP
|
||||
// request (e.g. the K8s client is awaiting a response that never comes).
|
||||
const bailPromise = new Promise<void>((resolve) => {
|
||||
bailResolve = resolve;
|
||||
});
|
||||
if (stopSignal) {
|
||||
stopPoller = setInterval(() => {
|
||||
if (stopSignal.stopped) {
|
||||
if (!writable.destroyed) writable.destroy();
|
||||
if (!bailTimer && bailResolve) {
|
||||
bailTimer = setTimeout(bailResolve, LOG_STREAM_BAIL_TIMEOUT_MS);
|
||||
}
|
||||
}
|
||||
}, 200);
|
||||
}
|
||||
|
||||
const logPromise = logApi.log(namespace, podName, "claude", writable, {
|
||||
follow: true,
|
||||
pretty: false,
|
||||
...(sinceSeconds ? { sinceSeconds } : {}),
|
||||
}).catch(() => {
|
||||
// follow may fail if the container already exited, the API connection
|
||||
// dropped, or we aborted via writable.destroy() — not fatal.
|
||||
});
|
||||
|
||||
try {
|
||||
await logApi.log(namespace, podName, "claude", writable, {
|
||||
follow: true,
|
||||
pretty: false,
|
||||
...(sinceSeconds ? { sinceSeconds } : {}),
|
||||
});
|
||||
} catch {
|
||||
// follow may fail if the container already exited or the API
|
||||
// connection dropped — not fatal, caller decides whether to retry.
|
||||
if (stopSignal) {
|
||||
await Promise.race([logPromise, bailPromise]);
|
||||
} else {
|
||||
await logPromise;
|
||||
}
|
||||
} finally {
|
||||
if (stopPoller) clearInterval(stopPoller);
|
||||
if (bailTimer) clearTimeout(bailTimer);
|
||||
}
|
||||
|
||||
return chunks.join("");
|
||||
@@ -238,6 +352,9 @@ async function streamPodLogs(
|
||||
// reconnects use a tight window instead of an ever-growing one anchored
|
||||
// at stream start. This is the primary fix for FAR-105 duplicative logs.
|
||||
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
// Shared across reconnects so replayed lines inside the `sinceSeconds`
|
||||
// overlap window are dropped before they reach the streaming UI (FAR-123).
|
||||
const dedup = new LogLineDedupFilter();
|
||||
|
||||
while (!stopSignal?.stopped) {
|
||||
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
|
||||
@@ -257,7 +374,7 @@ async function streamPodLogs(
|
||||
}
|
||||
|
||||
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
|
||||
if (result) {
|
||||
allChunks.push(result);
|
||||
// Update last-received timestamp to now (the stream just ended,
|
||||
@@ -277,6 +394,11 @@ async function streamPodLogs(
|
||||
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
|
||||
}
|
||||
|
||||
// Flush any buffered partial line so the final assistant/result chunk
|
||||
// isn't dropped when the stream ends mid-line.
|
||||
const tail = dedup.flush();
|
||||
if (tail) await onLog("stdout", tail);
|
||||
|
||||
return allChunks.join("");
|
||||
}
|
||||
|
||||
@@ -396,10 +518,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
|
||||
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
|
||||
// still be running. We detect those by comparing the Job's run-id label against
|
||||
// the current runId and clean them up so this execution can proceed.
|
||||
// the current runId. When reattachOrphanedJobs is enabled and the orphan matches
|
||||
// the current agent+task+session, we attach to it instead of deleting it (FAR-124).
|
||||
const agentId = ctx.agent.id;
|
||||
const selfPod = await getSelfPodInfo(kubeconfigPath);
|
||||
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
|
||||
const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true);
|
||||
const runtimeSessionParams = parseObject(runtime.sessionParams);
|
||||
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
|
||||
const currentSessionLabel = currentSessionIdRaw ? sanitizeLabelValue(currentSessionIdRaw) : null;
|
||||
const currentTaskIdRaw = asString(ctx.context.taskId, "") || asString(ctx.context.issueId, "");
|
||||
const currentTaskLabel = currentTaskIdRaw ? sanitizeLabelValue(currentTaskIdRaw) : null;
|
||||
let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null;
|
||||
try {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
const existing = await batchApi.listNamespacedJob({
|
||||
@@ -419,10 +549,42 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
(j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") === runId,
|
||||
);
|
||||
|
||||
if (orphaned.length > 0) {
|
||||
const orphanNames = orphaned.map((j) => j.metadata?.name).join(", ");
|
||||
await onLog("stdout", `[paperclip] Cleaning up ${orphaned.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
|
||||
for (const j of orphaned) {
|
||||
// Pick the most recent reattachable orphan — same agent + task + session,
|
||||
// not terminal. Only one target is chosen; any other orphans get
|
||||
// cleaned up as before.
|
||||
if (reattachOrphanedJobs && orphaned.length > 0) {
|
||||
const candidates = orphaned
|
||||
.filter((j) =>
|
||||
isReattachableOrphan(j, {
|
||||
agentId,
|
||||
taskId: currentTaskLabel,
|
||||
sessionId: currentSessionLabel,
|
||||
}),
|
||||
)
|
||||
.sort((a, b) => {
|
||||
const at = new Date(a.metadata?.creationTimestamp ?? 0).getTime();
|
||||
const bt = new Date(b.metadata?.creationTimestamp ?? 0).getTime();
|
||||
return bt - at;
|
||||
});
|
||||
const chosen = candidates[0];
|
||||
const chosenName = chosen?.metadata?.name;
|
||||
if (chosen && chosenName) {
|
||||
reattachTarget = {
|
||||
jobName: chosenName,
|
||||
namespace: chosen.metadata?.namespace ?? guardNamespace,
|
||||
priorRunId: chosen.metadata?.labels?.["paperclip.io/run-id"] ?? "",
|
||||
image: chosen.spec?.template?.spec?.containers?.[0]?.image ?? "unknown",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const toDelete = orphaned.filter(
|
||||
(j) => !reattachTarget || j.metadata?.name !== reattachTarget.jobName,
|
||||
);
|
||||
if (toDelete.length > 0) {
|
||||
const orphanNames = toDelete.map((j) => j.metadata?.name).join(", ");
|
||||
await onLog("stdout", `[paperclip] Cleaning up ${toDelete.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
|
||||
for (const j of toDelete) {
|
||||
const name = j.metadata?.name;
|
||||
if (name) {
|
||||
await cleanupJob(guardNamespace, name, onLog, kubeconfigPath);
|
||||
@@ -460,84 +622,152 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
};
|
||||
}
|
||||
|
||||
// Build Job manifest
|
||||
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret } = buildJobManifest({
|
||||
ctx,
|
||||
selfPod,
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
|
||||
let jobName: string;
|
||||
let namespace: string;
|
||||
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
|
||||
|
||||
// Prepare the prompt bundle (skills + instructions) on the server filesystem.
|
||||
// The K8s Job pod mounts the same PVC at /paperclip, so bundle paths written
|
||||
// here are accessible inside the pod at the identical absolute path.
|
||||
const skillEntries = await readPaperclipRuntimeSkillEntries(config, import.meta.dirname ?? __dirname);
|
||||
const desiredSkillNames = new Set(resolvePaperclipDesiredSkillNames(config, skillEntries));
|
||||
const desiredSkills = skillEntries.filter((e) => desiredSkillNames.has(e.key));
|
||||
const instructionsFilePath = asString(config.instructionsFilePath, "").trim();
|
||||
const instructionsFileDir = instructionsFilePath ? `${path.dirname(instructionsFilePath)}/` : "";
|
||||
let instructionsContents: string | null = null;
|
||||
if (instructionsFilePath) {
|
||||
try {
|
||||
const raw = await fs.readFile(instructionsFilePath, "utf-8");
|
||||
const pathDirective =
|
||||
`\nThe above agent instructions were loaded from ${instructionsFilePath}. ` +
|
||||
`Resolve any relative file references from ${instructionsFileDir}. ` +
|
||||
`This base directory is authoritative for sibling instruction files such as ` +
|
||||
`./HEARTBEAT.md, ./SOUL.md, and ./TOOLS.md; do not resolve those from the parent agent directory.`;
|
||||
instructionsContents = raw + pathDirective;
|
||||
} catch (err) {
|
||||
await onLog(
|
||||
"stderr",
|
||||
`[paperclip] Warning: could not read agent instructions file "${instructionsFilePath}": ${err instanceof Error ? err.message : String(err)}\n`,
|
||||
);
|
||||
}
|
||||
}
|
||||
const promptBundle = await prepareClaudePromptBundle({
|
||||
companyId: ctx.agent.companyId,
|
||||
skills: desiredSkills,
|
||||
instructionsContents,
|
||||
onLog,
|
||||
});
|
||||
|
||||
// Report invocation metadata
|
||||
if (onMeta) {
|
||||
await onMeta({
|
||||
adapterType: "claude_k8s",
|
||||
command: `kubectl job/${jobName}`,
|
||||
cwd: namespace,
|
||||
commandArgs: claudeArgs,
|
||||
commandNotes: [
|
||||
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
|
||||
`Namespace: ${namespace}`,
|
||||
`Timeout: ${timeoutSec}s`,
|
||||
],
|
||||
prompt,
|
||||
...(promptMetrics ? { promptMetrics } : {}),
|
||||
context: ctx.context,
|
||||
} as Parameters<typeof onMeta>[0]);
|
||||
}
|
||||
if (reattachTarget) {
|
||||
jobName = reattachTarget.jobName;
|
||||
namespace = reattachTarget.namespace;
|
||||
|
||||
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
|
||||
// PodSpec limit). The Secret is cleaned up in the finally block.
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
if (promptSecret) {
|
||||
try {
|
||||
await coreApi.createNamespacedSecret({
|
||||
namespace: promptSecret.namespace,
|
||||
body: {
|
||||
apiVersion: "v1",
|
||||
kind: "Secret",
|
||||
metadata: {
|
||||
name: promptSecret.name,
|
||||
namespace: promptSecret.namespace,
|
||||
labels: {
|
||||
"app.kubernetes.io/managed-by": "paperclip",
|
||||
"paperclip.io/adapter-type": "claude_k8s",
|
||||
"paperclip.io/run-id": runId,
|
||||
// Announce reattach metadata. Prompt and args aren't known here — they
|
||||
// belong to the prior run that created this pod and are already present
|
||||
// on the running container.
|
||||
if (onMeta) {
|
||||
await onMeta({
|
||||
adapterType: "claude_k8s",
|
||||
command: `kubectl job/${jobName}`,
|
||||
cwd: namespace,
|
||||
commandArgs: [],
|
||||
commandNotes: [
|
||||
`Image: ${reattachTarget.image}`,
|
||||
`Namespace: ${namespace}`,
|
||||
`Reattached from prior run: ${reattachTarget.priorRunId || "unknown"}`,
|
||||
`Timeout: ${timeoutSec}s`,
|
||||
],
|
||||
prompt: "",
|
||||
context: ctx.context,
|
||||
} as Parameters<typeof onMeta>[0]);
|
||||
}
|
||||
|
||||
await onLog("stdout", `[paperclip] Reattaching to in-flight K8s Job ${jobName} in namespace ${namespace} (prior run ${reattachTarget.priorRunId || "unknown"})\n`);
|
||||
} else {
|
||||
// Build Job manifest
|
||||
const built = buildJobManifest({ ctx, selfPod, promptBundle });
|
||||
const job = built.job;
|
||||
jobName = built.jobName;
|
||||
namespace = built.namespace;
|
||||
const prompt = built.prompt;
|
||||
const claudeArgs = built.claudeArgs;
|
||||
const promptMetrics = built.promptMetrics;
|
||||
promptSecret = built.promptSecret;
|
||||
|
||||
// Report invocation metadata
|
||||
if (onMeta) {
|
||||
await onMeta({
|
||||
adapterType: "claude_k8s",
|
||||
command: `kubectl job/${jobName}`,
|
||||
cwd: namespace,
|
||||
commandArgs: claudeArgs,
|
||||
commandNotes: [
|
||||
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
|
||||
`Namespace: ${namespace}`,
|
||||
`Timeout: ${timeoutSec}s`,
|
||||
],
|
||||
prompt,
|
||||
...(promptMetrics ? { promptMetrics } : {}),
|
||||
context: ctx.context,
|
||||
} as Parameters<typeof onMeta>[0]);
|
||||
}
|
||||
|
||||
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
|
||||
// PodSpec limit). The Secret is cleaned up in the finally block.
|
||||
if (promptSecret) {
|
||||
try {
|
||||
await coreApi.createNamespacedSecret({
|
||||
namespace: promptSecret.namespace,
|
||||
body: {
|
||||
apiVersion: "v1",
|
||||
kind: "Secret",
|
||||
metadata: {
|
||||
name: promptSecret.name,
|
||||
namespace: promptSecret.namespace,
|
||||
labels: {
|
||||
"app.kubernetes.io/managed-by": "paperclip",
|
||||
"paperclip.io/adapter-type": "claude_k8s",
|
||||
"paperclip.io/run-id": runId,
|
||||
},
|
||||
},
|
||||
stringData: promptSecret.data,
|
||||
},
|
||||
stringData: promptSecret.data,
|
||||
},
|
||||
});
|
||||
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
|
||||
});
|
||||
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Failed to create prompt Secret: ${msg}`,
|
||||
errorCode: "k8s_prompt_secret_create_failed",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Create the Job
|
||||
try {
|
||||
await batchApi.createNamespacedJob({ namespace, body: job });
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
|
||||
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Failed to create prompt Secret: ${msg}`,
|
||||
errorCode: "k8s_prompt_secret_create_failed",
|
||||
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
|
||||
errorCode: "k8s_job_create_failed",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Create the Job
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
try {
|
||||
await batchApi.createNamespacedJob({ namespace, body: job });
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
|
||||
errorCode: "k8s_job_create_failed",
|
||||
};
|
||||
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
|
||||
}
|
||||
|
||||
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
|
||||
|
||||
let stdout = "";
|
||||
let exitCode: number | null = null;
|
||||
let jobTimedOut = false;
|
||||
@@ -551,8 +781,23 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
|
||||
let podName: string;
|
||||
try {
|
||||
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
|
||||
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
|
||||
if (reattachTarget) {
|
||||
// Pod is already running from the prior run — look it up directly.
|
||||
const podList = await coreApi.listNamespacedPod({
|
||||
namespace,
|
||||
labelSelector: `job-name=${jobName}`,
|
||||
});
|
||||
const pod = podList.items[0];
|
||||
const name = pod?.metadata?.name;
|
||||
if (!name) {
|
||||
throw new Error(`Reattach target Job ${jobName} has no pod`);
|
||||
}
|
||||
podName = name;
|
||||
await onLog("stdout", `[paperclip] Reattached to pod ${podName}\n`);
|
||||
} else {
|
||||
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
|
||||
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
|
||||
}
|
||||
|
||||
// Notify the server that execution has started. This sets
|
||||
// processStartedAt and refreshes updatedAt in the DB, which the
|
||||
@@ -566,13 +811,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
}
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Pod scheduling failed: ${msg}\n`);
|
||||
const phase = reattachTarget ? "reattach" : "scheduling";
|
||||
await onLog("stderr", `[paperclip] Pod ${phase} failed: ${msg}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Pod scheduling failed: ${msg}`,
|
||||
errorCode: "k8s_pod_schedule_failed",
|
||||
errorMessage: `Pod ${phase} failed: ${msg}`,
|
||||
errorCode: reattachTarget ? "k8s_pod_reattach_failed" : "k8s_pod_schedule_failed",
|
||||
};
|
||||
}
|
||||
|
||||
@@ -606,11 +852,27 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
let lastLogAt = Date.now();
|
||||
let keepaliveTick = 0;
|
||||
let keepaliveJobTerminal = false;
|
||||
let keepaliveJobTerminalAt: number | null = null;
|
||||
keepaliveTimer = setInterval(() => {
|
||||
// Fire-and-forget the async work; setInterval callbacks must be
|
||||
// synchronous or the timer will drift.
|
||||
void (async () => {
|
||||
if (keepaliveJobTerminal) return;
|
||||
if (keepaliveJobTerminal) {
|
||||
// Post-terminal window: keep refreshing onSpawn during cleanup
|
||||
// (job deletion, log parsing, K8s API calls) so the reaper doesn't
|
||||
// fire a false process_lost while execute() is still running.
|
||||
if (
|
||||
ctx.onSpawn &&
|
||||
keepaliveJobTerminalAt !== null &&
|
||||
Date.now() - keepaliveJobTerminalAt <= POST_TERMINAL_KEEPALIVE_MS
|
||||
) {
|
||||
keepaliveTick++;
|
||||
if (keepaliveTick % 6 === 0) {
|
||||
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Verify the Job is still alive before announcing or refreshing.
|
||||
try {
|
||||
@@ -620,6 +882,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
);
|
||||
if (terminal) {
|
||||
keepaliveJobTerminal = true;
|
||||
keepaliveJobTerminalAt = Date.now();
|
||||
return;
|
||||
}
|
||||
} catch (err: unknown) {
|
||||
@@ -629,6 +892,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// window as a safety net.
|
||||
if (isK8s404(err)) {
|
||||
keepaliveJobTerminal = true;
|
||||
keepaliveJobTerminalAt = Date.now();
|
||||
return;
|
||||
}
|
||||
// Log transient errors but leave keepaliveJobTerminal false so
|
||||
@@ -811,8 +1075,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
};
|
||||
})();
|
||||
|
||||
const runtimeSessionParams = parseObject(runtime.sessionParams);
|
||||
const fallbackSessionId = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
|
||||
const fallbackSessionId = currentSessionIdRaw;
|
||||
const resolvedSessionId = parsedStream.sessionId
|
||||
?? (asString(parsed.session_id as string, fallbackSessionId) || fallbackSessionId);
|
||||
const model = asString(config.model, "");
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { describe, it, expect, beforeEach } from "vitest";
|
||||
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
|
||||
import { buildJobManifest, buildRtkSetupCommands } from "./job-manifest.js";
|
||||
import { buildJobManifest, buildRtkSetupCommands, sanitizeLabelValue } from "./job-manifest.js";
|
||||
import type { SelfPodInfo } from "./k8s-client.js";
|
||||
|
||||
function makeCtx(overrides: Partial<AdapterExecutionContext> = {}): AdapterExecutionContext {
|
||||
@@ -136,6 +136,36 @@ describe("buildJobManifest", () => {
|
||||
expect(job.metadata?.labels?.env).toBe("prod");
|
||||
expect(job.metadata?.labels?.["paperclip.io/adapter-type"]).toBe("claude_k8s");
|
||||
});
|
||||
|
||||
it("adds task-id label when context provides taskId", () => {
|
||||
ctx.context = { taskId: "task-xyz-789" };
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBe("task-xyz-789");
|
||||
});
|
||||
|
||||
it("falls back to issueId when taskId absent", () => {
|
||||
ctx.context = { issueId: "issue-42" };
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBe("issue-42");
|
||||
});
|
||||
|
||||
it("adds session-id label when runtime provides sessionId", () => {
|
||||
ctx.runtime = { ...ctx.runtime, sessionId: "sess-abc-1234" };
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
expect(job.metadata?.labels?.["paperclip.io/session-id"]).toBe("sess-abc-1234");
|
||||
});
|
||||
|
||||
it("reads sessionId from runtime.sessionParams when sessionId prop missing", () => {
|
||||
ctx.runtime = { ...ctx.runtime, sessionParams: { sessionId: "sess-from-params" } };
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
expect(job.metadata?.labels?.["paperclip.io/session-id"]).toBe("sess-from-params");
|
||||
});
|
||||
|
||||
it("omits task-id and session-id labels when neither is provided", () => {
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBeUndefined();
|
||||
expect(job.metadata?.labels?.["paperclip.io/session-id"]).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe("annotations", () => {
|
||||
@@ -487,13 +517,66 @@ describe("buildJobManifest", () => {
|
||||
expect(claudeArgs).toContain("--dangerously-skip-permissions");
|
||||
});
|
||||
|
||||
it("adds --append-system-prompt-file when instructionsFilePath set", () => {
|
||||
it("adds --append-system-prompt-file (config fallback) when instructionsFilePath set and no session", () => {
|
||||
ctx.config = { instructionsFilePath: "/paperclip/instructions.md" };
|
||||
const { claudeArgs } = buildJobManifest({ ctx, selfPod });
|
||||
expect(claudeArgs).toContain("--append-system-prompt-file");
|
||||
expect(claudeArgs).toContain("/paperclip/instructions.md");
|
||||
});
|
||||
|
||||
it("omits --append-system-prompt-file on session resume (avoids token waste)", () => {
|
||||
ctx.config = { instructionsFilePath: "/paperclip/instructions.md" };
|
||||
ctx.runtime.sessionId = "sess_existing";
|
||||
const { claudeArgs } = buildJobManifest({ ctx, selfPod });
|
||||
expect(claudeArgs).not.toContain("--append-system-prompt-file");
|
||||
});
|
||||
|
||||
it("adds --add-dir when promptBundle is provided", () => {
|
||||
const promptBundle = {
|
||||
bundleKey: "abc123",
|
||||
rootDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
|
||||
addDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
|
||||
instructionsFilePath: null,
|
||||
};
|
||||
const { claudeArgs } = buildJobManifest({ ctx, selfPod, promptBundle });
|
||||
expect(claudeArgs).toContain("--add-dir");
|
||||
expect(claudeArgs).toContain(promptBundle.addDir);
|
||||
});
|
||||
|
||||
it("uses bundle instructionsFilePath for --append-system-prompt-file when promptBundle provided", () => {
|
||||
const promptBundle = {
|
||||
bundleKey: "abc123",
|
||||
rootDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
|
||||
addDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
|
||||
instructionsFilePath: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123/agent-instructions.md",
|
||||
};
|
||||
ctx.config = { instructionsFilePath: "/raw/path/AGENTS.md" };
|
||||
const { claudeArgs } = buildJobManifest({ ctx, selfPod, promptBundle });
|
||||
expect(claudeArgs).toContain("--append-system-prompt-file");
|
||||
const idx = claudeArgs.indexOf("--append-system-prompt-file");
|
||||
expect(claudeArgs[idx + 1]).toBe(promptBundle.instructionsFilePath);
|
||||
expect(claudeArgs).not.toContain("/raw/path/AGENTS.md");
|
||||
});
|
||||
|
||||
it("omits --append-system-prompt-file from bundle on session resume", () => {
|
||||
const promptBundle = {
|
||||
bundleKey: "abc123",
|
||||
rootDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
|
||||
addDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
|
||||
instructionsFilePath: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123/agent-instructions.md",
|
||||
};
|
||||
ctx.runtime.sessionId = "sess_existing";
|
||||
const { claudeArgs } = buildJobManifest({ ctx, selfPod, promptBundle });
|
||||
expect(claudeArgs).not.toContain("--append-system-prompt-file");
|
||||
// --add-dir must still be present even on resume
|
||||
expect(claudeArgs).toContain("--add-dir");
|
||||
});
|
||||
|
||||
it("omits --add-dir when no promptBundle", () => {
|
||||
const { claudeArgs } = buildJobManifest({ ctx, selfPod });
|
||||
expect(claudeArgs).not.toContain("--add-dir");
|
||||
});
|
||||
|
||||
it("appends extraArgs when configured", () => {
|
||||
ctx.config = { extraArgs: ["--no-input", "--verbose"] };
|
||||
const { claudeArgs } = buildJobManifest({ ctx, selfPod });
|
||||
@@ -729,3 +812,32 @@ describe("buildJobManifest", () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// Unit tests for sanitizeLabelValue: pass-through, character stripping,
// edge trimming, truncation, and the "nothing usable left" null result.
describe("sanitizeLabelValue", () => {
  it("passes through already-valid UUIDs and slugs", () => {
    for (const alreadyValid of ["abc-123-def", "0d8b4472-c42c-4052-aab1-e32897909afa"]) {
      expect(sanitizeLabelValue(alreadyValid)).toBe(alreadyValid);
    }
  });

  it("strips characters outside [a-zA-Z0-9._-]", () => {
    const cases: Array<[string, string]> = [
      ["task:xyz/123", "taskxyz123"],
      ["abc 123", "abc123"],
    ];
    for (const [raw, expected] of cases) {
      expect(sanitizeLabelValue(raw)).toBe(expected);
    }
  });

  it("trims leading/trailing non-alphanumeric characters", () => {
    const cases: Array<[string, string]> = [
      ["--abc--", "abc"],
      ["...123...", "123"],
    ];
    for (const [raw, expected] of cases) {
      expect(sanitizeLabelValue(raw)).toBe(expected);
    }
  });

  it("truncates to the configured maxLen", () => {
    const limit = 63;
    const oversized = "a".repeat(200);
    const result = sanitizeLabelValue(oversized, limit);
    expect(result?.length).toBe(limit);
  });

  it("returns null when no alphanumeric characters remain", () => {
    for (const unusable of ["---", "", " "]) {
      expect(sanitizeLabelValue(unusable)).toBeNull();
    }
  });
});
|
||||
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
renderTemplate,
|
||||
} from "@paperclipai/adapter-utils/server-utils";
|
||||
import { createHash } from "node:crypto";
|
||||
import type { ClaudePromptBundle } from "./prompt-cache.js";
|
||||
|
||||
/**
|
||||
* Build the shell command prefix that installs a native Node.js PostToolUse
|
||||
@@ -175,6 +176,8 @@ function parseKeyValueConfig(raw: unknown): Record<string, string> {
|
||||
export interface JobBuildInput {
|
||||
ctx: AdapterExecutionContext;
|
||||
selfPod: SelfPodInfo;
|
||||
/** Prepared prompt bundle (skills + instructions). When provided, --add-dir and --append-system-prompt-file use bundle paths. */
|
||||
promptBundle?: ClaudePromptBundle | null;
|
||||
}
|
||||
|
||||
/** When the prompt exceeds the env-var size limit, the manifest uses a
|
||||
@@ -202,6 +205,17 @@ function sanitizeForK8sName(value: string, maxLen = 16): string {
|
||||
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen);
|
||||
}
|
||||
|
||||
/**
 * Sanitize a string for use as a Kubernetes label value
 * (`[a-zA-Z0-9]([-_.a-zA-Z0-9]*[a-zA-Z0-9])?`, at most 63 characters).
 *
 * Processing order:
 *  1. drop every character outside `[a-zA-Z0-9._-]`;
 *  2. strip leading non-alphanumerics — done *before* truncating so leading
 *     separators do not eat into the `maxLen` budget;
 *  3. truncate to `maxLen`;
 *  4. strip trailing non-alphanumerics (truncation can leave a separator at
 *     the end).
 *
 * @param value - Raw value to sanitize.
 * @param maxLen - Maximum length of the result (default 63).
 * @returns The sanitized value, or `null` when no usable characters remain or
 *   `maxLen` is not a positive number — the caller should omit the label.
 */
export function sanitizeLabelValue(value: string, maxLen = 63): string | null {
  // Also rejects NaN; a negative length would make slice() cut from the end.
  if (!(maxLen > 0)) return null;

  const sanitized = value
    .replace(/[^a-zA-Z0-9._-]/g, "")
    .replace(/^[^a-zA-Z0-9]+/, "")
    .slice(0, maxLen)
    .replace(/[^a-zA-Z0-9]+$/, "");

  return sanitized.length > 0 ? sanitized : null;
}
|
||||
|
||||
/**
|
||||
* Build a short deterministic hash suffix from the raw inputs to avoid
|
||||
* collisions when sanitized slugs happen to be identical.
|
||||
@@ -316,7 +330,7 @@ function buildEnvVars(
|
||||
}
|
||||
|
||||
export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
const { ctx, selfPod } = input;
|
||||
const { ctx, selfPod, promptBundle } = input;
|
||||
const { runId, agent, runtime, config: rawConfig, context } = ctx;
|
||||
const config = parseObject(rawConfig);
|
||||
|
||||
@@ -392,14 +406,22 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
};
|
||||
|
||||
// Build Claude CLI args
|
||||
const instructionsFilePath = asString(config.instructionsFilePath, "").trim();
|
||||
// Prefer the bundle's materialized instructions file over the raw config path.
|
||||
// Never inject --append-system-prompt-file on session resumes — the instructions
|
||||
// are already in the session cache and re-injecting wastes tokens.
|
||||
const rawInstructionsFilePath = asString(config.instructionsFilePath, "").trim();
|
||||
const effectiveInstructionsFilePath =
|
||||
promptBundle?.instructionsFilePath ?? (rawInstructionsFilePath || null);
|
||||
const claudeArgs = ["--print", "-", "--output-format", "stream-json", "--verbose"];
|
||||
if (runtimeSessionId) claudeArgs.push("--resume", runtimeSessionId);
|
||||
if (dangerouslySkipPermissions) claudeArgs.push("--dangerously-skip-permissions");
|
||||
if (model) claudeArgs.push("--model", model);
|
||||
if (effort) claudeArgs.push("--effort", effort);
|
||||
if (maxTurns > 0) claudeArgs.push("--max-turns", String(maxTurns));
|
||||
if (instructionsFilePath) claudeArgs.push("--append-system-prompt-file", instructionsFilePath);
|
||||
if (effectiveInstructionsFilePath && !runtimeSessionId) {
|
||||
claudeArgs.push("--append-system-prompt-file", effectiveInstructionsFilePath);
|
||||
}
|
||||
if (promptBundle) claudeArgs.push("--add-dir", promptBundle.addDir);
|
||||
if (extraArgs.length > 0) claudeArgs.push(...extraArgs);
|
||||
|
||||
// Build env vars
|
||||
@@ -428,6 +450,15 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
"paperclip.io/company-id": agent.companyId,
|
||||
"paperclip.io/adapter-type": "claude_k8s",
|
||||
};
|
||||
// Reattach-target labels: let a future execute() identify this Job as the
|
||||
// continuation of the same logical unit of work (same task + same resume
|
||||
// session) so it can attach to the running pod across a Paperclip restart
|
||||
// instead of deleting it and starting over (FAR-124).
|
||||
const taskIdRaw = asString(context.taskId, "") || asString(context.issueId, "");
|
||||
const taskLabel = taskIdRaw ? sanitizeLabelValue(taskIdRaw) : null;
|
||||
if (taskLabel) labels["paperclip.io/task-id"] = taskLabel;
|
||||
const sessionLabel = runtimeSessionId ? sanitizeLabelValue(runtimeSessionId) : null;
|
||||
if (sessionLabel) labels["paperclip.io/session-id"] = sessionLabel;
|
||||
for (const [key, value] of Object.entries(extraLabels)) {
|
||||
labels[key] = value;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,173 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { LogLineDedupFilter, eventDedupKey } from "./log-dedup.js";
|
||||
|
||||
/** Serialize an assistant stream-json event carrying one text block. */
function assistantEvent(id: string, text: string): string {
  const message = { id, content: [{ type: "text", text }] };
  const event = { type: "assistant", session_id: "sess_1", message };
  return JSON.stringify(event);
}
|
||||
|
||||
/** Serialize a user stream-json event carrying one tool_result block. */
function userToolResultEvent(toolUseId: string, content: string): string {
  const toolResult = { type: "tool_result", tool_use_id: toolUseId, content };
  const event = { type: "user", session_id: "sess_1", message: { content: [toolResult] } };
  return JSON.stringify(event);
}
|
||||
|
||||
/** Serialize a system/init stream-json event for the given session. */
function systemInitEvent(sessionId: string): string {
  const event = {
    type: "system",
    subtype: "init",
    session_id: sessionId,
    model: "claude-opus-4-7",
  };
  return JSON.stringify(event);
}
|
||||
|
||||
/** Serialize a successful result stream-json event for the given session. */
function resultEvent(sessionId: string): string {
  const usage = { input_tokens: 1, output_tokens: 1, cache_read_input_tokens: 0 };
  const event = {
    type: "result",
    subtype: "success",
    session_id: sessionId,
    result: "done",
    total_cost_usd: 0.01,
    usage,
  };
  return JSON.stringify(event);
}
|
||||
|
||||
// Unit tests for eventDedupKey: one case per recognized event type, plus the
// null results for events without a usable structural id.
describe("eventDedupKey", () => {
  it("keys assistant events by message.id", () => {
    const event = JSON.parse(assistantEvent("msg_abc", "hi"));
    expect(eventDedupKey(event)).toBe("assistant:msg_abc");
  });

  it("keys user tool_result events by tool_use_id", () => {
    const event = JSON.parse(userToolResultEvent("toolu_1", "ok"));
    expect(eventDedupKey(event)).toBe("user:tool_result:toolu_1");
  });

  it("keys system init events by session_id", () => {
    const event = JSON.parse(systemInitEvent("sess_xyz"));
    expect(eventDedupKey(event)).toBe("system:init:sess_xyz");
  });

  it("keys result events by session_id", () => {
    const event = JSON.parse(resultEvent("sess_xyz"));
    expect(eventDedupKey(event)).toBe("result:sess_xyz");
  });

  it("returns null for assistant events missing message.id", () => {
    const withoutId = { type: "assistant", message: { content: [] } };
    const key = eventDedupKey(withoutId);
    expect(key).toBeNull();
  });

  it("returns null for unknown event types", () => {
    const unknownType = { type: "unknown" };
    const emptyEvent = {};
    expect(eventDedupKey(unknownType)).toBeNull();
    expect(eventDedupKey(emptyEvent)).toBeNull();
  });
});
|
||||
|
||||
// Unit tests for LogLineDedupFilter: pass-through of unique lines, dropping of
// replayed events, chunk-boundary buffering, flush(), and non-JSON handling.
describe("LogLineDedupFilter", () => {
  it("passes unique lines through unchanged", () => {
    const dedup = new LogLineDedupFilter();
    const first = assistantEvent("msg_1", "hello");
    const second = assistantEvent("msg_2", "world");
    const chunk = `${first}\n${second}\n`;
    expect(dedup.filter(chunk)).toBe(chunk);
  });

  it("drops assistant events replayed with the same message.id", () => {
    const dedup = new LogLineDedupFilter();
    const chunk = `${assistantEvent("msg_1", "Three nits to fix.")}\n`;
    dedup.filter(chunk);
    const replayed = dedup.filter(chunk);
    expect(replayed).toBe("");
  });

  it("drops user tool_result events replayed with the same tool_use_id", () => {
    const dedup = new LogLineDedupFilter();
    const chunk = `${userToolResultEvent("toolu_abc", "file contents")}\n`;
    dedup.filter(chunk);
    const replayed = dedup.filter(chunk);
    expect(replayed).toBe("");
  });

  it("drops system init and result events on replay", () => {
    const dedup = new LogLineDedupFilter();
    const chunk = `${systemInitEvent("sess_1")}\n${resultEvent("sess_1")}\n`;
    dedup.filter(chunk);
    const replayed = dedup.filter(chunk);
    expect(replayed).toBe("");
  });

  it("buffers incomplete trailing lines across chunks", () => {
    const dedup = new LogLineDedupFilter();
    const line = assistantEvent("msg_1", "hello");
    const splitAt = Math.floor(line.length / 2);
    const head = line.slice(0, splitAt);
    const tail = line.slice(splitAt);
    const afterHead = dedup.filter(head);
    const afterTail = dedup.filter(tail + "\n");
    expect(afterHead).toBe("");
    expect(afterTail).toBe(`${line}\n`);
  });

  it("flush() emits a final incomplete line that was not replayed", () => {
    const dedup = new LogLineDedupFilter();
    const unterminated = assistantEvent("msg_tail", "no newline");
    dedup.filter(unterminated);
    const flushed = dedup.flush();
    expect(flushed).toBe(unterminated);
  });

  it("flush() drops an incomplete line that was already seen with a newline", () => {
    const dedup = new LogLineDedupFilter();
    const line = assistantEvent("msg_same", "x");
    dedup.filter(`${line}\n`);
    dedup.filter(line);
    const flushed = dedup.flush();
    expect(flushed).toBe("");
  });

  it("passes non-JSON lines through every time (does not dedup paperclip status)", () => {
    const dedup = new LogLineDedupFilter();
    const statusLine = "[paperclip] keepalive — job foo running\n";
    const firstPass = dedup.filter(statusLine);
    const secondPass = dedup.filter(statusLine);
    expect(firstPass).toBe(statusLine);
    expect(secondPass).toBe(statusLine);
  });

  it("dedups structurally identical JSON with identical content (raw fallback)", () => {
    const dedup = new LogLineDedupFilter();
    // No recognized type, so the raw line text is the dedup key.
    const chunk = `${JSON.stringify({ foo: "bar", baz: 1 })}\n`;
    dedup.filter(chunk);
    const replayed = dedup.filter(chunk);
    expect(replayed).toBe("");
  });

  it("handles multiple complete lines in a single chunk with partial trailing", () => {
    const dedup = new LogLineDedupFilter();
    const lineA = assistantEvent("msg_a", "a");
    const lineB = assistantEvent("msg_b", "b");
    const lineC = assistantEvent("msg_c", "c");
    // lineA and lineB are newline-terminated; lineC is left hanging.
    const emitted = dedup.filter(`${lineA}\n${lineB}\n${lineC}`);
    expect(emitted).toBe(`${lineA}\n${lineB}\n`);
    // Supplying the missing newline should release exactly lineC.
    const completed = dedup.filter("\n");
    expect(completed).toBe(`${lineC}\n`);
  });

  it("drops the classic FAR-123 replay scenario across reconnects", () => {
    const dedup = new LogLineDedupFilter();
    const nits = assistantEvent("msg_nits", "Three nits to fix. Let me look at an existing test file...");
    const write = assistantEvent("msg_write", "Now I need to write unit tests");
    const streamChunk = `${nits}\n${write}\n`;
    // The first stream attempt delivers both events.
    const firstAttempt = dedup.filter(streamChunk);
    expect(firstAttempt).toBe(streamChunk);
    // A reconnect replays both inside the sinceSeconds overlap; both are dropped.
    const afterReconnect = dedup.filter(streamChunk);
    expect(afterReconnect).toBe("");
    // A genuinely new event after the replay still gets through.
    const fresh = `${assistantEvent("msg_fresh", "next turn")}\n`;
    expect(dedup.filter(fresh)).toBe(fresh);
  });
});
|
||||
@@ -0,0 +1,146 @@
|
||||
/**
|
||||
* Line-level dedup filter for the K8s log stream.
|
||||
*
|
||||
* The K8s log follow stream can reconnect with an overlapping `sinceSeconds`
|
||||
* window (integer-second granularity + a safety buffer), which replays a few
|
||||
* seconds of recent output on every reconnect. Without dedup those replayed
|
||||
* lines appear as duplicate events in the streaming UI — the same assistant
|
||||
* text block shows up between every subsequent tool call (FAR-123).
|
||||
*
|
||||
* The filter operates at the chunk → line level: chunks are split on `\n`,
|
||||
* incomplete trailing content is buffered until the next chunk, and each
|
||||
* complete line is emitted at most once. JSON-shaped Claude stream-json
|
||||
* events are keyed by their stable structural IDs; non-JSON lines pass
|
||||
* through unchanged so genuinely-repeated status lines are not swallowed.
|
||||
*/
|
||||
|
||||
/** A parsed JSON object with arbitrary string keys. */
type Parsed = Record<string, unknown>;

/** Return `value` when it is a string; otherwise the empty string. */
function asString(value: unknown): string {
  return typeof value === "string" ? value : "";
}

/** Return `value` as an object record, or `null` for primitives, `null`, and arrays. */
function asRecord(value: unknown): Parsed | null {
  if (typeof value !== "object" || value === null || Array.isArray(value)) return null;
  return value as Parsed;
}

/**
 * Build a stable dedup key for a parsed stream-json event.
 *
 * Key formats:
 *  - `system` events with subtype `init` and a session id: `system:init:<session_id>`
 *  - `assistant` events with a message id: `assistant:<message.id>`
 *  - `user` events whose content blocks carry `tool_use_id`s:
 *    `user:tool_result:<id>[,<id>...]`
 *  - `result` events with a session id: `result:<session_id>`
 *
 * @param event - Parsed JSON object for one log line.
 * @returns The structural key, or `null` when the event has no usable
 *   structural id. `LogLineDedupFilter` then falls back to keying on the raw
 *   line text, so unrelated id-less events are never merged under one key.
 */
export function eventDedupKey(event: Parsed): string | null {
  const sessionId = asString(event.session_id);

  switch (asString(event.type)) {
    case "system":
      return asString(event.subtype) === "init" && sessionId ? `system:init:${sessionId}` : null;

    case "assistant": {
      // NOTE(review): the stream may emit several assistant events that share
      // one message.id (e.g. one per content block) — confirm that keying on
      // the id alone cannot drop a distinct block. The format is kept because
      // existing unit tests pin it.
      const messageId = asString(asRecord(event.message)?.id);
      return messageId ? `assistant:${messageId}` : null;
    }

    case "user": {
      const content = asRecord(event.message)?.content;
      const blocks: unknown[] = Array.isArray(content) ? content : [];
      const toolUseIds = blocks
        .map((block) => asString(asRecord(block)?.tool_use_id))
        .filter((id) => id !== "");
      return toolUseIds.length > 0 ? `user:tool_result:${toolUseIds.join(",")}` : null;
    }

    case "result":
      // A result without a session id has no stable identity. Returning null
      // (rather than a shared constant key) keeps two different id-less
      // results from being treated as duplicates of each other.
      return sessionId ? `result:${sessionId}` : null;

    default:
      return null;
  }
}
|
||||
|
||||
/**
 * Stateful, line-oriented dedup filter for streamed log text.
 *
 * Feed chunks to `filter()`; it returns the newline-terminated lines that have
 * not been emitted before. Text after the last newline is held back until a
 * later chunk completes it, or until `flush()` is called.
 */
export class LogLineDedupFilter {
  /** Text received after the most recent newline (an unfinished line). */
  private partialLine = "";
  /** Dedup keys of every JSON-object line emitted so far. */
  private readonly emittedKeys = new Set<string>();

  /**
   * Process one chunk and return the portion that should be forwarded.
   *
   * @param chunk - Raw text from the stream; may contain zero or more newlines.
   * @returns The surviving complete lines, each terminated by `\n`, or `""`
   *   when nothing is ready to forward.
   */
  filter(chunk: string): string {
    if (!chunk) return "";

    const text = this.partialLine + chunk;
    const lastNewline = text.lastIndexOf("\n");
    if (lastNewline === -1) {
      // Still no complete line — keep accumulating.
      this.partialLine = text;
      return "";
    }

    // Everything after the final newline is an unfinished line (possibly empty).
    this.partialLine = text.slice(lastNewline + 1);
    const survivors = text
      .slice(0, lastNewline)
      .split("\n")
      .filter((line) => this.shouldEmit(line));

    return survivors.length === 0 ? "" : survivors.join("\n") + "\n";
  }

  /**
   * Emit whatever unfinished line is still buffered, for use when the stream
   * ends without a terminating newline.
   *
   * @returns The buffered text (without an added newline) if it passes the
   *   dedup check, otherwise `""`.
   */
  flush(): string {
    const tail = this.partialLine;
    this.partialLine = "";
    return tail && this.shouldEmit(tail) ? tail : "";
  }

  /**
   * Decide whether a line should be forwarded, recording its key when it is a
   * JSON object seen for the first time.
   */
  private shouldEmit(line: string): boolean {
    const candidate = line.trim();

    // Blank lines and anything not shaped like a JSON object always pass.
    const looksLikeJsonObject = candidate.startsWith("{") && candidate.endsWith("}");
    if (!looksLikeJsonObject) return true;

    let decoded: unknown;
    try {
      decoded = JSON.parse(candidate);
    } catch {
      // Brace-delimited but not valid JSON — treat as ordinary text.
      return true;
    }

    const event = asRecord(decoded);
    if (event === null) return true;

    // Prefer the structural key; fall back to the exact line text.
    const key = eventDedupKey(event) ?? `raw:${candidate}`;
    const sizeBefore = this.emittedKeys.size;
    this.emittedKeys.add(key);
    return this.emittedKeys.size !== sizeBefore;
  }
}
|
||||
@@ -0,0 +1,150 @@
|
||||
import { constants as fsConstants } from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { createHash } from "node:crypto";
|
||||
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
|
||||
import {
|
||||
type PaperclipSkillEntry,
|
||||
ensurePaperclipSkillSymlink,
|
||||
} from "@paperclipai/adapter-utils/server-utils";
|
||||
|
||||
/** Description of a prompt bundle materialized on disk. */
export interface ClaudePromptBundle {
  /** Key identifying the bundle. */
  bundleKey: string;
  /** Absolute path to the bundle root directory (contains .claude/skills/ and agent-instructions.md). */
  rootDir: string;
  /** Value to pass as --add-dir to the Claude CLI. */
  addDir: string;
  /** Path to the materialized instructions file, or null if no instructions were provided. */
  instructionsFilePath: string | null;
}
|
||||
|
||||
/** Instance id used when PAPERCLIP_INSTANCE_ID is unset or blank. */
const DEFAULT_PAPERCLIP_INSTANCE_ID = "default";

/**
 * Resolve the per-company directory that holds managed Claude prompt bundles:
 * `<home>/instances/<instanceId>/companies/<companyId>/claude-prompt-cache`.
 *
 * - `<home>` is the trimmed `PAPERCLIP_HOME` env var, or `~/.paperclip` when
 *   the variable is unset or blank.
 * - `<instanceId>` is the trimmed `PAPERCLIP_INSTANCE_ID` env var, or
 *   `"default"` when the variable is unset or blank.
 *
 * @param companyId Path segment identifying the company.
 * @returns Absolute path of the cache root (the directory is not created here).
 */
function resolveManagedClaudePromptCacheRoot(companyId: string): string {
  const homeOverride = process.env.PAPERCLIP_HOME?.trim();
  // The home directory is only consulted when no override is configured.
  const paperclipHome = homeOverride ? homeOverride : path.resolve(os.homedir(), ".paperclip");

  const instanceOverride = process.env.PAPERCLIP_INSTANCE_ID?.trim();
  const instanceId = instanceOverride ? instanceOverride : DEFAULT_PAPERCLIP_INSTANCE_ID;

  return path.resolve(paperclipHome, "instances", instanceId, "companies", companyId, "claude-prompt-cache");
}
|
||||
|
||||
/**
 * Feed a filesystem entry, recursively, into `hash`.
 *
 * What is hashed per entry kind:
 * - symlink: `symlink:<relativePath>\n`, then either `missing\n` when the link
 *   cannot be resolved, or the hashed contents of the resolved target (under
 *   the same `relativePath`).
 * - directory: `dir:<relativePath>\n`, then each child in `localeCompare` name
 *   order. A directory whose real path was already visited contributes only
 *   `loop\n`, which keeps symlink cycles from recursing forever.
 * - regular file: `file:<relativePath>\n`, the file bytes, then `\n`.
 * - anything else: `other:<relativePath>:<mode>\n`.
 *
 * @param candidate       Path of the entry to hash.
 * @param hash            Hash object that is updated in place.
 * @param relativePath    Label for this entry in the hash input; children get
 *                        `<relativePath>/<name>`.
 * @param seenDirectories Real paths of directories already visited; mutated.
 * @throws Whatever `lstat`, `readdir` or `readFile` reject with (e.g. when
 *         `candidate` does not exist).
 */
async function hashPathContents(
  candidate: string,
  hash: ReturnType<typeof createHash>,
  relativePath: string,
  seenDirectories: Set<string>,
): Promise<void> {
  const stat = await fs.lstat(candidate);

  if (stat.isSymbolicLink()) {
    hash.update(`symlink:${relativePath}\n`);
    const resolved = await fs.realpath(candidate).catch(() => null);
    if (!resolved) {
      // Dangling link: record that fact instead of failing the whole hash.
      hash.update("missing\n");
      return;
    }
    await hashPathContents(resolved, hash, relativePath, seenDirectories);
    return;
  }

  if (stat.isDirectory()) {
    // Fall back to the given path if the real path cannot be determined.
    const realDir = await fs.realpath(candidate).catch(() => candidate);
    hash.update(`dir:${relativePath}\n`);
    if (seenDirectories.has(realDir)) {
      hash.update("loop\n");
      return;
    }
    seenDirectories.add(realDir);

    const entries = await fs.readdir(candidate, { withFileTypes: true });
    // Sort so the digest does not depend on directory listing order.
    entries.sort((a, b) => a.name.localeCompare(b.name));
    for (const entry of entries) {
      const childRelativePath = relativePath.length > 0 ? `${relativePath}/${entry.name}` : entry.name;
      await hashPathContents(path.join(candidate, entry.name), hash, childRelativePath, seenDirectories);
    }
    return;
  }

  if (stat.isFile()) {
    hash.update(`file:${relativePath}\n`);
    hash.update(await fs.readFile(candidate));
    hash.update("\n");
    return;
  }

  // Sockets, FIFOs, devices, ...: only the path and mode are hashed.
  hash.update(`other:${relativePath}:${stat.mode}\n`);
}
|
||||
|
||||
/**
 * Compute the content-derived key of a prompt bundle.
 *
 * The SHA-256 input is, in order: a fixed version tag, the instructions text
 * (or an explicit "none" marker when it is null or empty), and then for each
 * skill — sorted by `runtimeName` — its `key`/`runtimeName` followed by the
 * hashed contents of its `source` path.
 *
 * @param input.skills               Skills that make up the bundle.
 * @param input.instructionsContents Instructions text, or null.
 * @returns Hex-encoded SHA-256 digest.
 * @throws If hashing a skill's `source` path fails (e.g. it does not exist).
 */
async function buildClaudePromptBundleKey(input: {
  skills: PaperclipSkillEntry[];
  instructionsContents: string | null;
}): Promise<string> {
  const hash = createHash("sha256");
  hash.update("paperclip-claude-prompt-bundle:v1\n");

  if (input.instructionsContents) {
    hash.update("instructions\n");
    hash.update(input.instructionsContents);
    hash.update("\n");
  } else {
    hash.update("instructions:none\n");
  }

  // Sort a copy so the key does not depend on the caller's skill order and the
  // caller's array is left untouched.
  const sortedSkills = [...input.skills].sort((a, b) => a.runtimeName.localeCompare(b.runtimeName));
  for (const entry of sortedSkills) {
    hash.update(`skill:${entry.key}:${entry.runtimeName}\n`);
    // Fresh visited-set per skill: loop detection is scoped to one skill tree.
    await hashPathContents(entry.source, hash, entry.runtimeName, new Set());
  }

  return hash.digest("hex");
}
|
||||
|
||||
/**
 * Make sure a readable file exists at `targetPath`, creating it with
 * `contents` if necessary.
 *
 * An already-readable file is left untouched — its contents are NOT compared
 * with `contents`. Otherwise the parent directory is created and the file is
 * written to a temporary sibling and renamed into place, so readers never
 * observe a partially written target.
 *
 * @param targetPath Destination file path.
 * @param contents   UTF-8 text to write when the file has to be created.
 * @throws The write/rename error, unless the target turned out to be readable
 *         anyway (e.g. another writer created it in the meantime).
 */
async function ensureReadableFile(targetPath: string, contents: string): Promise<void> {
  try {
    await fs.access(targetPath, fsConstants.R_OK);
    return;
  } catch {
    // Fall through and materialize the file.
  }

  await fs.mkdir(path.dirname(targetPath), { recursive: true });

  const tempPath = `${targetPath}.${process.pid}.${Date.now()}.tmp`;
  try {
    await fs.writeFile(tempPath, contents, "utf8");
    await fs.rename(tempPath, targetPath);
  } catch (err) {
    // Tolerate the failure only if the target ended up readable regardless.
    const targetReadable = await fs
      .access(targetPath, fsConstants.R_OK)
      .then(() => true)
      .catch(() => false);
    if (!targetReadable) throw err;
  } finally {
    // Best-effort cleanup; after a successful rename the temp path is already gone.
    await fs.rm(tempPath, { force: true }).catch(() => {});
  }
}
|
||||
|
||||
/**
 * Materialize the prompt bundle for a company and return where it lives.
 *
 * The bundle directory is `<cache root>/<bundleKey>`, where the key is derived
 * from the skills and instructions. Inside it:
 * - `.claude/skills/<runtimeName>` is set up for every skill via
 *   `ensurePaperclipSkillSymlink`; a failure for one skill is reported through
 *   `onLog` on "stderr" and does not abort the remaining skills.
 * - `agent-instructions.md` is written when instructions are provided
 *   (non-null and non-empty).
 *
 * @param input.companyId            Company whose cache root is used.
 * @param input.skills               Skills to expose in the bundle.
 * @param input.instructionsContents Instructions text, or null.
 * @param input.onLog                Log sink for per-skill failures.
 * @returns The bundle key, its root directory (also used as `addDir`), and the
 *          instructions file path or null.
 * @throws If computing the bundle key, creating the directories, or writing
 *         the instructions file fails.
 */
export async function prepareClaudePromptBundle(input: {
  companyId: string;
  skills: PaperclipSkillEntry[];
  instructionsContents: string | null;
  onLog: AdapterExecutionContext["onLog"];
}): Promise<ClaudePromptBundle> {
  const { companyId, skills, instructionsContents, onLog } = input;

  const bundleKey = await buildClaudePromptBundleKey({ skills, instructionsContents });
  const rootDir = path.join(resolveManagedClaudePromptCacheRoot(companyId), bundleKey);
  const skillsHome = path.join(rootDir, ".claude", "skills");
  await fs.mkdir(skillsHome, { recursive: true });

  for (const entry of skills) {
    const target = path.join(skillsHome, entry.runtimeName);
    try {
      await ensurePaperclipSkillSymlink(entry.source, target);
    } catch (err) {
      // Best effort: report and continue with the remaining skills.
      await onLog(
        "stderr",
        `[paperclip] Failed to materialize Claude skill "${entry.key}" into ${skillsHome}: ${err instanceof Error ? err.message : String(err)}\n`,
      );
    }
  }

  const instructionsFilePath = instructionsContents ? path.join(rootDir, "agent-instructions.md") : null;
  if (instructionsFilePath && instructionsContents) {
    await ensureReadableFile(instructionsFilePath, instructionsContents);
  }

  return { bundleKey, rootDir, addDir: rootDir, instructionsFilePath };
}
|
||||
@@ -33,7 +33,7 @@ async function buildK8sSkillSnapshot(
|
||||
sourcePath: entry.source,
|
||||
targetPath: null,
|
||||
detail: desiredSet.has(entry.key)
|
||||
? "Injected via prompt bundle into ephemeral K8s Job pods."
|
||||
? "Materialized into the PVC-backed Claude prompt bundle before each K8s Job run."
|
||||
: null,
|
||||
required: Boolean(entry.required),
|
||||
requiredReason: entry.requiredReason ?? null,
|
||||
|
||||
Reference in New Issue
Block a user