Compare commits

15 Commits

Author SHA1 Message Date
Gandalf the Greybeard 9f79efdf36 0.1.33 2026-04-23 22:45:37 +00:00
Gandalf the Greybeard 4210f51937 chore: update lockfile
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 22:45:31 +00:00
Gandalf the Greybeard f41ae818ef fix: fire onSpawn immediately on job terminal transition (FAR-14)
Prevents process_lost false positives for 2-3 minute K8s jobs by
resetting the reaper clock when the keepalive loop detects the job
has completed (or been deleted), rather than waiting for the next
periodic refresh.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 22:29:22 +00:00
Hugh Commit baf7e2d44d 0.1.32: port prepareClaudePromptBundle to claude_k8s (FAR-12)
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 19:47:26 +00:00
Gandalf the Greybeard 77ed2004f8 fix: port prepareClaudePromptBundle flow to claude_k8s adapter (FAR-11)
K8s Job pods were starting without the Paperclip skill loaded, so agents
could not find their heartbeat procedure and reported "no issue content in
my workspace" on every wake. Root cause: claude_local materialises skills
into a PVC-backed prompt-bundle directory and passes --add-dir to Claude,
but claude_k8s did neither.

Changes:
- Add src/server/prompt-cache.ts with prepareClaudePromptBundle (ported
  from adapter-claude-local). Writes skill symlinks and the agent's
  instructions file into a content-addressed bundle directory under the
  shared PVC (/paperclip/instances/.../claude-prompt-cache/<hash>/).
- execute.ts: read desired skills and instructions file before building
  the Job manifest, then call prepareClaudePromptBundle and pass the
  resulting bundle to buildJobManifest.
- job-manifest.ts: accept optional promptBundle in JobBuildInput; when
  present, pass --add-dir <bundle.addDir> and use bundle.instructionsFilePath
  for --append-system-prompt-file. Also fix: skip --append-system-prompt-file
  on session resumes to avoid wasting tokens on re-injection.
- skills.ts: correct the detail string to reflect actual materialisation.
- job-manifest.test.ts: add 5 new tests covering --add-dir injection,
  bundle path preference, session-resume skipping, and fallback behaviour.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 19:34:35 +00:00
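The argument handling described in the FAR-11 commit above can be sketched roughly as follows. This is a minimal illustration, not the adapter's actual job-manifest.ts code: `PromptBundle`, `buildPromptArgs`, `fallbackInstructionsPath`, and `resumingSession` are hypothetical names chosen for the example. Only the `--add-dir` and `--append-system-prompt-file` flags and the `addDir` / `instructionsFilePath` fields are taken from the commit message.

```typescript
// Hypothetical shape of the bundle; only the two field names come from the commit message.
interface PromptBundle {
  addDir: string;
  instructionsFilePath: string;
}

interface PromptArgOptions {
  promptBundle?: PromptBundle;
  // Assumed: an instructions path used when no bundle was prepared.
  fallbackInstructionsPath?: string;
  // Assumed: true when the run resumes an existing Claude session.
  resumingSession: boolean;
}

// Build the prompt-related CLI arguments for the Claude container.
function buildPromptArgs(opts: PromptArgOptions): string[] {
  const args: string[] = [];
  // With a bundle, expose its directory so the materialised skills are visible.
  if (opts.promptBundle) {
    args.push("--add-dir", opts.promptBundle.addDir);
  }
  // Prefer the bundle's instructions file; otherwise fall back.
  const instructions =
    opts.promptBundle?.instructionsFilePath ?? opts.fallbackInstructionsPath;
  // Skip re-injection on session resumes to avoid spending tokens twice.
  if (instructions && !opts.resumingSession) {
    args.push("--append-system-prompt-file", instructions);
  }
  return args;
}
```

On a resumed session with a bundle present, this sketch still emits `--add-dir` but omits `--append-system-prompt-file`, which mirrors the "session-resume skipping" case listed among the new tests.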
Gandalf the Greybeard 69d0f4972f test: regression for streamPodLogsOnce bail timer (FAR-10)
Uses vi.mock on k8s-client and vi.useFakeTimers to prove that when
logApi.log() never resolves (the FAR-10 hang shape) and stopSignal
fires, streamPodLogsOnce still returns within the bail window
(LOG_STREAM_BAIL_TIMEOUT_MS).  Exports streamPodLogsOnce so the test
can call it directly.  Also covers the no-stopSignal happy path.

269/269 passing (+2 new).

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 16:43:32 +00:00
Gandalf the Greybeard c7706d742f 0.1.31: harden streamPodLogsOnce with Promise.race bail (FAR-10)
Defensive follow-up to the FAR-10 fix.  The original patch aborts the
in-flight follow stream by destroying the Writable once stopSignal
fires, and relies on the @kubernetes/client-node library propagating
that destroy into an abort of the underlying HTTP request.  If that
propagation ever fails (e.g. the client is awaiting a response that
never arrives), logApi.log() can still hang forever.

Adds a Promise.race with a 3s bail timer that starts when stopSignal
fires.  In the happy path (destroy-propagation works), logApi.log()
resolves first and the bail timer is cleared.  In the failure path,
the bail timer fires and streamPodLogsOnce returns with whatever
chunks were captured — preventing the hang from reaching execute().

No test change: existing 267 tests pass and the race path needs a k8s
mock to exercise end-to-end; validated by monitoring real runs.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 16:36:51 +00:00
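The bail pattern from the 0.1.31 commit above can be illustrated with a small generic helper. This is a sketch under assumptions, not the code in streamPodLogsOnce: `raceWithBail`, `BAILED`, and the `stopFired` promise are hypothetical (the adapter polls a `stopSignal.stopped` flag rather than awaiting a promise), and the bail delay is passed in rather than fixed at 3s.

```typescript
// Sentinel returned when the bail timer wins the race (hypothetical name).
const BAILED = Symbol("bailed");

// Race `work` against a bail timer that only starts once `stopFired` resolves.
async function raceWithBail<T>(
  work: Promise<T>,
  stopFired: Promise<void>,
  bailMs: number,
): Promise<T | typeof BAILED> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  let finished = false;

  const bail = new Promise<typeof BAILED>((resolve) => {
    void stopFired.then(() => {
      // Do not arm the timer if the race has already been decided.
      if (!finished) {
        timer = setTimeout(() => resolve(BAILED), bailMs);
      }
    });
  });

  try {
    // Happy path: `work` settles first. Failure path: the bail timer fires.
    return await Promise.race([work, bail]);
  } finally {
    finished = true;
    // Clear the timer so it cannot keep the process alive.
    if (timer !== undefined) clearTimeout(timer);
  }
}
```

A caller would treat `BAILED` as "return whatever chunks were captured so far", which is how the commit message describes the failure path.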
Gandalf the Greybeard 8937fb2804 chore: fix repo org farhoodliquor→farhoodlabs; wire NPM_TOKEN for publish
- Update repository, bugs, and homepage URLs in package.json to use
  the correct farhoodlabs GitHub org
- Add NODE_AUTH_TOKEN: NPM_TOKEN to the CI publish step so the newly
  added NPM_TOKEN secret is picked up for authentication

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 16:20:48 +00:00
Gandalf the Greybeard 77e9aa9b37 ci: switch npm publish to OIDC trusted publishing
Replaces NPM_TOKEN secret with id-token: write + --provenance so
publishing uses GitHub's OIDC token directly. No repository secret
required; provenance attestation is generated automatically.

Also collapses the redundant second setup-node step (registry-url is
now set on the first one).

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 16:10:39 +00:00
Gandalf the Greybeard 683ea2d8b1 0.1.30
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 16:08:22 +00:00
Chris Farhood dd859c74a8 Merge pull request #9 from farhoodlabs/fix/far-10-process-lost-after-job-complete
fix: prevent process_lost when K8s Job completes (FAR-10)
2026-04-23 12:07:33 -04:00
Gandalf the Greybeard b3c1519cf5 fix: prevent process_lost when K8s Job completes (FAR-10)
Four stacked bugs caused the adapter to hang after K8s Job completion,
allowing the 5-minute reaper to mark runs process_lost even when the Job
actually succeeded.

- streamPodLogsOnce: add stopSignal polling loop that destroys the
  writable every 200ms once the job-completion branch fires, aborting
  any in-flight follow stream that would otherwise hang indefinitely
- waitForPod: treat phase=Failed as a terminal error (throw via
  describePodTerminatedError) instead of entering the log-stream path
  with a dead pod (new helper is exported for unit tests)
- waitForPod: surface cs.state?.terminated in the per-tick detail line
  so operators see exit code / reason without needing kubectl
- keepalive: add POST_TERMINAL_KEEPALIVE_MS (90s) window after Job goes
  terminal so onSpawn keeps refreshing updatedAt during cleanup; if
  execute() genuinely stalls past 90s the reaper will still catch it

Regression tests added for describePodTerminatedError (phase=Failed
with and without claude container status).

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 15:59:51 +00:00
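The post-terminal keepalive window described in the FAR-10 commit above reduces to a small time comparison. The sketch below is illustrative only: `keepaliveAction` and the `KeepaliveAction` labels are hypothetical names, while the 90s constant and the `<=` comparison follow the diff further down.

```typescript
// Window after the Job goes terminal during which updatedAt is still refreshed.
const POST_TERMINAL_KEEPALIVE_MS = 90_000;

// Hypothetical labels for what a keepalive tick should do.
type KeepaliveAction = "check-job" | "refresh-only" | "stop";

// Decide the tick behaviour from the time the Job was first seen terminal.
function keepaliveAction(
  terminalAt: number | null,
  now: number,
  windowMs: number = POST_TERMINAL_KEEPALIVE_MS,
): KeepaliveAction {
  // Job not yet terminal: keep polling its status as before.
  if (terminalAt === null) return "check-job";
  // Inside the window: keep refreshing so cleanup is not reaped.
  // Past the window: stop, so a genuinely stalled execute() is still caught.
  return now - terminalAt <= windowMs ? "refresh-only" : "stop";
}
```

Letting the refresh lapse after the window is the design point: cleanup gets headroom, but the reaper remains the backstop for a real stall.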
Test User 78fd702ccb 0.1.29 2026-04-23 02:48:58 +00:00
Chris Farhood 0bc1bb1dd1 Merge pull request #8 from farhoodliquor/fix/far-124-reattach-orphan-k8s-jobs
fix: reattach to orphaned K8s Jobs across Paperclip restarts (FAR-124)
2026-04-22 22:33:05 -04:00
Test User c8968598e4 fix: reattach to orphaned K8s Jobs across Paperclip restarts (FAR-124)
When the Paperclip pod restarts mid-run, the in-process setInterval
keepalive dies, `updatedAt` goes stale, and the server's orphan reaper
fails the run with the (misleading) "child pid 1 is no longer running"
message.  Paperclip then dispatches a continuation run, whose execute()
finds the previous run's K8s Job still happily running and deletes it
as an "orphan" — throwing away work and producing the transcript/run
cascade reported on FAR-124.

Changes:

- job-manifest: add `paperclip.io/task-id` and `paperclip.io/session-id`
  labels (sanitized via new `sanitizeLabelValue` helper) so a later
  execute() can identify an orphan as the continuation of the same
  logical unit of work.
- execute: in the concurrency guard, when `reattachOrphanedJobs` is on
  (default) and an orphan matches the current agent, task, and session
  and is not terminal, pick it as the reattach target and delete only
  the other orphans.  Branch the build/create/waitForPod block so the
  reattach path skips manifest building, Secret creation, Job creation,
  and the scheduling wait, and jumps straight to streaming logs and
  waiting for the existing pod's completion.
- config-schema: expose `reattachOrphanedJobs` toggle (default true).
- Tests: `sanitizeLabelValue`, `isReattachableOrphan`, new label
  presence/absence, config default.

No server-side changes; the misleading reaper message and lack of a
non-local retry path will be addressed in a follow-up upstream PR.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-22 21:59:25 +00:00
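The FAR-124 commit above stores task and session IDs as Kubernetes label values, which must be at most 63 characters, may contain only alphanumerics, `-`, `_`, and `.`, and must begin and end with an alphanumeric character. The real `sanitizeLabelValue` is not shown in this diff, so the following is only a sketch of one way to satisfy those rules. `sanitizeLabelValueSketch` is a hypothetical name, and its replacement strategy is an assumption.

```typescript
// Hypothetical stand-in for sanitizeLabelValue; the real helper may differ.
function sanitizeLabelValueSketch(raw: string): string {
  return raw
    // Replace every character that is not allowed in a label value.
    .replace(/[^A-Za-z0-9._-]/g, "-")
    // Enforce the 63-character limit.
    .slice(0, 63)
    // Label values must start and end with an alphanumeric character.
    .replace(/^[^A-Za-z0-9]+/, "")
    .replace(/[^A-Za-z0-9]+$/, "");
}
```

Because both the label written at Job creation and the value compared in `isReattachableOrphan` pass through the same sanitizer, a lossy mapping is acceptable as long as it is deterministic.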
19 changed files with 1425 additions and 218 deletions
+4 -7
@@ -29,24 +29,21 @@ jobs:
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/master' && github.event_name == 'push'
permissions:
id-token: write
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: "22"
registry-url: "https://registry.npmjs.org"
cache: "npm"
- run: npm ci
- run: npm run build
- uses: actions/setup-node@v4
with:
node-version: "22"
registry-url: "https://registry.npmjs.org"
cache: "npm"
- name: Publish (skip if version already exists)
run: |
PKG_NAME=$(node -p "require('./package.json').name")
@@ -54,7 +51,7 @@ jobs:
if npm view "${PKG_NAME}@${PKG_VERSION}" version 2>/dev/null; then
echo "Version ${PKG_VERSION} already published — skipping."
else
npm publish --access public
npm publish --provenance --access public
fi
env:
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
+33
@@ -1,3 +1,36 @@
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
import type * as k8s from "@kubernetes/client-node";
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
* Exported for unit tests.
*/
export declare function isK8s404(err: unknown): boolean;
/**
* Build the error message when Claude's stdout contains no result event.
* Skips system/init event lines so the UI doesn't display the raw init JSON.
* Exported for unit tests.
*/
export declare function buildPartialRunError(exitCode: number | null, model: string, stdout: string): string;
/**
* Evaluate an orphaned K8s Job (one whose `paperclip.io/run-id` label does
* not match the current runId) as a potential reattach target. A Job is
* reattachable when it belongs to the same agent, same task, and same resume
* session as the current run — meaning the previous Paperclip instance was
* mid-stream on the exact piece of work this new run was dispatched to do.
* Exported for unit tests.
*/
export declare function isReattachableOrphan(job: k8s.V1Job, expected: {
agentId: string;
taskId: string | null;
sessionId: string | null;
}): boolean;
/**
* Build an error message for a pod that reached phase=Failed before or
* instead of streaming logs. Includes the claude container's terminated exit
* code and reason when available so operators can diagnose crashes without
* needing kubectl. Exported for unit tests.
*/
export declare function describePodTerminatedError(podName: string, phase: string, containerStatuses: k8s.V1ContainerStatus[]): string;
export declare function execute(ctx: AdapterExecutionContext): Promise<AdapterExecutionResult>;
//# sourceMappingURL=execute.d.ts.map
+1 -1
@@ -1 +1 @@
{"version":3,"file":"execute.d.ts","sourceRoot":"","sources":["../../src/server/execute.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,EAAE,uBAAuB,EAAE,sBAAsB,EAAE,MAAM,4BAA4B,CAAC;AAiUlG,wBAAsB,OAAO,CAAC,GAAG,EAAE,uBAAuB,GAAG,OAAO,CAAC,sBAAsB,CAAC,CAoc3F"}
{"version":3,"file":"execute.d.ts","sourceRoot":"","sources":["../../src/server/execute.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,EAAE,uBAAuB,EAAE,sBAAsB,EAAE,MAAM,4BAA4B,CAAC;AAWlG,OAAO,KAAK,KAAK,GAAG,MAAM,yBAAyB,CAAC;AAYpD;;;;GAIG;AACH,wBAAgB,QAAQ,CAAC,GAAG,EAAE,OAAO,GAAG,OAAO,CAO9C;AAED;;;;GAIG;AACH,wBAAgB,oBAAoB,CAClC,QAAQ,EAAE,MAAM,GAAG,IAAI,EACvB,KAAK,EAAE,MAAM,EACb,MAAM,EAAE,MAAM,GACb,MAAM,CA4BR;AAED;;;;;;;GAOG;AACH,wBAAgB,oBAAoB,CAClC,GAAG,EAAE,GAAG,CAAC,KAAK,EACd,QAAQ,EAAE;IAAE,OAAO,EAAE,MAAM,CAAC;IAAC,MAAM,EAAE,MAAM,GAAG,IAAI,CAAC;IAAC,SAAS,EAAE,MAAM,GAAG,IAAI,CAAA;CAAE,GAC7E,OAAO,CAaT;AAED;;;;;GAKG;AACH,wBAAgB,0BAA0B,CACxC,OAAO,EAAE,MAAM,EACf,KAAK,EAAE,MAAM,EACb,iBAAiB,EAAE,GAAG,CAAC,iBAAiB,EAAE,GACzC,MAAM,CASR;AAkWD,wBAAsB,OAAO,CAAC,GAAG,EAAE,uBAAuB,GAAG,OAAO,CAAC,sBAAsB,CAAC,CAkkB3F"}
+370 -104
@@ -1,12 +1,110 @@
import { asString, asNumber, asBoolean, parseObject } from "@paperclipai/adapter-utils/server-utils";
import { parseClaudeStreamJson, describeClaudeFailure, isClaudeMaxTurnsResult, isClaudeUnknownSessionError, } from "./parse.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
import { buildJobManifest } from "./job-manifest.js";
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
import { LogLineDedupFilter } from "./log-dedup.js";
import { Writable } from "node:stream";
const POLL_INTERVAL_MS = 2000;
const KEEPALIVE_INTERVAL_MS = 15_000;
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
// How long to keep refreshing onSpawn after the Job reaches a terminal state.
// Covers the cleanup path (delete job, parse stdout) so a slow K8s API call
// doesn't trip the 5-minute reaper staleness window.
const POST_TERMINAL_KEEPALIVE_MS = 90_000;
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
* Exported for unit tests.
*/
export function isK8s404(err) {
if (!(err instanceof Error))
return false;
const e = err;
const resp = e.response;
if (resp?.statusCode === 404 || resp?.status === 404)
return true;
if (e.statusCode === 404)
return true;
return /HTTP-Code:\s*404\b/.test(err.message);
}
/**
* Build the error message when Claude's stdout contains no result event.
* Skips system/init event lines so the UI doesn't display the raw init JSON.
* Exported for unit tests.
*/
export function buildPartialRunError(exitCode, model, stdout) {
if (exitCode === 0)
return "Failed to parse Claude JSON output";
// Walk stdout lines, skip system events, return the first real content line.
const firstContentLine = stdout.split(/\r?\n/)
.map((l) => l.trim())
.find((l) => {
if (!l)
return false;
try {
const obj = JSON.parse(l);
if (typeof obj === "object" && obj !== null && obj.type === "system")
return false;
}
catch {
// not JSON — treat as content
}
return true;
}) ?? "";
// If we only have system/init events and nothing else, surface the model
// name so the operator can diagnose missing credentials or unsupported model.
const initOnlyOutput = stdout.trim() !== "" && model !== "" && !firstContentLine;
if (initOnlyOutput) {
const modelHint = model ? ` (model: ${model})` : "";
return `Claude started but did not produce a result${modelHint} — check API credentials, model support, and adapter config`;
}
return firstContentLine
? `Claude exited with code ${exitCode ?? -1}: ${firstContentLine}`
: `Claude exited with code ${exitCode ?? -1}`;
}
/**
* Evaluate an orphaned K8s Job (one whose `paperclip.io/run-id` label does
* not match the current runId) as a potential reattach target. A Job is
* reattachable when it belongs to the same agent, same task, and same resume
* session as the current run — meaning the previous Paperclip instance was
* mid-stream on the exact piece of work this new run was dispatched to do.
* Exported for unit tests.
*/
export function isReattachableOrphan(job, expected) {
if (!expected.taskId || !expected.sessionId)
return false;
const labels = job.metadata?.labels ?? {};
if (labels["paperclip.io/adapter-type"] !== "claude_k8s")
return false;
if (labels["paperclip.io/agent-id"] !== expected.agentId)
return false;
if (labels["paperclip.io/task-id"] !== expected.taskId)
return false;
if (labels["paperclip.io/session-id"] !== expected.sessionId)
return false;
const conditions = job.status?.conditions ?? [];
const terminal = conditions.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True");
if (terminal)
return false;
return true;
}
/**
* Build an error message for a pod that reached phase=Failed before or
* instead of streaming logs. Includes the claude container's terminated exit
* code and reason when available so operators can diagnose crashes without
* needing kubectl. Exported for unit tests.
*/
export function describePodTerminatedError(podName, phase, containerStatuses) {
const mainCs = containerStatuses.find((cs) => cs.name === "claude");
const terminated = mainCs?.state?.terminated;
if (terminated) {
const code = terminated.exitCode ?? "unknown";
const reason = terminated.reason ?? terminated.message ?? "no reason";
return `Pod ${podName} reached phase=${phase}: claude exited ${code} (${reason})`;
}
return `Pod ${podName} reached phase=${phase}`;
}
/**
* Wait for the Job's pod to reach a terminal or running state.
* Returns the pod name once logs can be streamed, or throws on failure.
@@ -52,14 +150,22 @@ async function waitForPod(namespace, jobName, timeoutMs, onLog, kubeconfigPath)
details.push(`${cs.name}: waiting (${cs.state.waiting.reason ?? "unknown"})`);
else if (cs.state?.running)
details.push(`${cs.name}: running`);
else if (cs.state?.terminated)
details.push(`${cs.name}: terminated (exit ${cs.state.terminated.exitCode ?? "?"}, ${cs.state.terminated.reason ?? "no reason"})`);
}
await onLog("stdout", `[paperclip] Pod ${podName}: ${details.join(", ")}\n`);
lastStatus = statusKey;
}
// Ready to stream logs
if (phase === "Running" || phase === "Succeeded" || phase === "Failed") {
if (phase === "Running" || phase === "Succeeded") {
return podName;
}
// phase=Failed means the pod crashed before we could stream logs.
// Throwing here routes the caller into the error path with a structured
// message instead of entering the log-streaming path with a dead pod.
if (phase === "Failed") {
throw new Error(describePodTerminatedError(podName, phase, containerStatuses));
}
// Init containers done + main running (phase may still say Pending briefly)
const allInitsDone = initStatuses.length > 0 && initStatuses.every((s) => s.state?.terminated?.exitCode === 0);
const mainRunning = containerStatuses.some((s) => s.state?.running);
@@ -104,16 +210,32 @@ async function waitForPod(namespace, jobName, timeoutMs, onLog, kubeconfigPath)
* Stream pod logs once via follow. Returns accumulated stdout when the
* stream ends (container exit, API disconnect, or abort signal).
*/
async function streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds) {
async function streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal) {
const logApi = getLogApi(kubeconfigPath);
const chunks = [];
const writable = new Writable({
write(chunk, _encoding, callback) {
const text = chunk.toString("utf-8");
chunks.push(text);
void onLog("stdout", text).then(() => callback(), callback);
const emitted = dedup ? dedup.filter(text) : text;
if (!emitted) {
callback();
return;
}
void onLog("stdout", emitted).then(() => callback(), callback);
},
});
// When the job completion signal fires, destroy the writable to abort the
// in-flight follow stream. Without this, logApi.log can hang indefinitely
// when the pod terminates without closing the HTTP connection cleanly.
let stopPoller = null;
if (stopSignal) {
stopPoller = setInterval(() => {
if (stopSignal.stopped && !writable.destroyed) {
writable.destroy();
}
}, 200);
}
try {
await logApi.log(namespace, podName, "claude", writable, {
follow: true,
@@ -122,8 +244,12 @@ async function streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinc
});
}
catch {
// follow may fail if the container already exited or the API
// connection dropped — not fatal, caller decides whether to retry.
// follow may fail if the container already exited, the API connection
// dropped, or we aborted via writable.destroy() — not fatal.
}
finally {
if (stopPoller)
clearInterval(stopPoller);
}
return chunks.join("");
}
@@ -143,6 +269,9 @@ async function streamPodLogs(namespace, podName, onLog, kubeconfigPath, stopSign
// reconnects use a tight window instead of an ever-growing one anchored
// at stream start. This is the primary fix for FAR-105 duplicative logs.
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
// Shared across reconnects so replayed lines inside the `sinceSeconds`
// overlap window are dropped before they reach the streaming UI (FAR-123).
const dedup = new LogLineDedupFilter();
while (!stopSignal?.stopped) {
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
@@ -158,7 +287,7 @@ async function streamPodLogs(namespace, podName, onLog, kubeconfigPath, stopSign
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
}
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
@@ -177,6 +306,11 @@ async function streamPodLogs(namespace, podName, onLog, kubeconfigPath, stopSign
// Brief pause before reconnecting to avoid tight loops.
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
}
// Flush any buffered partial line so the final assistant/result chunk
// isn't dropped when the stream ends mid-line.
const tail = dedup.flush();
if (tail)
await onLog("stdout", tail);
return allChunks.join("");
}
/**
@@ -199,13 +333,27 @@ async function readPodLogs(namespace, podName, kubeconfigPath) {
}
/**
* Wait for the Job to reach a terminal state (Complete or Failed).
* Returns the Job's final status.
* Returns the Job's final status. A 404 (job deleted by TTL or externally)
* is treated as a soft terminal: succeeded=false, timedOut=false, jobGone=true.
* The caller should log this and fall through to stdout parsing.
*/
async function waitForJobCompletion(namespace, jobName, timeoutMs, kubeconfigPath) {
const batchApi = getBatchApi(kubeconfigPath);
const deadline = timeoutMs > 0 ? Date.now() + timeoutMs : 0;
while (deadline === 0 || Date.now() < deadline) {
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
let job;
try {
job = await batchApi.readNamespacedJob({ name: jobName, namespace });
}
catch (err) {
if (isK8s404(err)) {
// Job was deleted (TTL garbage collection or external deletion) before
// we detected its terminal condition. The container must have already
// exited for TTL to fire, so log streaming will have captured the output.
return { succeeded: false, timedOut: false, jobGone: true };
}
throw err;
}
const conditions = job.status?.conditions ?? [];
const complete = conditions.find((c) => c.type === "Complete" && c.status === "True");
if (complete)
@@ -261,10 +409,18 @@ export async function execute(ctx) {
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
// still be running. We detect those by comparing the Job's run-id label against
// the current runId and clean them up so this execution can proceed.
// the current runId. When reattachOrphanedJobs is enabled and the orphan matches
// the current agent+task+session, we attach to it instead of deleting it (FAR-124).
const agentId = ctx.agent.id;
const selfPod = await getSelfPodInfo(kubeconfigPath);
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true);
const runtimeSessionParams = parseObject(runtime.sessionParams);
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const currentSessionLabel = currentSessionIdRaw ? sanitizeLabelValue(currentSessionIdRaw) : null;
const currentTaskIdRaw = asString(ctx.context.taskId, "") || asString(ctx.context.issueId, "");
const currentTaskLabel = currentTaskIdRaw ? sanitizeLabelValue(currentTaskIdRaw) : null;
let reattachTarget = null;
try {
const batchApi = getBatchApi(kubeconfigPath);
const existing = await batchApi.listNamespacedJob({
@@ -277,10 +433,37 @@ export async function execute(ctx) {
// concurrent jobs (same runId — shouldn't happen but guard defensively).
const orphaned = running.filter((j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") !== runId);
const samRun = running.filter((j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") === runId);
if (orphaned.length > 0) {
const orphanNames = orphaned.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Cleaning up ${orphaned.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
for (const j of orphaned) {
// Pick the most recent reattachable orphan — same agent + task + session,
// not terminal. Only one target is chosen; any other orphans get
// cleaned up as before.
if (reattachOrphanedJobs && orphaned.length > 0) {
const candidates = orphaned
.filter((j) => isReattachableOrphan(j, {
agentId,
taskId: currentTaskLabel,
sessionId: currentSessionLabel,
}))
.sort((a, b) => {
const at = new Date(a.metadata?.creationTimestamp ?? 0).getTime();
const bt = new Date(b.metadata?.creationTimestamp ?? 0).getTime();
return bt - at;
});
const chosen = candidates[0];
const chosenName = chosen?.metadata?.name;
if (chosen && chosenName) {
reattachTarget = {
jobName: chosenName,
namespace: chosen.metadata?.namespace ?? guardNamespace,
priorRunId: chosen.metadata?.labels?.["paperclip.io/run-id"] ?? "",
image: chosen.spec?.template?.spec?.containers?.[0]?.image ?? "unknown",
};
}
}
const toDelete = orphaned.filter((j) => !reattachTarget || j.metadata?.name !== reattachTarget.jobName);
if (toDelete.length > 0) {
const orphanNames = toDelete.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Cleaning up ${toDelete.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
for (const j of toDelete) {
const name = j.metadata?.name;
if (name) {
await cleanupJob(guardNamespace, name, onLog, kubeconfigPath);
@@ -317,81 +500,114 @@ export async function execute(ctx) {
errorCode: "k8s_concurrency_guard_unreachable",
};
}
// Build Job manifest
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret } = buildJobManifest({
ctx,
selfPod,
});
// Report invocation metadata
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: claudeArgs,
commandNotes: [
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
context: ctx.context,
});
}
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
const coreApi = getCoreApi(kubeconfigPath);
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
},
},
stringData: promptSecret.data,
},
const batchApi = getBatchApi(kubeconfigPath);
let jobName;
let namespace;
let promptSecret = null;
if (reattachTarget) {
jobName = reattachTarget.jobName;
namespace = reattachTarget.namespace;
// Announce reattach metadata. Prompt and args aren't known here — they
// belong to the prior run that created this pod and are already present
// on the running container.
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: [],
commandNotes: [
`Image: ${reattachTarget.image}`,
`Namespace: ${namespace}`,
`Reattached from prior run: ${reattachTarget.priorRunId || "unknown"}`,
`Timeout: ${timeoutSec}s`,
],
prompt: "",
context: ctx.context,
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
}
await onLog("stdout", `[paperclip] Reattaching to in-flight K8s Job ${jobName} in namespace ${namespace} (prior run ${reattachTarget.priorRunId || "unknown"})\n`);
}
else {
// Build Job manifest
const built = buildJobManifest({ ctx, selfPod });
const job = built.job;
jobName = built.jobName;
namespace = built.namespace;
const prompt = built.prompt;
const claudeArgs = built.claudeArgs;
const promptMetrics = built.promptMetrics;
promptSecret = built.promptSecret;
// Report invocation metadata
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: claudeArgs,
commandNotes: [
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
context: ctx.context,
});
}
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
},
},
stringData: promptSecret.data,
},
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
}
catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
};
}
}
// Create the Job
try {
await batchApi.createNamespacedJob({ namespace, body: job });
}
catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
errorCode: "k8s_job_create_failed",
};
}
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
}
// Create the Job
const batchApi = getBatchApi(kubeconfigPath);
try {
await batchApi.createNamespacedJob({ namespace, body: job });
}
catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
errorCode: "k8s_job_create_failed",
};
}
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
let stdout = "";
let exitCode = null;
let jobTimedOut = false;
@@ -404,8 +620,24 @@ export async function execute(ctx) {
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
let podName;
try {
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
if (reattachTarget) {
// Pod is already running from the prior run — look it up directly.
const podList = await coreApi.listNamespacedPod({
namespace,
labelSelector: `job-name=${jobName}`,
});
const pod = podList.items[0];
const name = pod?.metadata?.name;
if (!name) {
throw new Error(`Reattach target Job ${jobName} has no pod`);
}
podName = name;
await onLog("stdout", `[paperclip] Reattached to pod ${podName}\n`);
}
else {
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
}
// Notify the server that execution has started. This sets
// processStartedAt and refreshes updatedAt in the DB, which the
// stale-run reaper (reapOrphanedRuns) uses to decide liveness.
@@ -419,13 +651,14 @@ export async function execute(ctx) {
}
catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Pod scheduling failed: ${msg}\n`);
const phase = reattachTarget ? "reattach" : "scheduling";
await onLog("stderr", `[paperclip] Pod ${phase} failed: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Pod scheduling failed: ${msg}`,
errorCode: "k8s_pod_schedule_failed",
errorMessage: `Pod ${phase} failed: ${msg}`,
errorCode: reattachTarget ? "k8s_pod_reattach_failed" : "k8s_pod_schedule_failed",
};
}
// Stream logs and wait for completion concurrently.
@@ -457,18 +690,32 @@ export async function execute(ctx) {
let lastLogAt = Date.now();
let keepaliveTick = 0;
let keepaliveJobTerminal = false;
let keepaliveJobTerminalAt = null;
keepaliveTimer = setInterval(() => {
// Fire-and-forget the async work: setInterval does not await its callback,
// so the promise is discarded with `void` and errors are handled inside.
void (async () => {
if (keepaliveJobTerminal)
if (keepaliveJobTerminal) {
// Post-terminal window: keep refreshing onSpawn during cleanup
// (job deletion, log parsing, K8s API calls) so the reaper doesn't
// fire a false process_lost while execute() is still running.
if (ctx.onSpawn &&
keepaliveJobTerminalAt !== null &&
Date.now() - keepaliveJobTerminalAt <= POST_TERMINAL_KEEPALIVE_MS) {
keepaliveTick++;
if (keepaliveTick % 6 === 0) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => { });
}
}
return;
}
// Verify the Job is still alive before announcing or refreshing.
try {
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
const terminal = job.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True");
if (terminal) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
return;
}
}
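The post-terminal branch of the keepalive loop can be read as a small pure decision. The sketch below is a minimal restatement under stated assumptions: the names `KeepaliveState`, `shouldRefreshAfterTerminal`, and `REFRESH_EVERY_N_TICKS` are hypothetical, and the `ctx.onSpawn` presence check from the diff is left out for brevity.

```typescript
// Value taken from the diff; the other names here are illustrative only.
const POST_TERMINAL_KEEPALIVE_MS = 90_000;
const REFRESH_EVERY_N_TICKS = 6; // hypothetical name for the `% 6` in the diff

interface KeepaliveState {
  terminalAt: number | null; // ms timestamp when the Job was seen terminal
  tick: number;              // shared tick counter, as in keepaliveTick
}

// Returns true when this tick should refresh onSpawn. The counter only
// advances while the post-terminal window is still open.
function shouldRefreshAfterTerminal(state: KeepaliveState, now: number): boolean {
  if (state.terminalAt === null) return false;
  if (now - state.terminalAt > POST_TERMINAL_KEEPALIVE_MS) return false;
  state.tick++;
  return state.tick % REFRESH_EVERY_N_TICKS === 0;
}

const state: KeepaliveState = { terminalAt: 0, tick: 5 };
console.log(shouldRefreshAfterTerminal(state, 15_000));
```

Because the tick counter is shared with the pre-terminal path, the first refresh after the terminal transition depends on where the counter happened to be when the Job finished.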
@@ -477,10 +724,9 @@ export async function execute(ctx) {
// connection resets should NOT permanently disable the keepalive —
// the next tick will re-check and the reaper uses the staleness
// window as a safety net.
const statusCode = err?.response?.statusCode
?? err?.statusCode;
if (statusCode === 404) {
if (isK8s404(err)) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
return;
}
// Log transient errors but leave keepaliveJobTerminal false so
@@ -525,23 +771,44 @@ export async function execute(ctx) {
if (logResult.status === "fulfilled") {
stdout = logResult.value;
}
// If the follow stream missed output (container exited quickly), do a
// one-shot log read as fallback before the pod is cleaned up.
if (!stdout.trim()) {
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
stdout = await readPodLogs(namespace, podName, kubeconfigPath);
if (stdout.trim()) {
// One-shot log fallback: handles two failure modes with a single read.
// Mode 1 — empty stream: the follow stream returned nothing (fast exit before connection).
// Mode 2 — partial stream: we have some output but no result event (follow stream raced
// with container exit and captured only the init line before the connection dropped).
// A one-shot readPodLogs is more reliable for already-terminated containers and reads
// from the beginning of the log, giving us the full output.
// We use a cheap string scan for the result-event guard (avoids a full JSON parse here;
// the authoritative parse happens once below after all fallbacks complete).
const hasResultEvent = stdout.includes('"type":"result"');
const needsOneShot = !stdout.trim() || !hasResultEvent;
if (needsOneShot) {
if (!stdout.trim()) {
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
}
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
if (!stdout.trim() && oneShotLogs.trim()) {
stdout = oneShotLogs;
await onLog("stdout", stdout);
}
else if (oneShotLogs && oneShotLogs.length > stdout.length) {
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
stdout = oneShotLogs;
}
}
if (completionResult.status === "fulfilled") {
jobTimedOut = completionResult.value.timedOut;
if (completionResult.value.jobGone) {
// Job was deleted by TTL or externally before we observed the Complete/Failed
// condition. The container must have exited first (TTL only fires after
// completion), so log streaming has captured the full output — continue
// to stdout parsing rather than returning an error.
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
}
}
else {
// waitForJobCompletion threw — re-check job state to avoid returning
// while the job is still running (which would cause UI staleness and
// concurrency errors on retry). Use a bounded timeout (60s) so we
// don't hang the heartbeat indefinitely if the K8s API is degraded.
// waitForJobCompletion threw an unexpected error — re-check job state to
// avoid returning while the job is still running. Use a bounded timeout
// (60s) so we don't hang the heartbeat indefinitely if the K8s API is degraded.
jobTimedOut = false;
const RECHECK_TIMEOUT_MS = 60_000;
const actualState = await waitForJobCompletion(namespace, jobName, RECHECK_TIMEOUT_MS, kubeconfigPath);
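The one-shot log fallback earlier in this hunk combines two decisions: whether a second read is needed, and which copy of the output to keep. A minimal sketch of both as pure functions follows; `classifyLogFallback` and `mergeOneShot` are hypothetical names, not part of the adapter.

```typescript
type FallbackMode = "none" | "empty-stream" | "partial-stream";

// Mirrors the guard in the diff: an empty stream, or a stream with no
// result event, triggers a one-shot read.
function classifyLogFallback(stdout: string): FallbackMode {
  if (!stdout.trim()) return "empty-stream";
  // Cheap substring scan, as in the diff. It assumes compact JSON with no
  // whitespace after the colon.
  return stdout.includes('"type":"result"') ? "none" : "partial-stream";
}

// Mirrors the replacement rules: take the one-shot logs when the stream was
// empty and the read returned something, or when the read is strictly longer.
function mergeOneShot(streamed: string, oneShot: string): string {
  if (!streamed.trim() && oneShot.trim()) return oneShot;
  if (oneShot.length > streamed.length) return oneShot;
  return streamed;
}

console.log(classifyLogFallback('{"type":"system","subtype":"init"}\n'));
```

Comparing lengths is a heuristic: it treats the longer text as the more complete one, which holds when the one-shot read starts from the beginning of the container log.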
@@ -550,6 +817,11 @@ export async function execute(ctx) {
// Return an error so the UI knows the run is not done.
jobTimedOut = true;
}
else if (actualState.jobGone) {
// Job was deleted before we could confirm terminal state — same as the
// fulfilled+jobGone case above: proceed with captured output.
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
}
else if (!actualState.succeeded) {
// Job still not terminal — the completion error was likely transient.
// Return an error so the UI knows the run is not done, rather than
@@ -615,16 +887,11 @@ export async function execute(ctx) {
};
}
if (!parsed) {
const stderrLine = stdout.split(/\r?\n/).map((l) => l.trim()).find(Boolean) ?? "";
return {
exitCode,
signal: null,
timedOut: false,
errorMessage: exitCode === 0
? "Failed to parse Claude JSON output"
: stderrLine
? `Claude exited with code ${exitCode ?? -1}: ${stderrLine}`
: `Claude exited with code ${exitCode ?? -1}`,
errorMessage: buildPartialRunError(exitCode, parsedStream.model, stdout),
resultJson: { stdout },
};
}
@@ -636,8 +903,7 @@ export async function execute(ctx) {
outputTokens: asNumber(usageObj.output_tokens, 0),
};
})();
const runtimeSessionParams = parseObject(runtime.sessionParams);
const fallbackSessionId = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const fallbackSessionId = currentSessionIdRaw;
const resolvedSessionId = parsedStream.sessionId
?? (asString(parsed.session_id, fallbackSessionId) || fallbackSessionId);
const model = asString(config.model, "");
+20
@@ -1,5 +1,19 @@
import type * as k8s from "@kubernetes/client-node";
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
/**
* Build the shell command prefix that installs a native Node.js PostToolUse
* hook into Claude Code's settings. The hook truncates oversized tool outputs
* before they reach the model — replacing the RTK binary init-container
* approach with a self-contained Node.js implementation.
*
* Both scripts are base64-encoded so they can be embedded in a sh -c command
* string without any quoting or escaping issues.
*
* @param maxOutputBytes Byte threshold above which tool output is truncated.
* @returns A shell command string (suitable for "&&"-chaining
* before the claude invocation).
*/
export declare function buildRtkSetupCommands(maxOutputBytes: number): string;
import type { SelfPodInfo } from "./k8s-client.js";
export interface JobBuildInput {
ctx: AdapterExecutionContext;
@@ -24,5 +38,11 @@ export interface JobBuildResult {
* staged as a K8s Secret before creating the Job. */
promptSecret: PromptSecret | null;
}
/**
* Sanitize a string for use as a Kubernetes label value (label-value syntax:
* `[a-zA-Z0-9]([-_.a-zA-Z0-9]*[a-zA-Z0-9])?`, max 63 chars). Returns `null`
* when no usable characters remain, in which case the caller should omit the label.
*/
export declare function sanitizeLabelValue(value: string, maxLen?: number): string | null;
export declare function buildJobManifest(input: JobBuildInput): JobBuildResult;
//# sourceMappingURL=job-manifest.d.ts.map
+1 -1
@@ -1 +1 @@
{"version":3,"file":"job-manifest.d.ts","sourceRoot":"","sources":["../../src/server/job-manifest.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,KAAK,GAAG,MAAM,yBAAyB,CAAC;AACpD,OAAO,KAAK,EAAE,uBAAuB,EAAE,MAAM,4BAA4B,CAAC;AAgD1E,OAAO,KAAK,EAAE,WAAW,EAAE,MAAM,iBAAiB,CAAC;AA6CnD,MAAM,WAAW,aAAa;IAC5B,GAAG,EAAE,uBAAuB,CAAC;IAC7B,OAAO,EAAE,WAAW,CAAC;CACtB;AAED;;+EAE+E;AAC/E,MAAM,WAAW,YAAY;IAC3B,IAAI,EAAE,MAAM,CAAC;IACb,SAAS,EAAE,MAAM,CAAC;IAClB,IAAI,EAAE,MAAM,CAAC,MAAM,EAAE,MAAM,CAAC,CAAC;CAC9B;AAED,MAAM,WAAW,cAAc;IAC7B,GAAG,EAAE,GAAG,CAAC,KAAK,CAAC;IACf,OAAO,EAAE,MAAM,CAAC;IAChB,SAAS,EAAE,MAAM,CAAC;IAClB,MAAM,EAAE,MAAM,CAAC;IACf,UAAU,EAAE,MAAM,EAAE,CAAC;IACrB,aAAa,EAAE,MAAM,CAAC,MAAM,EAAE,MAAM,CAAC,CAAC;IACtC;0DACsD;IACtD,YAAY,EAAE,YAAY,GAAG,IAAI,CAAC;CACnC;AAuHD,wBAAgB,gBAAgB,CAAC,KAAK,EAAE,aAAa,GAAG,cAAc,CAkRrE"}
{"version":3,"file":"job-manifest.d.ts","sourceRoot":"","sources":["../../src/server/job-manifest.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,KAAK,GAAG,MAAM,yBAAyB,CAAC;AACpD,OAAO,KAAK,EAAE,uBAAuB,EAAE,MAAM,4BAA4B,CAAC;AAY1E;;;;;;;;;;;;GAYG;AACH,wBAAgB,qBAAqB,CAAC,cAAc,EAAE,MAAM,GAAG,MAAM,CAiEpE;AAsCD,OAAO,KAAK,EAAE,WAAW,EAAE,MAAM,iBAAiB,CAAC;AA6CnD,MAAM,WAAW,aAAa;IAC5B,GAAG,EAAE,uBAAuB,CAAC;IAC7B,OAAO,EAAE,WAAW,CAAC;CACtB;AAED;;+EAE+E;AAC/E,MAAM,WAAW,YAAY;IAC3B,IAAI,EAAE,MAAM,CAAC;IACb,SAAS,EAAE,MAAM,CAAC;IAClB,IAAI,EAAE,MAAM,CAAC,MAAM,EAAE,MAAM,CAAC,CAAC;CAC9B;AAED,MAAM,WAAW,cAAc;IAC7B,GAAG,EAAE,GAAG,CAAC,KAAK,CAAC;IACf,OAAO,EAAE,MAAM,CAAC;IAChB,SAAS,EAAE,MAAM,CAAC;IAClB,MAAM,EAAE,MAAM,CAAC;IACf,UAAU,EAAE,MAAM,EAAE,CAAC;IACrB,aAAa,EAAE,MAAM,CAAC,MAAM,EAAE,MAAM,CAAC,CAAC;IACtC;0DACsD;IACtD,YAAY,EAAE,YAAY,GAAG,IAAI,CAAC;CACnC;AAMD;;;;GAIG;AACH,wBAAgB,kBAAkB,CAAC,KAAK,EAAE,MAAM,EAAE,MAAM,SAAK,GAAG,MAAM,GAAG,IAAI,CAI5E;AAmHD,wBAAgB,gBAAgB,CAAC,KAAK,EAAE,aAAa,GAAG,cAAc,CAmSrE"}
+106 -1
@@ -1,5 +1,81 @@
import { asString, asNumber, asBoolean, asStringArray, parseObject, buildPaperclipEnv, renderTemplate, } from "@paperclipai/adapter-utils/server-utils";
import { createHash } from "node:crypto";
/**
* Build the shell command prefix that installs a native Node.js PostToolUse
* hook into Claude Code's settings. The hook truncates oversized tool outputs
* before they reach the model — replacing the RTK binary init-container
* approach with a self-contained Node.js implementation.
*
* Both scripts are base64-encoded so they can be embedded in a sh -c command
* string without any quoting or escaping issues.
*
* @param maxOutputBytes Byte threshold above which tool output is truncated.
* @returns A shell command string (suitable for "&&"-chaining
* before the claude invocation).
*/
export function buildRtkSetupCommands(maxOutputBytes) {
// --- Filter script ----------------------------------------------------------
// This script runs as the PostToolUse hook inside every K8s Job pod.
// Claude Code writes the hook event as JSON to the script's stdin; the script
// truncates the tool_response/tool_result content when it exceeds the
// threshold and writes the (possibly modified) JSON to stdout.
//
// Field-name coverage:
// • tool_response — documented hook event format for PostToolUse
// • tool_result — alternative name seen in some Claude Code versions
// Content may be a plain string or an array of typed blocks (text/image/…).
const filterScript = [
`const c=[];`,
`process.stdin.on('data',d=>c.push(d));`,
`process.stdin.on('end',()=>{`,
`const raw=Buffer.concat(c).toString('utf-8');`,
`let o;try{o=JSON.parse(raw);}catch{process.stdout.write(raw);return;}`,
`const MAX=${maxOutputBytes};`,
`function trunc(s){`,
`if(typeof s!=='string')return s;`,
`const b=Buffer.from(s,'utf-8');`,
`if(b.length<=MAX)return s;`,
`return b.slice(0,MAX).toString('utf-8')+'\\n[...'+(b.length-MAX)+' bytes truncated by paperclip-rtk]';`,
`}`,
`const tr=o&&(o.tool_response||o.tool_result);`,
`if(tr){`,
`if(typeof tr.content==='string'){tr.content=trunc(tr.content);}`,
`else if(Array.isArray(tr.content)){`,
`tr.content=tr.content.map(function(b){`,
`if(b&&typeof b==='object'&&typeof b.text==='string'){`,
`return Object.assign({},b,{text:trunc(b.text)});`,
`}return b;`,
`});`,
`}`,
`}`,
`process.stdout.write(JSON.stringify(o));`,
`});`,
].join("");
// --- Settings script --------------------------------------------------------
// Reads the existing ~/.claude/settings.json (if any), merges in the RTK
// PostToolUse hook, and writes the file back. All other settings sections
// are preserved; only PostToolUse is replaced so we own the full hook list
// for this run.
const settingsScript = [
`const fs=require('fs'),pt=require('path');`,
`const p=pt.join(process.env.HOME,'.claude','settings.json');`,
`let s={};try{s=JSON.parse(fs.readFileSync(p,'utf-8'));}catch(e){}`,
`s.hooks=s.hooks||{};`,
`s.hooks.PostToolUse=[{matcher:'.*',hooks:[{type:'command',command:'node /tmp/.rtk-filter.js'}]}];`,
`fs.mkdirSync(pt.dirname(p),{recursive:true});`,
`fs.writeFileSync(p,JSON.stringify(s));`,
].join("");
// Encode as base64 so the strings can be embedded directly in a shell command
// without any quoting concerns (base64 alphabet: A-Za-z0-9+/=).
const filterB64 = Buffer.from(filterScript, "utf-8").toString("base64");
const settingsB64 = Buffer.from(settingsScript, "utf-8").toString("base64");
return [
// Write the filter script
`node -e "require('fs').writeFileSync('/tmp/.rtk-filter.js',Buffer.from('${filterB64}','base64').toString('utf-8'))"`,
// Install the Claude Code PostToolUse hook (merge into existing settings)
`node -e "eval(Buffer.from('${settingsB64}','base64').toString('utf-8'))"`,
].join(" && ");
}
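The filter script above is emitted as a minified string, which makes the truncation rule hard to read. The sketch below restates it in plain TypeScript, assuming Node.js; `truncateUtf8` and `truncateContent` are hypothetical names, and `subarray` stands in for the `slice` call in the embedded script.

```typescript
import { Buffer } from "node:buffer";

// Truncate a string to at most maxBytes of UTF-8 and append the same
// marker text the embedded filter script uses.
function truncateUtf8(s: string, maxBytes: number): string {
  const bytes = Buffer.from(s, "utf-8");
  if (bytes.length <= maxBytes) return s;
  const dropped = bytes.length - maxBytes;
  return (
    bytes.subarray(0, maxBytes).toString("utf-8") +
    `\n[...${dropped} bytes truncated by paperclip-rtk]`
  );
}

// Content may be a plain string or an array of typed blocks; only string
// content and blocks with a string `text` field are touched.
function truncateContent(content: unknown, maxBytes: number): unknown {
  if (typeof content === "string") return truncateUtf8(content, maxBytes);
  if (Array.isArray(content)) {
    return content.map((block) =>
      block && typeof block === "object" && typeof block.text === "string"
        ? { ...block, text: truncateUtf8(block.text, maxBytes) }
        : block,
    );
  }
  return content;
}

console.log(truncateUtf8("abcdefghij", 4));
```

Cutting at a byte offset can land inside a multi-byte character; decoding then yields a replacement character at the cut point, which is probably acceptable for a truncation marker but worth knowing.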
/** Prompts above this size (bytes) are staged via a Secret instead of an
* init container env var, protecting against the ~1 MiB PodSpec limit. */
const LARGE_PROMPT_THRESHOLD_BYTES = 256 * 1024;
@@ -91,6 +167,16 @@ function parseKeyValueConfig(raw) {
function sanitizeForK8sName(value, maxLen = 16) {
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen);
}
/**
* Sanitize a string for use as a Kubernetes label value (label-value syntax:
* `[a-zA-Z0-9]([-_.a-zA-Z0-9]*[a-zA-Z0-9])?`, max 63 chars). Returns `null`
* when no usable characters remain, in which case the caller should omit the label.
*/
export function sanitizeLabelValue(value, maxLen = 63) {
const cleaned = value.replace(/[^a-zA-Z0-9._-]/g, "").slice(0, maxLen);
const trimmed = cleaned.replace(/^[^a-zA-Z0-9]+/, "").replace(/[^a-zA-Z0-9]+$/, "");
return trimmed.length > 0 ? trimmed : null;
}
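To see what the sanitizer does to typical identifiers, here is a standalone typed copy of the function above with a few sample inputs. The inputs are hypothetical and chosen only to exercise each branch.

```typescript
// Typed copy of sanitizeLabelValue from the diff, for illustration.
function sanitizeLabelValue(value: string, maxLen = 63): string | null {
  // Drop every character outside the label-value alphabet, then cap length.
  const cleaned = value.replace(/[^a-zA-Z0-9._-]/g, "").slice(0, maxLen);
  // Label values must start and end with an alphanumeric character.
  const trimmed = cleaned.replace(/^[^a-zA-Z0-9]+/, "").replace(/[^a-zA-Z0-9]+$/, "");
  return trimmed.length > 0 ? trimmed : null;
}

// Hypothetical sample inputs.
const samples = ["FAR-124", "issue #42 / retry", "--__--", "a".repeat(80)];
for (const s of samples) {
  console.log(JSON.stringify(s.slice(0, 20)), "->", sanitizeLabelValue(s));
}
```

Sanitization is lossy: distinct raw values such as `a/b` and `ab` map to the same label, so any code comparing a label against an expected ID presumably needs to sanitize the expected value the same way.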
/**
* Build a short deterministic hash suffix from the raw inputs to avoid
* collisions when sanitized slugs happen to be identical.
@@ -202,6 +288,8 @@ export function buildJobManifest(input) {
const nodeSelector = parseKeyValueConfig(config.nodeSelector);
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
const extraLabels = parseKeyValueConfig(config.labels);
const enableRtk = asBoolean(config.enableRtk, false);
const rtkMaxOutputBytes = asNumber(config.rtkMaxOutputBytes, 50000);
// Resolve working directory — use workspace cwd, fall back to /paperclip
const workspaceContext = parseObject(context.paperclipWorkspace);
const workspaceCwd = asString(workspaceContext.cwd, "");
@@ -289,6 +377,17 @@ export function buildJobManifest(input) {
"paperclip.io/company-id": agent.companyId,
"paperclip.io/adapter-type": "claude_k8s",
};
// Reattach-target labels: let a future execute() identify this Job as the
// continuation of the same logical unit of work (same task + same resume
// session) so it can attach to the running pod across a Paperclip restart
// instead of deleting it and starting over (FAR-124).
const taskIdRaw = asString(context.taskId, "") || asString(context.issueId, "");
const taskLabel = taskIdRaw ? sanitizeLabelValue(taskIdRaw) : null;
if (taskLabel)
labels["paperclip.io/task-id"] = taskLabel;
const sessionLabel = runtimeSessionId ? sanitizeLabelValue(runtimeSessionId) : null;
if (sessionLabel)
labels["paperclip.io/session-id"] = sessionLabel;
for (const [key, value] of Object.entries(extraLabels)) {
labels[key] = value;
}
@@ -345,7 +444,13 @@ export function buildJobManifest(input) {
};
// Build the claude command string for the main container
const claudeArgsEscaped = claudeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
const mainCommand = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`;
const claudeInvocation = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`;
// When RTK output filtering is enabled, prepend the Node.js hook setup.
// This writes a filter script and a Claude Code settings file that installs
// it as a PostToolUse hook — no external binary or init container required.
const mainCommand = enableRtk
? `${buildRtkSetupCommands(rtkMaxOutputBytes)} && ${claudeInvocation}`
: claudeInvocation;
// Decide prompt delivery strategy: env var (small) or Secret volume (large).
const promptBytes = Buffer.byteLength(prompt, "utf-8");
const useLargePromptPath = promptBytes > LARGE_PROMPT_THRESHOLD_BYTES;
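The delivery decision above depends on UTF-8 byte length, not character count. A minimal sketch of that check in isolation, assuming Node.js; `choosePromptDelivery` and the `PromptDelivery` type are hypothetical names.

```typescript
import { Buffer } from "node:buffer";

// Threshold value taken from the diff.
const LARGE_PROMPT_THRESHOLD_BYTES = 256 * 1024;

type PromptDelivery = "env" | "secret";

// Prompts strictly above the threshold go through a Secret volume;
// everything else is passed as an env var.
function choosePromptDelivery(prompt: string): PromptDelivery {
  const bytes = Buffer.byteLength(prompt, "utf-8");
  return bytes > LARGE_PROMPT_THRESHOLD_BYTES ? "secret" : "env";
}

// A two-byte character crosses the threshold at roughly half the
// character count of an ASCII prompt.
console.log(choosePromptDelivery("x".repeat(200_000)));
console.log(choosePromptDelivery("é".repeat(200_000)));
```

Measuring bytes matters for prompts with non-ASCII text, where `prompt.length` would undercount the encoded size.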
+2 -2
@@ -1,12 +1,12 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.28",
"version": "0.1.33",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.28",
"version": "0.1.33",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+4 -4
@@ -1,16 +1,16 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.28",
"version": "0.1.33",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
"type": "git",
"url": "https://github.com/farhoodliquor/paperclip-adapter-claude-k8s"
"url": "https://github.com/farhoodlabs/paperclip-adapter-claude-k8s"
},
"bugs": {
"url": "https://github.com/farhoodliquor/paperclip-adapter-claude-k8s/issues"
"url": "https://github.com/farhoodlabs/paperclip-adapter-claude-k8s/issues"
},
"homepage": "https://github.com/farhoodliquor/paperclip-adapter-claude-k8s#readme",
"homepage": "https://github.com/farhoodlabs/paperclip-adapter-claude-k8s#readme",
"type": "module",
"paperclip": {
"adapterUiParser": "1.0.0"
+8
@@ -42,6 +42,14 @@ describe("getConfigSchema", () => {
expect(field!.default).toBe(true);
});
it("reattachOrphanedJobs defaults to true", () => {
const schema = getConfigSchema();
const field = schema.fields.find((f: ConfigFieldSchema) => f.key === "reattachOrphanedJobs");
expect(field).toBeDefined();
expect(field!.type).toBe("toggle");
expect(field!.default).toBe(true);
});
it("has imagePullPolicy as select with correct options", () => {
const schema = getConfigSchema();
const field = schema.fields.find((f: ConfigFieldSchema) => f.key === "imagePullPolicy");
+7
@@ -89,6 +89,13 @@ export function getConfigSchema(): AdapterConfigSchema {
label: "Retain Jobs",
hint: "Skip cleanup of completed Jobs for debugging purposes.",
},
{
type: "toggle",
key: "reattachOrphanedJobs",
label: "Reattach to Orphaned Jobs",
hint: "If a prior K8s Job for the same agent/task/session is still running (e.g. Paperclip restarted mid-run), attach to it and stream its output instead of deleting it and starting a new pod. Default: on.",
default: true,
},
// Resource Limits
{
type: "text",
+226 -2
@@ -1,5 +1,44 @@
import { describe, it, expect } from "vitest";
import { isK8s404, buildPartialRunError } from "./execute.js";
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import type * as k8s from "@kubernetes/client-node";
import type { Writable } from "node:stream";
// Mock the K8s client before importing execute so streamPodLogsOnce picks up
// the mocked getLogApi. The mock's logApi.log never resolves, simulating the
// FAR-10 hang: K8s API drops the connection but the client awaits forever.
const mockLogFn = vi.fn();
vi.mock("./k8s-client.js", () => ({
getLogApi: () => ({ log: mockLogFn }),
getBatchApi: () => ({}),
getCoreApi: () => ({}),
getAuthzApi: () => ({}),
getSelfPodInfo: vi.fn(),
resetCache: vi.fn(),
}));
const { isK8s404, buildPartialRunError, isReattachableOrphan, describePodTerminatedError, streamPodLogsOnce } = await import("./execute.js");
function makeJob(opts: {
runId?: string;
agentId?: string;
taskId?: string;
sessionId?: string;
adapterType?: string;
terminal?: boolean;
}): k8s.V1Job {
const labels: Record<string, string> = {
"paperclip.io/adapter-type": opts.adapterType ?? "claude_k8s",
};
if (opts.agentId) labels["paperclip.io/agent-id"] = opts.agentId;
if (opts.runId) labels["paperclip.io/run-id"] = opts.runId;
if (opts.taskId) labels["paperclip.io/task-id"] = opts.taskId;
if (opts.sessionId) labels["paperclip.io/session-id"] = opts.sessionId;
return {
metadata: { name: "ac-job", namespace: "paperclip", labels },
status: opts.terminal
? { conditions: [{ type: "Complete", status: "True" }] }
: { conditions: [] },
} as k8s.V1Job;
}
describe("isK8s404", () => {
it("returns false for non-Error values", () => {
@@ -106,3 +145,188 @@ describe("buildPartialRunError", () => {
expect(msg).toBe("Claude exited with code 1: real error line");
});
});
describe("isReattachableOrphan", () => {
const agentId = "agent-abc";
const taskId = "task-xyz";
const sessionId = "sess-123";
it("returns true when agent/task/session all match and Job is not terminal", () => {
const job = makeJob({ agentId, taskId, sessionId, runId: "old-run" });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(true);
});
it("returns false when the Job is already Complete", () => {
const job = makeJob({ agentId, taskId, sessionId, runId: "old-run", terminal: true });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when expected taskId is null (caller couldn't derive one)", () => {
const job = makeJob({ agentId, taskId, sessionId });
expect(isReattachableOrphan(job, { agentId, taskId: null, sessionId })).toBe(false);
});
it("returns false when expected sessionId is null", () => {
const job = makeJob({ agentId, taskId, sessionId });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId: null })).toBe(false);
});
it("returns false when agent id doesn't match", () => {
const job = makeJob({ agentId: "agent-other", taskId, sessionId });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when task id doesn't match", () => {
const job = makeJob({ agentId, taskId: "task-other", sessionId });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when session id doesn't match", () => {
const job = makeJob({ agentId, taskId, sessionId: "sess-other" });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when the Job is from a different adapter type", () => {
const job = makeJob({ agentId, taskId, sessionId, adapterType: "claude_local" });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when Job has no task-id label (labels were introduced in FAR-124)", () => {
const job = makeJob({ agentId, sessionId });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
it("returns false when Job has no session-id label", () => {
const job = makeJob({ agentId, taskId });
expect(isReattachableOrphan(job, { agentId, taskId, sessionId })).toBe(false);
});
});
// Regression: FAR-10 — waitForPod must throw on phase=Failed, not return the pod name.
// These tests cover describePodTerminatedError, the helper that waitForPod uses to build
// the error message before throwing. Verifies that phase=Failed with no claude logs
// produces a structured, actionable error instead of silently entering the log-stream path.
describe("describePodTerminatedError", () => {
it("includes exit code and reason when claude container status is available", () => {
const cs = [
{
name: "claude",
state: { terminated: { exitCode: 137, reason: "OOMKilled" } },
},
] as k8s.V1ContainerStatus[];
const msg = describePodTerminatedError("mypod", "Failed", cs);
expect(msg).toContain("137");
expect(msg).toContain("OOMKilled");
expect(msg).toContain("phase=Failed");
});
it("falls back to message field when reason is absent", () => {
const cs = [
{
name: "claude",
state: { terminated: { exitCode: 1, message: "signal: killed" } },
},
] as k8s.V1ContainerStatus[];
const msg = describePodTerminatedError("mypod", "Failed", cs);
expect(msg).toContain("signal: killed");
expect(msg).toContain("1");
});
it("returns generic message when no claude container status is present", () => {
const msg = describePodTerminatedError("mypod", "Failed", []);
expect(msg).toBe("Pod mypod reached phase=Failed");
});
it("ignores non-claude containers", () => {
const cs = [
{
name: "sidecar",
state: { terminated: { exitCode: 0, reason: "Completed" } },
},
] as k8s.V1ContainerStatus[];
const msg = describePodTerminatedError("mypod", "Failed", cs);
expect(msg).toBe("Pod mypod reached phase=Failed");
});
it("handles null exitCode gracefully", () => {
const cs = [
{
name: "claude",
state: { terminated: { exitCode: null, reason: "Error" } },
},
] as unknown as k8s.V1ContainerStatus[];
const msg = describePodTerminatedError("mypod", "Failed", cs);
expect(msg).toContain("unknown");
expect(msg).toContain("Error");
});
});
// Regression: FAR-10 hardening — streamPodLogsOnce must not hang forever when
// the K8s client's logApi.log call never resolves. When stopSignal fires, the
// bail timer must force-return within LOG_STREAM_BAIL_TIMEOUT_MS (3s in the
// implementation) so execute() does not get stuck waiting for a dead stream.
describe("streamPodLogsOnce bail timer", () => {
beforeEach(() => {
mockLogFn.mockReset();
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("returns within the bail window when stopSignal fires during a hung log call", async () => {
// logApi.log never resolves — simulates the FAR-10 hang where the K8s
// response stream stalls without closing the connection.
mockLogFn.mockImplementation((_ns, _pod, _ctr, _writable: Writable) => {
return new Promise(() => { /* never resolves */ });
});
const stopSignal = { stopped: false };
const onLog = vi.fn().mockResolvedValue(undefined);
const resultPromise = streamPodLogsOnce(
"default",
"mypod",
onLog,
undefined,
undefined,
undefined,
stopSignal,
);
// Fire stopSignal; let the 200ms poller tick and start the bail timer.
stopSignal.stopped = true;
await vi.advanceTimersByTimeAsync(300);
// Advance past the 3s bail timeout. streamPodLogsOnce must now resolve
// with an empty string (no chunks were captured) rather than hanging.
await vi.advanceTimersByTimeAsync(3_100);
const result = await resultPromise;
expect(result).toBe("");
expect(mockLogFn).toHaveBeenCalledOnce();
});
it("returns promptly if logApi.log resolves before stopSignal fires (happy path, no bail involved)", async () => {
mockLogFn.mockImplementation(async (_ns, _pod, _ctr, _writable: Writable) => {
// Resolve immediately — normal log-stream completion.
return undefined;
});
const onLog = vi.fn().mockResolvedValue(undefined);
// No stopSignal → no bail machinery engaged.
const result = await streamPodLogsOnce(
"default",
"mypod",
onLog,
undefined,
undefined,
undefined,
undefined,
);
expect(result).toBe("");
expect(mockLogFn).toHaveBeenCalledOnce();
});
});
+342 -88
@@ -1,5 +1,15 @@
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
import { asString, asNumber, asBoolean, parseObject } from "@paperclipai/adapter-utils/server-utils";
import {
asString,
asNumber,
asBoolean,
parseObject,
readPaperclipRuntimeSkillEntries,
resolvePaperclipDesiredSkillNames,
} from "@paperclipai/adapter-utils/server-utils";
import fs from "node:fs/promises";
import path from "node:path";
import { prepareClaudePromptBundle } from "./prompt-cache.js";
import {
parseClaudeStreamJson,
describeClaudeFailure,
@@ -7,7 +17,7 @@ import {
isClaudeUnknownSessionError,
} from "./parse.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
import { buildJobManifest } from "./job-manifest.js";
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
import { LogLineDedupFilter } from "./log-dedup.js";
import type * as k8s from "@kubernetes/client-node";
import { Writable } from "node:stream";
@@ -16,6 +26,15 @@ const POLL_INTERVAL_MS = 2000;
const KEEPALIVE_INTERVAL_MS = 15_000;
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
// How long to keep refreshing onSpawn after the Job reaches a terminal state.
// Covers the cleanup path (delete job, parse stdout) so a slow K8s API call
// doesn't trip the 5-minute reaper staleness window.
const POST_TERMINAL_KEEPALIVE_MS = 90_000;
// Upper bound on how long streamPodLogsOnce will wait after stopSignal fires
// before force-returning, even if logApi.log has not yet resolved. Defensive
// against the K8s client library not propagating writable.destroy() into an
// abort of the underlying HTTP request.
const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
@@ -70,6 +89,53 @@ export function buildPartialRunError(
: `Claude exited with code ${exitCode ?? -1}`;
}
/**
* Evaluate an orphaned K8s Job (one whose `paperclip.io/run-id` label does
* not match the current runId) as a potential reattach target. A Job is
* reattachable when it belongs to the same agent, same task, and same resume
* session as the current run — meaning the previous Paperclip instance was
* mid-stream on the exact piece of work this new run was dispatched to do.
* Exported for unit tests.
*/
export function isReattachableOrphan(
job: k8s.V1Job,
expected: { agentId: string; taskId: string | null; sessionId: string | null },
): boolean {
if (!expected.taskId || !expected.sessionId) return false;
const labels = job.metadata?.labels ?? {};
if (labels["paperclip.io/adapter-type"] !== "claude_k8s") return false;
if (labels["paperclip.io/agent-id"] !== expected.agentId) return false;
if (labels["paperclip.io/task-id"] !== expected.taskId) return false;
if (labels["paperclip.io/session-id"] !== expected.sessionId) return false;
const conditions = job.status?.conditions ?? [];
const terminal = conditions.some(
(c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
);
if (terminal) return false;
return true;
}
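The terminal-condition predicate used in `isReattachableOrphan` also appears in the keepalive loop. One possible refactor is a shared helper; the sketch below uses minimal structural types instead of `k8s.V1Job` so it runs standalone, and `isJobTerminal`, `JobLike`, and `JobConditionLike` are hypothetical names.

```typescript
// Minimal structural stand-ins for the relevant parts of a Job object.
interface JobConditionLike {
  type: string;
  status: string;
}
interface JobLike {
  status?: { conditions?: JobConditionLike[] };
}

// A Job counts as terminal once a Complete or Failed condition is "True".
function isJobTerminal(job: JobLike): boolean {
  return (job.status?.conditions ?? []).some(
    (c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
  );
}

console.log(isJobTerminal({ status: { conditions: [{ type: "Complete", status: "True" }] } }));
console.log(isJobTerminal({ status: { conditions: [] } }));
```

A Job with no status or an empty condition list is treated as still running, which matches how the tests above build non-terminal fixtures.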
/**
* Build an error message for a pod that reached phase=Failed before or
* instead of streaming logs. Includes the claude container's terminated exit
* code and reason when available so operators can diagnose crashes without
* needing kubectl. Exported for unit tests.
*/
export function describePodTerminatedError(
podName: string,
phase: string,
containerStatuses: k8s.V1ContainerStatus[],
): string {
const mainCs = containerStatuses.find((cs) => cs.name === "claude");
const terminated = mainCs?.state?.terminated;
if (terminated) {
const code = terminated.exitCode ?? "unknown";
const reason = terminated.reason ?? terminated.message ?? "no reason";
return `Pod ${podName} reached phase=${phase}: claude exited ${code} (${reason})`;
}
return `Pod ${podName} reached phase=${phase}`;
}
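To illustrate the message shapes this helper produces, here is a minimal sketch using a hypothetical `ContainerStatusLike` type in place of `k8s.V1ContainerStatus`. The pod name and the exit code/reason pair are made-up sample values, not output from a real cluster.

```typescript
// Hypothetical stand-in for k8s.V1ContainerStatus: only the fields the helper reads.
interface ContainerStatusLike {
  name: string;
  state?: { terminated?: { exitCode?: number; reason?: string; message?: string } };
}

// Same formatting rules as describePodTerminatedError in the diff.
function describePodTerminatedErrorSketch(
  podName: string,
  phase: string,
  containerStatuses: ContainerStatusLike[],
): string {
  const mainCs = containerStatuses.find((cs) => cs.name === "claude");
  const terminated = mainCs?.state?.terminated;
  if (terminated) {
    const code = terminated.exitCode ?? "unknown";
    const reason = terminated.reason ?? terminated.message ?? "no reason";
    return `Pod ${podName} reached phase=${phase}: claude exited ${code} (${reason})`;
  }
  // No terminated state for the claude container: fall back to the bare phase.
  return `Pod ${podName} reached phase=${phase}`;
}

const withDetails = describePodTerminatedErrorSketch("pod-a", "Failed", [
  { name: "claude", state: { terminated: { exitCode: 137, reason: "OOMKilled" } } },
]);
// "Pod pod-a reached phase=Failed: claude exited 137 (OOMKilled)"

const withoutDetails = describePodTerminatedErrorSketch("pod-a", "Failed", [
  { name: "sidecar", state: { terminated: { exitCode: 1 } } },
]);
// "Pod pod-a reached phase=Failed"
```

Only the container named `claude` contributes details; a terminated sidecar or init container is ignored by this helper.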
/**
* Wait for the Job's pod to reach a terminal or running state.
* Returns the pod name once logs can be streamed, or throws on failure.
@@ -121,15 +187,22 @@ async function waitForPod(
for (const cs of containerStatuses) {
if (cs.state?.waiting) details.push(`${cs.name}: waiting (${cs.state.waiting.reason ?? "unknown"})`);
else if (cs.state?.running) details.push(`${cs.name}: running`);
else if (cs.state?.terminated) details.push(`${cs.name}: terminated (exit ${cs.state.terminated.exitCode ?? "?"}, ${cs.state.terminated.reason ?? "no reason"})`);
}
await onLog("stdout", `[paperclip] Pod ${podName}: ${details.join(", ")}\n`);
lastStatus = statusKey;
}
// Ready to stream logs
if (phase === "Running" || phase === "Succeeded" || phase === "Failed") {
if (phase === "Running" || phase === "Succeeded") {
return podName;
}
// phase=Failed means the pod crashed before we could stream logs.
// Throwing here routes the caller into the error path with a structured
// message instead of entering the log-streaming path with a dead pod.
if (phase === "Failed") {
throw new Error(describePodTerminatedError(podName, phase, containerStatuses));
}
// Init containers done + main running (phase may still say Pending briefly)
const allInitsDone = initStatuses.length > 0 && initStatuses.every(
@@ -185,13 +258,14 @@ async function waitForPod(
* Stream pod logs once via follow. Returns accumulated stdout when the
* stream ends (container exit, API disconnect, or abort signal).
*/
async function streamPodLogsOnce(
export async function streamPodLogsOnce(
namespace: string,
podName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
sinceSeconds?: number,
dedup?: LogLineDedupFilter,
stopSignal?: { stopped: boolean },
): Promise<string> {
const logApi = getLogApi(kubeconfigPath);
const chunks: string[] = [];
@@ -209,15 +283,48 @@ async function streamPodLogsOnce(
},
});
// When the job completion signal fires, destroy the writable to abort the
// in-flight follow stream. Without this, logApi.log can hang indefinitely
// when the pod terminates without closing the HTTP connection cleanly.
let stopPoller: ReturnType<typeof setInterval> | null = null;
let bailTimer: ReturnType<typeof setTimeout> | null = null;
let bailResolve: (() => void) | null = null;
// Bail promise resolves LOG_STREAM_BAIL_TIMEOUT_MS after stopSignal fires,
// even if logApi.log has not resolved by then. This is a safety net for the
// case where writable.destroy() fails to propagate to an abort of the HTTP
// request (e.g. the K8s client is awaiting a response that never comes).
const bailPromise = new Promise<void>((resolve) => {
bailResolve = resolve;
});
if (stopSignal) {
stopPoller = setInterval(() => {
if (stopSignal.stopped) {
if (!writable.destroyed) writable.destroy();
if (!bailTimer && bailResolve) {
bailTimer = setTimeout(bailResolve, LOG_STREAM_BAIL_TIMEOUT_MS);
}
}
}, 200);
}
const logPromise = logApi.log(namespace, podName, "claude", writable, {
follow: true,
pretty: false,
...(sinceSeconds ? { sinceSeconds } : {}),
}).catch(() => {
// follow may fail if the container already exited, the API connection
// dropped, or we aborted via writable.destroy() — not fatal.
});
try {
await logApi.log(namespace, podName, "claude", writable, {
follow: true,
pretty: false,
...(sinceSeconds ? { sinceSeconds } : {}),
});
} catch {
// follow may fail if the container already exited or the API
// connection dropped — not fatal, caller decides whether to retry.
if (stopSignal) {
await Promise.race([logPromise, bailPromise]);
} else {
await logPromise;
}
} finally {
if (stopPoller) clearInterval(stopPoller);
if (bailTimer) clearTimeout(bailTimer);
}
return chunks.join("");
@@ -267,7 +374,7 @@ async function streamPodLogs(
}
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
@@ -411,10 +518,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
// still be running. We detect those by comparing the Job's run-id label against
// the current runId and clean them up so this execution can proceed.
// the current runId. When reattachOrphanedJobs is enabled and the orphan matches
// the current agent+task+session, we attach to it instead of deleting it (FAR-124).
const agentId = ctx.agent.id;
const selfPod = await getSelfPodInfo(kubeconfigPath);
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true);
const runtimeSessionParams = parseObject(runtime.sessionParams);
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const currentSessionLabel = currentSessionIdRaw ? sanitizeLabelValue(currentSessionIdRaw) : null;
const currentTaskIdRaw = asString(ctx.context.taskId, "") || asString(ctx.context.issueId, "");
const currentTaskLabel = currentTaskIdRaw ? sanitizeLabelValue(currentTaskIdRaw) : null;
let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null;
try {
const batchApi = getBatchApi(kubeconfigPath);
const existing = await batchApi.listNamespacedJob({
@@ -434,10 +549,42 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
(j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") === runId,
);
if (orphaned.length > 0) {
const orphanNames = orphaned.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Cleaning up ${orphaned.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
for (const j of orphaned) {
// Pick the most recent reattachable orphan — same agent + task + session,
// not terminal. Only one target is chosen; any other orphans get
// cleaned up as before.
if (reattachOrphanedJobs && orphaned.length > 0) {
const candidates = orphaned
.filter((j) =>
isReattachableOrphan(j, {
agentId,
taskId: currentTaskLabel,
sessionId: currentSessionLabel,
}),
)
.sort((a, b) => {
const at = new Date(a.metadata?.creationTimestamp ?? 0).getTime();
const bt = new Date(b.metadata?.creationTimestamp ?? 0).getTime();
return bt - at;
});
const chosen = candidates[0];
const chosenName = chosen?.metadata?.name;
if (chosen && chosenName) {
reattachTarget = {
jobName: chosenName,
namespace: chosen.metadata?.namespace ?? guardNamespace,
priorRunId: chosen.metadata?.labels?.["paperclip.io/run-id"] ?? "",
image: chosen.spec?.template?.spec?.containers?.[0]?.image ?? "unknown",
};
}
}
const toDelete = orphaned.filter(
(j) => !reattachTarget || j.metadata?.name !== reattachTarget.jobName,
);
if (toDelete.length > 0) {
const orphanNames = toDelete.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Cleaning up ${toDelete.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
for (const j of toDelete) {
const name = j.metadata?.name;
if (name) {
await cleanupJob(guardNamespace, name, onLog, kubeconfigPath);
@@ -475,84 +622,152 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
// Build Job manifest
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret } = buildJobManifest({
ctx,
selfPod,
const coreApi = getCoreApi(kubeconfigPath);
const batchApi = getBatchApi(kubeconfigPath);
let jobName: string;
let namespace: string;
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
// Prepare the prompt bundle (skills + instructions) on the server filesystem.
// The K8s Job pod mounts the same PVC at /paperclip, so bundle paths written
// here are accessible inside the pod at the identical absolute path.
const skillEntries = await readPaperclipRuntimeSkillEntries(config, import.meta.dirname ?? __dirname);
const desiredSkillNames = new Set(resolvePaperclipDesiredSkillNames(config, skillEntries));
const desiredSkills = skillEntries.filter((e) => desiredSkillNames.has(e.key));
const instructionsFilePath = asString(config.instructionsFilePath, "").trim();
const instructionsFileDir = instructionsFilePath ? `${path.dirname(instructionsFilePath)}/` : "";
let instructionsContents: string | null = null;
if (instructionsFilePath) {
try {
const raw = await fs.readFile(instructionsFilePath, "utf-8");
const pathDirective =
`\nThe above agent instructions were loaded from ${instructionsFilePath}. ` +
`Resolve any relative file references from ${instructionsFileDir}. ` +
`This base directory is authoritative for sibling instruction files such as ` +
`./HEARTBEAT.md, ./SOUL.md, and ./TOOLS.md; do not resolve those from the parent agent directory.`;
instructionsContents = raw + pathDirective;
} catch (err) {
await onLog(
"stderr",
`[paperclip] Warning: could not read agent instructions file "${instructionsFilePath}": ${err instanceof Error ? err.message : String(err)}\n`,
);
}
}
const promptBundle = await prepareClaudePromptBundle({
companyId: ctx.agent.companyId,
skills: desiredSkills,
instructionsContents,
onLog,
});
// Report invocation metadata
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: claudeArgs,
commandNotes: [
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
context: ctx.context,
} as Parameters<typeof onMeta>[0]);
}
if (reattachTarget) {
jobName = reattachTarget.jobName;
namespace = reattachTarget.namespace;
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
const coreApi = getCoreApi(kubeconfigPath);
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
// Announce reattach metadata. Prompt and args aren't known here — they
// belong to the prior run that created this pod and are already present
// on the running container.
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: [],
commandNotes: [
`Image: ${reattachTarget.image}`,
`Namespace: ${namespace}`,
`Reattached from prior run: ${reattachTarget.priorRunId || "unknown"}`,
`Timeout: ${timeoutSec}s`,
],
prompt: "",
context: ctx.context,
} as Parameters<typeof onMeta>[0]);
}
await onLog("stdout", `[paperclip] Reattaching to in-flight K8s Job ${jobName} in namespace ${namespace} (prior run ${reattachTarget.priorRunId || "unknown"})\n`);
} else {
// Build Job manifest
const built = buildJobManifest({ ctx, selfPod, promptBundle });
const job = built.job;
jobName = built.jobName;
namespace = built.namespace;
const prompt = built.prompt;
const claudeArgs = built.claudeArgs;
const promptMetrics = built.promptMetrics;
promptSecret = built.promptSecret;
// Report invocation metadata
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: claudeArgs,
commandNotes: [
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
context: ctx.context,
} as Parameters<typeof onMeta>[0]);
}
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
},
},
stringData: promptSecret.data,
},
stringData: promptSecret.data,
},
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
};
}
}
// Create the Job
try {
await batchApi.createNamespacedJob({ namespace, body: job });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
errorCode: "k8s_job_create_failed",
};
}
}
// Create the Job
const batchApi = getBatchApi(kubeconfigPath);
try {
await batchApi.createNamespacedJob({ namespace, body: job });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
errorCode: "k8s_job_create_failed",
};
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
}
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
let stdout = "";
let exitCode: number | null = null;
let jobTimedOut = false;
@@ -566,8 +781,23 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
let podName: string;
try {
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
if (reattachTarget) {
// Pod is already running from the prior run — look it up directly.
const podList = await coreApi.listNamespacedPod({
namespace,
labelSelector: `job-name=${jobName}`,
});
const pod = podList.items[0];
const name = pod?.metadata?.name;
if (!name) {
throw new Error(`Reattach target Job ${jobName} has no pod`);
}
podName = name;
await onLog("stdout", `[paperclip] Reattached to pod ${podName}\n`);
} else {
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
}
// Notify the server that execution has started. This sets
// processStartedAt and refreshes updatedAt in the DB, which the
@@ -581,13 +811,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Pod scheduling failed: ${msg}\n`);
const phase = reattachTarget ? "reattach" : "scheduling";
await onLog("stderr", `[paperclip] Pod ${phase} failed: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Pod scheduling failed: ${msg}`,
errorCode: "k8s_pod_schedule_failed",
errorMessage: `Pod ${phase} failed: ${msg}`,
errorCode: reattachTarget ? "k8s_pod_reattach_failed" : "k8s_pod_schedule_failed",
};
}
@@ -621,11 +852,27 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let lastLogAt = Date.now();
let keepaliveTick = 0;
let keepaliveJobTerminal = false;
let keepaliveJobTerminalAt: number | null = null;
keepaliveTimer = setInterval(() => {
// Fire-and-forget the async work; setInterval callbacks must be
// synchronous or the timer will drift.
void (async () => {
if (keepaliveJobTerminal) return;
if (keepaliveJobTerminal) {
// Post-terminal window: keep refreshing onSpawn during cleanup
// (job deletion, log parsing, K8s API calls) so the reaper doesn't
// fire a false process_lost while execute() is still running.
if (
ctx.onSpawn &&
keepaliveJobTerminalAt !== null &&
Date.now() - keepaliveJobTerminalAt <= POST_TERMINAL_KEEPALIVE_MS
) {
keepaliveTick++;
if (keepaliveTick % 6 === 0) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
}
return;
}
// Verify the Job is still alive before announcing or refreshing.
try {
@@ -635,6 +882,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
);
if (terminal) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
if (ctx.onSpawn) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
return;
}
} catch (err: unknown) {
@@ -644,6 +895,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// window as a safety net.
if (isK8s404(err)) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
if (ctx.onSpawn) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
return;
}
// Log transient errors but leave keepaliveJobTerminal false so
@@ -826,8 +1081,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
})();
const runtimeSessionParams = parseObject(runtime.sessionParams);
const fallbackSessionId = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const fallbackSessionId = currentSessionIdRaw;
const resolvedSessionId = parsedStream.sessionId
?? (asString(parsed.session_id as string, fallbackSessionId) || fallbackSessionId);
const model = asString(config.model, "");
+114 -2
@@ -1,6 +1,6 @@
import { describe, it, expect, beforeEach } from "vitest";
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
import { buildJobManifest, buildRtkSetupCommands } from "./job-manifest.js";
import { buildJobManifest, buildRtkSetupCommands, sanitizeLabelValue } from "./job-manifest.js";
import type { SelfPodInfo } from "./k8s-client.js";
function makeCtx(overrides: Partial<AdapterExecutionContext> = {}): AdapterExecutionContext {
@@ -136,6 +136,36 @@ describe("buildJobManifest", () => {
expect(job.metadata?.labels?.env).toBe("prod");
expect(job.metadata?.labels?.["paperclip.io/adapter-type"]).toBe("claude_k8s");
});
it("adds task-id label when context provides taskId", () => {
ctx.context = { taskId: "task-xyz-789" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBe("task-xyz-789");
});
it("falls back to issueId when taskId absent", () => {
ctx.context = { issueId: "issue-42" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBe("issue-42");
});
it("adds session-id label when runtime provides sessionId", () => {
ctx.runtime = { ...ctx.runtime, sessionId: "sess-abc-1234" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/session-id"]).toBe("sess-abc-1234");
});
it("reads sessionId from runtime.sessionParams when sessionId prop missing", () => {
ctx.runtime = { ...ctx.runtime, sessionParams: { sessionId: "sess-from-params" } };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/session-id"]).toBe("sess-from-params");
});
it("omits task-id and session-id labels when neither is provided", () => {
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBeUndefined();
expect(job.metadata?.labels?.["paperclip.io/session-id"]).toBeUndefined();
});
});
describe("annotations", () => {
@@ -487,13 +517,66 @@ describe("buildJobManifest", () => {
expect(claudeArgs).toContain("--dangerously-skip-permissions");
});
it("adds --append-system-prompt-file when instructionsFilePath set", () => {
it("adds --append-system-prompt-file (config fallback) when instructionsFilePath set and no session", () => {
ctx.config = { instructionsFilePath: "/paperclip/instructions.md" };
const { claudeArgs } = buildJobManifest({ ctx, selfPod });
expect(claudeArgs).toContain("--append-system-prompt-file");
expect(claudeArgs).toContain("/paperclip/instructions.md");
});
it("omits --append-system-prompt-file on session resume (avoids token waste)", () => {
ctx.config = { instructionsFilePath: "/paperclip/instructions.md" };
ctx.runtime.sessionId = "sess_existing";
const { claudeArgs } = buildJobManifest({ ctx, selfPod });
expect(claudeArgs).not.toContain("--append-system-prompt-file");
});
it("adds --add-dir when promptBundle is provided", () => {
const promptBundle = {
bundleKey: "abc123",
rootDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
addDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
instructionsFilePath: null,
};
const { claudeArgs } = buildJobManifest({ ctx, selfPod, promptBundle });
expect(claudeArgs).toContain("--add-dir");
expect(claudeArgs).toContain(promptBundle.addDir);
});
it("uses bundle instructionsFilePath for --append-system-prompt-file when promptBundle provided", () => {
const promptBundle = {
bundleKey: "abc123",
rootDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
addDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
instructionsFilePath: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123/agent-instructions.md",
};
ctx.config = { instructionsFilePath: "/raw/path/AGENTS.md" };
const { claudeArgs } = buildJobManifest({ ctx, selfPod, promptBundle });
expect(claudeArgs).toContain("--append-system-prompt-file");
const idx = claudeArgs.indexOf("--append-system-prompt-file");
expect(claudeArgs[idx + 1]).toBe(promptBundle.instructionsFilePath);
expect(claudeArgs).not.toContain("/raw/path/AGENTS.md");
});
it("omits --append-system-prompt-file from bundle on session resume", () => {
const promptBundle = {
bundleKey: "abc123",
rootDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
addDir: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123",
instructionsFilePath: "/paperclip/instances/default/companies/co1/claude-prompt-cache/abc123/agent-instructions.md",
};
ctx.runtime.sessionId = "sess_existing";
const { claudeArgs } = buildJobManifest({ ctx, selfPod, promptBundle });
expect(claudeArgs).not.toContain("--append-system-prompt-file");
// --add-dir must still be present even on resume
expect(claudeArgs).toContain("--add-dir");
});
it("omits --add-dir when no promptBundle", () => {
const { claudeArgs } = buildJobManifest({ ctx, selfPod });
expect(claudeArgs).not.toContain("--add-dir");
});
it("appends extraArgs when configured", () => {
ctx.config = { extraArgs: ["--no-input", "--verbose"] };
const { claudeArgs } = buildJobManifest({ ctx, selfPod });
@@ -729,3 +812,32 @@ describe("buildJobManifest", () => {
});
});
});
describe("sanitizeLabelValue", () => {
it("passes through already-valid UUIDs and slugs", () => {
expect(sanitizeLabelValue("abc-123-def")).toBe("abc-123-def");
expect(sanitizeLabelValue("0d8b4472-c42c-4052-aab1-e32897909afa")).toBe("0d8b4472-c42c-4052-aab1-e32897909afa");
});
it("strips characters outside [a-zA-Z0-9._-]", () => {
expect(sanitizeLabelValue("task:xyz/123")).toBe("taskxyz123");
expect(sanitizeLabelValue("abc 123")).toBe("abc123");
});
it("trims leading/trailing non-alphanumeric characters", () => {
expect(sanitizeLabelValue("--abc--")).toBe("abc");
expect(sanitizeLabelValue("...123...")).toBe("123");
});
it("truncates to the configured maxLen", () => {
const long = "a".repeat(200);
const out = sanitizeLabelValue(long, 63);
expect(out?.length).toBe(63);
});
it("returns null when no alphanumeric characters remain", () => {
expect(sanitizeLabelValue("---")).toBeNull();
expect(sanitizeLabelValue("")).toBeNull();
expect(sanitizeLabelValue(" ")).toBeNull();
});
});
+34 -3
@@ -10,6 +10,7 @@ import {
renderTemplate,
} from "@paperclipai/adapter-utils/server-utils";
import { createHash } from "node:crypto";
import type { ClaudePromptBundle } from "./prompt-cache.js";
/**
* Build the shell command prefix that installs a native Node.js PostToolUse
@@ -175,6 +176,8 @@ function parseKeyValueConfig(raw: unknown): Record<string, string> {
export interface JobBuildInput {
ctx: AdapterExecutionContext;
selfPod: SelfPodInfo;
/** Prepared prompt bundle (skills + instructions). When provided, --add-dir and --append-system-prompt-file use bundle paths. */
promptBundle?: ClaudePromptBundle | null;
}
/** When the prompt exceeds the env-var size limit, the manifest uses a
@@ -202,6 +205,17 @@ function sanitizeForK8sName(value: string, maxLen = 16): string {
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen);
}
/**
* Sanitize a string for use as a Kubernetes label value (label-value syntax:
* `[a-zA-Z0-9]([-_.a-zA-Z0-9]*[a-zA-Z0-9])?`, max 63 chars). Returns `null`
* when no usable characters remain — the caller should omit the label.
*/
export function sanitizeLabelValue(value: string, maxLen = 63): string | null {
const cleaned = value.replace(/[^a-zA-Z0-9._-]/g, "").slice(0, maxLen);
const trimmed = cleaned.replace(/^[^a-zA-Z0-9]+/, "").replace(/[^a-zA-Z0-9]+$/, "");
return trimmed.length > 0 ? trimmed : null;
}
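One consequence of the order of operations above (strip, truncate, then trim) is that the result can be shorter than `maxLen` when truncation leaves a separator at the end. The sketch below copies the function body from the diff under a different name and exercises that edge case alongside two inputs taken from the accompanying tests; the 64-character input is an invented example.

```typescript
// Copy of sanitizeLabelValue from the diff, renamed so it can run standalone.
function sanitizeLabelValueSketch(value: string, maxLen = 63): string | null {
  // 1. Drop anything outside the label-value alphabet, then truncate.
  const cleaned = value.replace(/[^a-zA-Z0-9._-]/g, "").slice(0, maxLen);
  // 2. Label values must start and end with an alphanumeric character.
  const trimmed = cleaned.replace(/^[^a-zA-Z0-9]+/, "").replace(/[^a-zA-Z0-9]+$/, "");
  return trimmed.length > 0 ? trimmed : null;
}

const strippedLabel = sanitizeLabelValueSketch("task:xyz/123"); // "taskxyz123"
const emptyLabel = sanitizeLabelValueSketch("---"); // null

// 62 letters + "-b" is 64 chars; truncating to 63 leaves a trailing "-",
// which the trim step then removes, so the result has 62 characters.
const edgeLabel = sanitizeLabelValueSketch("a".repeat(62) + "-b");
const edgeLength = edgeLabel === null ? 0 : edgeLabel.length; // 62
```

Because the sanitized form is what lands on the Job, the reattach check compares sanitized values on both sides; two raw IDs that sanitize to the same string would be treated as equal.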
/**
* Build a short deterministic hash suffix from the raw inputs to avoid
* collisions when sanitized slugs happen to be identical.
@@ -316,7 +330,7 @@ function buildEnvVars(
}
export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const { ctx, selfPod } = input;
const { ctx, selfPod, promptBundle } = input;
const { runId, agent, runtime, config: rawConfig, context } = ctx;
const config = parseObject(rawConfig);
@@ -392,14 +406,22 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
};
// Build Claude CLI args
const instructionsFilePath = asString(config.instructionsFilePath, "").trim();
// Prefer the bundle's materialized instructions file over the raw config path.
// Never inject --append-system-prompt-file on session resumes — the instructions
// are already in the session cache and re-injecting wastes tokens.
const rawInstructionsFilePath = asString(config.instructionsFilePath, "").trim();
const effectiveInstructionsFilePath =
promptBundle?.instructionsFilePath ?? (rawInstructionsFilePath || null);
const claudeArgs = ["--print", "-", "--output-format", "stream-json", "--verbose"];
if (runtimeSessionId) claudeArgs.push("--resume", runtimeSessionId);
if (dangerouslySkipPermissions) claudeArgs.push("--dangerously-skip-permissions");
if (model) claudeArgs.push("--model", model);
if (effort) claudeArgs.push("--effort", effort);
if (maxTurns > 0) claudeArgs.push("--max-turns", String(maxTurns));
if (instructionsFilePath) claudeArgs.push("--append-system-prompt-file", instructionsFilePath);
if (effectiveInstructionsFilePath && !runtimeSessionId) {
claudeArgs.push("--append-system-prompt-file", effectiveInstructionsFilePath);
}
if (promptBundle) claudeArgs.push("--add-dir", promptBundle.addDir);
if (extraArgs.length > 0) claudeArgs.push(...extraArgs);
// Build env vars
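The interaction between session resume, the bundle's instructions file, and `--add-dir` can be easier to follow in isolation. The following is a reduced sketch of the argument-building rules in this hunk, assuming a hypothetical `ArgInputSketch` shape in place of the real `ctx`/`config` parsing; model, effort, max-turns, and extra args are left out, and the paths are placeholders.

```typescript
// Hypothetical trimmed-down inputs; the real function derives these from ctx and config.
interface ArgInputSketch {
  runtimeSessionId: string | null;
  rawInstructionsFilePath: string;
  promptBundle?: { addDir: string; instructionsFilePath: string | null } | null;
}

function buildClaudeArgsSketch(input: ArgInputSketch): string[] {
  const { runtimeSessionId, rawInstructionsFilePath, promptBundle } = input;
  // Prefer the bundle's materialized file; fall back to the raw config path.
  const effective = promptBundle?.instructionsFilePath ?? (rawInstructionsFilePath || null);
  const args = ["--print", "-", "--output-format", "stream-json", "--verbose"];
  if (runtimeSessionId) args.push("--resume", runtimeSessionId);
  // Instructions are only appended on a fresh session, never on resume.
  if (effective && !runtimeSessionId) args.push("--append-system-prompt-file", effective);
  // The bundle directory is added regardless of resume state.
  if (promptBundle) args.push("--add-dir", promptBundle.addDir);
  return args;
}

const bundleSketch = {
  addDir: "/bundle/abc123",
  instructionsFilePath: "/bundle/abc123/agent-instructions.md",
};

const freshArgs = buildClaudeArgsSketch({
  runtimeSessionId: null,
  rawInstructionsFilePath: "/raw/path/AGENTS.md",
  promptBundle: bundleSketch,
});
const resumeArgs = buildClaudeArgsSketch({
  runtimeSessionId: "sess_existing",
  rawInstructionsFilePath: "/raw/path/AGENTS.md",
  promptBundle: bundleSketch,
});
```

In the fresh case the bundle path wins over the raw config path; in the resume case only `--resume` and `--add-dir` are emitted, which matches the expectations in the accompanying tests.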
@@ -428,6 +450,15 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
"paperclip.io/company-id": agent.companyId,
"paperclip.io/adapter-type": "claude_k8s",
};
// Reattach-target labels: let a future execute() identify this Job as the
// continuation of the same logical unit of work (same task + same resume
// session) so it can attach to the running pod across a Paperclip restart
// instead of deleting it and starting over (FAR-124).
const taskIdRaw = asString(context.taskId, "") || asString(context.issueId, "");
const taskLabel = taskIdRaw ? sanitizeLabelValue(taskIdRaw) : null;
if (taskLabel) labels["paperclip.io/task-id"] = taskLabel;
const sessionLabel = runtimeSessionId ? sanitizeLabelValue(runtimeSessionId) : null;
if (sessionLabel) labels["paperclip.io/session-id"] = sessionLabel;
for (const [key, value] of Object.entries(extraLabels)) {
labels[key] = value;
}
+150
@@ -0,0 +1,150 @@
import { constants as fsConstants } from "node:fs";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { createHash } from "node:crypto";
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
import {
type PaperclipSkillEntry,
ensurePaperclipSkillSymlink,
} from "@paperclipai/adapter-utils/server-utils";
export interface ClaudePromptBundle {
bundleKey: string;
/** Absolute path to the bundle root directory (contains .claude/skills/ and agent-instructions.md). */
rootDir: string;
/** Value to pass as --add-dir to the Claude CLI. */
addDir: string;
/** Path to the materialized instructions file, or null if no instructions were provided. */
instructionsFilePath: string | null;
}
const DEFAULT_PAPERCLIP_INSTANCE_ID = "default";
function resolveManagedClaudePromptCacheRoot(companyId: string): string {
const paperclipHome =
(typeof process.env.PAPERCLIP_HOME === "string" && process.env.PAPERCLIP_HOME.trim().length > 0
? process.env.PAPERCLIP_HOME.trim()
: null) ??
path.resolve(os.homedir(), ".paperclip");
const instanceId =
(typeof process.env.PAPERCLIP_INSTANCE_ID === "string" && process.env.PAPERCLIP_INSTANCE_ID.trim().length > 0
? process.env.PAPERCLIP_INSTANCE_ID.trim()
: null) ?? DEFAULT_PAPERCLIP_INSTANCE_ID;
return path.resolve(paperclipHome, "instances", instanceId, "companies", companyId, "claude-prompt-cache");
}
async function hashPathContents(
candidate: string,
hash: ReturnType<typeof createHash>,
relativePath: string,
seenDirectories: Set<string>,
): Promise<void> {
const stat = await fs.lstat(candidate);
if (stat.isSymbolicLink()) {
hash.update(`symlink:${relativePath}\n`);
const resolved = await fs.realpath(candidate).catch(() => null);
if (!resolved) {
hash.update("missing\n");
return;
}
await hashPathContents(resolved, hash, relativePath, seenDirectories);
return;
}
if (stat.isDirectory()) {
const realDir = await fs.realpath(candidate).catch(() => candidate);
hash.update(`dir:${relativePath}\n`);
if (seenDirectories.has(realDir)) {
hash.update("loop\n");
return;
}
seenDirectories.add(realDir);
const entries = await fs.readdir(candidate, { withFileTypes: true });
entries.sort((a, b) => a.name.localeCompare(b.name));
for (const entry of entries) {
const childRelativePath = relativePath.length > 0 ? `${relativePath}/${entry.name}` : entry.name;
await hashPathContents(path.join(candidate, entry.name), hash, childRelativePath, seenDirectories);
}
return;
}
if (stat.isFile()) {
hash.update(`file:${relativePath}\n`);
hash.update(await fs.readFile(candidate));
hash.update("\n");
return;
}
hash.update(`other:${relativePath}:${stat.mode}\n`);
}
async function buildClaudePromptBundleKey(input: {
skills: PaperclipSkillEntry[];
instructionsContents: string | null;
}): Promise<string> {
const hash = createHash("sha256");
hash.update("paperclip-claude-prompt-bundle:v1\n");
if (input.instructionsContents) {
hash.update("instructions\n");
hash.update(input.instructionsContents);
hash.update("\n");
} else {
hash.update("instructions:none\n");
}
const sortedSkills = [...input.skills].sort((a, b) => a.runtimeName.localeCompare(b.runtimeName));
for (const entry of sortedSkills) {
hash.update(`skill:${entry.key}:${entry.runtimeName}\n`);
await hashPathContents(entry.source, hash, entry.runtimeName, new Set());
}
return hash.digest("hex");
}
async function ensureReadableFile(targetPath: string, contents: string): Promise<void> {
try {
await fs.access(targetPath, fsConstants.R_OK);
return;
} catch {
// Fall through and materialize the file.
}
await fs.mkdir(path.dirname(targetPath), { recursive: true });
const tempPath = `${targetPath}.${process.pid}.${Date.now()}.tmp`;
try {
await fs.writeFile(tempPath, contents, "utf8");
await fs.rename(tempPath, targetPath);
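} catch (err) {
// A concurrent writer may have materialized the file first; only rethrow if the target is still unreadable.
const targetReadable = await fs.access(targetPath, fsConstants.R_OK).then(() => true).catch(() => false);
if (!targetReadable) throw err;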
} finally {
await fs.rm(tempPath, { force: true }).catch(() => {});
}
}
export async function prepareClaudePromptBundle(input: {
companyId: string;
skills: PaperclipSkillEntry[];
instructionsContents: string | null;
onLog: AdapterExecutionContext["onLog"];
}): Promise<ClaudePromptBundle> {
const { companyId, skills, instructionsContents, onLog } = input;
const bundleKey = await buildClaudePromptBundleKey({ skills, instructionsContents });
const rootDir = path.join(resolveManagedClaudePromptCacheRoot(companyId), bundleKey);
const skillsHome = path.join(rootDir, ".claude", "skills");
await fs.mkdir(skillsHome, { recursive: true });
for (const entry of skills) {
const target = path.join(skillsHome, entry.runtimeName);
try {
await ensurePaperclipSkillSymlink(entry.source, target);
} catch (err) {
await onLog(
"stderr",
`[paperclip] Failed to materialize Claude skill "${entry.key}" into ${skillsHome}: ${err instanceof Error ? err.message : String(err)}\n`,
);
}
}
const instructionsFilePath = instructionsContents ? path.join(rootDir, "agent-instructions.md") : null;
if (instructionsFilePath && instructionsContents) {
await ensureReadableFile(instructionsFilePath, instructionsContents);
}
return { bundleKey, rootDir, addDir: rootDir, instructionsFilePath };
}
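The bundle key above is content-addressed: the same skills and instructions always map to the same cache directory, whatever order the skills arrive in, and any content change yields a new directory. A minimal sketch of that property follows, assuming in-memory file contents instead of the on-disk walk done by hashPathContents. The names SketchSkill and sketchBundleKey and the "sketch-prompt-bundle:v1" tag are hypothetical and are not part of the adapter.

```typescript
import { createHash } from "node:crypto";

// Hypothetical, simplified stand-in for PaperclipSkillEntry: file contents are
// held in memory so the sketch runs without touching the filesystem.
interface SketchSkill {
  key: string;
  runtimeName: string;
  files: Record<string, string>; // relative path -> file contents
}

function sketchBundleKey(skills: SketchSkill[], instructions: string | null): string {
  const hash = createHash("sha256");
  // Version tag (assumed name): bumping it invalidates every previously cached bundle.
  hash.update("sketch-prompt-bundle:v1\n");
  hash.update(instructions ? `instructions\n${instructions}\n` : "instructions:none\n");

  // Sort a copy so the key does not depend on the caller's ordering.
  const sorted = [...skills].sort((a, b) => a.runtimeName.localeCompare(b.runtimeName));
  for (const skill of sorted) {
    hash.update(`skill:${skill.key}:${skill.runtimeName}\n`);
    const entries = Object.entries(skill.files).sort(([x], [y]) => x.localeCompare(y));
    for (const [relPath, contents] of entries) {
      // Label each entry with its path so a renamed file also changes the key.
      hash.update(`file:${skill.runtimeName}/${relPath}\n`);
      hash.update(contents);
      hash.update("\n");
    }
  }
  return hash.digest("hex");
}

const alpha: SketchSkill = { key: "k/a", runtimeName: "alpha", files: { "SKILL.md": "do a" } };
const beta: SketchSkill = { key: "k/b", runtimeName: "beta", files: { "SKILL.md": "do b" } };

const keyAB = sketchBundleKey([alpha, beta], "be helpful");
const keyBA = sketchBundleKey([beta, alpha], "be helpful");
const keyEdited = sketchBundleKey(
  [alpha, { ...beta, files: { "SKILL.md": "do b!" } }],
  "be helpful",
);

console.log(keyAB === keyBA); // true: same inputs, different order
console.log(keyAB === keyEdited); // false: one skill file changed
```

Because the directory name is derived from the contents, an existing bundle directory never needs to be rewritten in place, which is presumably why ensureReadableFile can return early when the target already exists.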
+1 -1
@@ -33,7 +33,7 @@ async function buildK8sSkillSnapshot(
sourcePath: entry.source,
targetPath: null,
detail: desiredSet.has(entry.key)
- ? "Injected via prompt bundle into ephemeral K8s Job pods."
+ ? "Materialized into the PVC-backed Claude prompt bundle before each K8s Job run."
: null,
required: Boolean(entry.required),
requiredReason: entry.requiredReason ?? null,