Compare commits

...

20 Commits

Author SHA1 Message Date
Chris Farhood abdce817f3 0.1.36
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 12:36:21 +00:00
Chris Farhood f9d8a2e0ce fix: resolve grace-period deadlock for stale UI status (FAR-23)
The log-stream-exit grace timer never fired because logExitTime was set
in the .then() of streamPodLogs, which only resolves once stopSignal is
set — but stopSignal is only set when completionWithGrace fires, which
requires logExitTime to be non-null. Classic deadlock.

Fix: add onFirstStreamExit callback to streamPodLogs, called after
attempt=0's streamPodLogsOnce returns (the first container exit signal).
execute() passes a closure that sets logExitTime immediately, breaking
the circular dependency and allowing the 30s grace timer to fire
correctly when K8s Job conditions lag container exit.

Tests: all 323 pass including the two FAR-23 grace-period regression tests.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 12:20:10 +00:00
Chris Farhood a7dfd5d502 test: fix flaky execute.ts timer tests and hit 80%+ line coverage
readPaperclipRuntimeSkillEntries does real fs.stat I/O under fake timers,
delaying execute()'s fake-timer registration by ~3200-4200ms of fake time
when tests run in isolation (cold OS page cache).  The previous approach
tried vi.spyOn on an ESM module namespace export, which throws
"Cannot redefine property" — a fundamental ESM constraint.

Fix: remove the broken spy.  Instead, each timer-heavy test now uses enough
advanceTimersByTimeAsync calls to (a) give the event loop sufficient turns
for the I/O to drain, and (b) cover the full fake-timer sequence even with
the maximum observed I/O delay.  Patterns chosen:

  reconnects (needs t+6000):       6 advances, ~12200ms total
  deadline exceeded (needs t+3000): 5 advances, ~8400ms total
  pod-creation wait (needs t+5000): 5 advances, ~9400ms total

execute.ts line coverage: 82.57% (was ~24% before this task's test additions).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 04:10:49 +00:00
Chris Farhood e310ba4156 0.1.35 2026-04-24 00:44:59 +00:00
Chris Farhood ae7adb0847 docs: add enableRtk, rtkMaxOutputBytes, reattachOrphanedJobs to config doc (N6)
Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 00:01:57 +00:00
Chris Farhood d24510172e fix: remove misleading dangerouslySkipPermissions UI toggle (N5)
Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 00:01:38 +00:00
Chris Farhood 29a4e709d0 fix: sanitize agent/run/company labels to RFC 1123 (N4)
Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 00:00:56 +00:00
Chris Farhood 8a08e6a6ee fix: relabel reattached Job with current run-id and session-id (N3)
Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:59:05 +00:00
Chris Farhood c0dba8e904 fix: never auto-delete live K8s orphans; block on mismatch (#8)
Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:58:51 +00:00
Chris Farhood b91859c258 refactor: extract classifyOrphan helper with decision matrix (#8)
Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:58:23 +00:00
Chris Farhood f1433b05a6 fix: reserve paperclip.io/ and app.kubernetes.io/ label prefixes (N2)
Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:54:15 +00:00
Chris Farhood f64694f894 fix: validate companyId/instanceId against path traversal (N1)
Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:53:18 +00:00
Gandalf the Greybeard e86b14a677 0.1.34 2026-04-23 23:35:02 +00:00
Gandalf the Greybeard 98f3821f91 fix: address remaining minor code review findings (FAR-15)
- #9: match Paperclip container by name in k8s-client instead of
  trusting spec.containers[0], which could be a service-mesh sidecar
- #11: key assistant-text dedup by (message.id, index) so legitimate
  duplicate content across turns isn't collapsed in the summary
- #16: trim trailing hyphens from sanitized K8s names so truncation
  doesn't produce names ending in "-"

Findings #5 (keepalive re-verify) and #6 (one-shot log dedup) were
already addressed in the current code — verified during this review.
#8 (orphan reattach behavior) requires a product decision on whether
"new session wins" is intentional, so deferring.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:34:59 +00:00
Gandalf the Greybeard 21a02da00f fix: prevent prompt Secret leak by attaching ownerReference to Job (FAR-15)
When a large prompt creates a K8s Secret, it can orphan if the process
crashes before the finally block runs. Now the Secret gets an
ownerReference pointing to the Job after creation, so K8s GC cleans it
up automatically. Also cleans up the Secret on job creation failure.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:29:47 +00:00
Gandalf the Greybeard 346f5cc1df fix: prevent UTF-8 corruption when RTK truncation splits multi-byte codepoints (FAR-19)
The trunc function in the RTK filter script now walks back from the
truncation point past continuation bytes and checks whether the full
codepoint fits, avoiding replacement characters from mid-codepoint slicing.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:28:28 +00:00
Gandalf the Greybeard ef73586a41 fix: address 6 critical/minor code review findings (FAR-15)
1. Fix resources.* dotted-key config — UI fields now correctly read
2. Fix operator precedence bug in container status key (add parens)
3. Add missing RBAC checks to testEnvironment (jobs/list, secrets/*, pvc)
4. Add bail timer log message for debuggability
5. Make result-event detection robust to JSON whitespace variations
6. Remove namespace short-circuit so all checks run on first attempt

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:15:01 +00:00
Gandalf the Greybeard 9f79efdf36 0.1.33 2026-04-23 22:45:37 +00:00
Gandalf the Greybeard 4210f51937 chore: update lockfile
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 22:45:31 +00:00
Gandalf the Greybeard f41ae818ef fix: fire onSpawn immediately on job terminal transition (FAR-14)
Prevents process_lost false positives for 2-3 minute K8s jobs by
resetting the reaper clock when the keepalive loop detects the job
has completed (or been deleted), rather than waiting for the next
periodic refresh.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 22:29:22 +00:00
22 changed files with 1965 additions and 271 deletions
+33
View File
@@ -1,3 +1,36 @@
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
import type * as k8s from "@kubernetes/client-node";
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
* Exported for unit tests.
*/
export declare function isK8s404(err: unknown): boolean;
/**
* Build the error message when Claude's stdout contains no result event.
* Skips system/init event lines so the UI doesn't display the raw init JSON.
* Exported for unit tests.
*/
export declare function buildPartialRunError(exitCode: number | null, model: string, stdout: string): string;
/**
* Evaluate an orphaned K8s Job (one whose `paperclip.io/run-id` label does
* not match the current runId) as a potential reattach target. A Job is
* reattachable when it belongs to the same agent, same task, and same resume
* session as the current run — meaning the previous Paperclip instance was
* mid-stream on the exact piece of work this new run was dispatched to do.
* Exported for unit tests.
*/
export declare function isReattachableOrphan(job: k8s.V1Job, expected: {
agentId: string;
taskId: string | null;
sessionId: string | null;
}): boolean;
/**
* Build an error message for a pod that reached phase=Failed before or
* instead of streaming logs. Includes the claude container's terminated exit
* code and reason when available so operators can diagnose crashes without
* needing kubectl. Exported for unit tests.
*/
export declare function describePodTerminatedError(podName: string, phase: string, containerStatuses: k8s.V1ContainerStatus[]): string;
export declare function execute(ctx: AdapterExecutionContext): Promise<AdapterExecutionResult>;
//# sourceMappingURL=execute.d.ts.map
+1 -1
View File
@@ -1 +1 @@
{"version":3,"file":"execute.d.ts","sourceRoot":"","sources":["../../src/server/execute.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,EAAE,uBAAuB,EAAE,sBAAsB,EAAE,MAAM,4BAA4B,CAAC;AAiUlG,wBAAsB,OAAO,CAAC,GAAG,EAAE,uBAAuB,GAAG,OAAO,CAAC,sBAAsB,CAAC,CAoc3F"}
{"version":3,"file":"execute.d.ts","sourceRoot":"","sources":["../../src/server/execute.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,EAAE,uBAAuB,EAAE,sBAAsB,EAAE,MAAM,4BAA4B,CAAC;AAWlG,OAAO,KAAK,KAAK,GAAG,MAAM,yBAAyB,CAAC;AAYpD;;;;GAIG;AACH,wBAAgB,QAAQ,CAAC,GAAG,EAAE,OAAO,GAAG,OAAO,CAO9C;AAED;;;;GAIG;AACH,wBAAgB,oBAAoB,CAClC,QAAQ,EAAE,MAAM,GAAG,IAAI,EACvB,KAAK,EAAE,MAAM,EACb,MAAM,EAAE,MAAM,GACb,MAAM,CA4BR;AAED;;;;;;;GAOG;AACH,wBAAgB,oBAAoB,CAClC,GAAG,EAAE,GAAG,CAAC,KAAK,EACd,QAAQ,EAAE;IAAE,OAAO,EAAE,MAAM,CAAC;IAAC,MAAM,EAAE,MAAM,GAAG,IAAI,CAAC;IAAC,SAAS,EAAE,MAAM,GAAG,IAAI,CAAA;CAAE,GAC7E,OAAO,CAaT;AAED;;;;;GAKG;AACH,wBAAgB,0BAA0B,CACxC,OAAO,EAAE,MAAM,EACf,KAAK,EAAE,MAAM,EACb,iBAAiB,EAAE,GAAG,CAAC,iBAAiB,EAAE,GACzC,MAAM,CASR;AAkWD,wBAAsB,OAAO,CAAC,GAAG,EAAE,uBAAuB,GAAG,OAAO,CAAC,sBAAsB,CAAC,CAkkB3F"}
+370 -104
View File
@@ -1,12 +1,110 @@
import { asString, asNumber, asBoolean, parseObject } from "@paperclipai/adapter-utils/server-utils";
import { parseClaudeStreamJson, describeClaudeFailure, isClaudeMaxTurnsResult, isClaudeUnknownSessionError, } from "./parse.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
import { buildJobManifest } from "./job-manifest.js";
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
import { LogLineDedupFilter } from "./log-dedup.js";
import { Writable } from "node:stream";
const POLL_INTERVAL_MS = 2000;
const KEEPALIVE_INTERVAL_MS = 15_000;
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
// How long to keep refreshing onSpawn after the Job reaches a terminal state.
// Covers the cleanup path (delete job, parse stdout) so a slow K8s API call
// doesn't trip the 5-minute reaper staleness window.
const POST_TERMINAL_KEEPALIVE_MS = 90_000;
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
* Exported for unit tests.
*/
export function isK8s404(err) {
if (!(err instanceof Error))
return false;
const e = err;
const resp = e.response;
if (resp?.statusCode === 404 || resp?.status === 404)
return true;
if (e.statusCode === 404)
return true;
return /HTTP-Code:\s*404\b/.test(err.message);
}
/**
* Build the error message when Claude's stdout contains no result event.
* Skips system/init event lines so the UI doesn't display the raw init JSON.
* Exported for unit tests.
*/
export function buildPartialRunError(exitCode, model, stdout) {
if (exitCode === 0)
return "Failed to parse Claude JSON output";
// Walk stdout lines, skip system events, return the first real content line.
const firstContentLine = stdout.split(/\r?\n/)
.map((l) => l.trim())
.find((l) => {
if (!l)
return false;
try {
const obj = JSON.parse(l);
if (typeof obj === "object" && obj !== null && obj.type === "system")
return false;
}
catch {
// not JSON — treat as content
}
return true;
}) ?? "";
// If we only have system/init events and nothing else, surface the model
// name so the operator can diagnose missing credentials or unsupported model.
const initOnlyOutput = stdout.trim() !== "" && model !== "" && !firstContentLine;
if (initOnlyOutput) {
const modelHint = model ? ` (model: ${model})` : "";
return `Claude started but did not produce a result${modelHint} — check API credentials, model support, and adapter config`;
}
return firstContentLine
? `Claude exited with code ${exitCode ?? -1}: ${firstContentLine}`
: `Claude exited with code ${exitCode ?? -1}`;
}
/**
* Evaluate an orphaned K8s Job (one whose `paperclip.io/run-id` label does
* not match the current runId) as a potential reattach target. A Job is
* reattachable when it belongs to the same agent, same task, and same resume
* session as the current run — meaning the previous Paperclip instance was
* mid-stream on the exact piece of work this new run was dispatched to do.
* Exported for unit tests.
*/
export function isReattachableOrphan(job, expected) {
if (!expected.taskId || !expected.sessionId)
return false;
const labels = job.metadata?.labels ?? {};
if (labels["paperclip.io/adapter-type"] !== "claude_k8s")
return false;
if (labels["paperclip.io/agent-id"] !== expected.agentId)
return false;
if (labels["paperclip.io/task-id"] !== expected.taskId)
return false;
if (labels["paperclip.io/session-id"] !== expected.sessionId)
return false;
const conditions = job.status?.conditions ?? [];
const terminal = conditions.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True");
if (terminal)
return false;
return true;
}
/**
* Build an error message for a pod that reached phase=Failed before or
* instead of streaming logs. Includes the claude container's terminated exit
* code and reason when available so operators can diagnose crashes without
* needing kubectl. Exported for unit tests.
*/
export function describePodTerminatedError(podName, phase, containerStatuses) {
const mainCs = containerStatuses.find((cs) => cs.name === "claude");
const terminated = mainCs?.state?.terminated;
if (terminated) {
const code = terminated.exitCode ?? "unknown";
const reason = terminated.reason ?? terminated.message ?? "no reason";
return `Pod ${podName} reached phase=${phase}: claude exited ${code} (${reason})`;
}
return `Pod ${podName} reached phase=${phase}`;
}
/**
* Wait for the Job's pod to reach a terminal or running state.
* Returns the pod name once logs can be streamed, or throws on failure.
@@ -52,14 +150,22 @@ async function waitForPod(namespace, jobName, timeoutMs, onLog, kubeconfigPath)
details.push(`${cs.name}: waiting (${cs.state.waiting.reason ?? "unknown"})`);
else if (cs.state?.running)
details.push(`${cs.name}: running`);
else if (cs.state?.terminated)
details.push(`${cs.name}: terminated (exit ${cs.state.terminated.exitCode ?? "?"}, ${cs.state.terminated.reason ?? "no reason"})`);
}
await onLog("stdout", `[paperclip] Pod ${podName}: ${details.join(", ")}\n`);
lastStatus = statusKey;
}
// Ready to stream logs
if (phase === "Running" || phase === "Succeeded" || phase === "Failed") {
if (phase === "Running" || phase === "Succeeded") {
return podName;
}
// phase=Failed means the pod crashed before we could stream logs.
// Throwing here routes the caller into the error path with a structured
// message instead of entering the log-streaming path with a dead pod.
if (phase === "Failed") {
throw new Error(describePodTerminatedError(podName, phase, containerStatuses));
}
// Init containers done + main running (phase may still say Pending briefly)
const allInitsDone = initStatuses.length > 0 && initStatuses.every((s) => s.state?.terminated?.exitCode === 0);
const mainRunning = containerStatuses.some((s) => s.state?.running);
@@ -104,16 +210,32 @@ async function waitForPod(namespace, jobName, timeoutMs, onLog, kubeconfigPath)
* Stream pod logs once via follow. Returns accumulated stdout when the
* stream ends (container exit, API disconnect, or abort signal).
*/
async function streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds) {
async function streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal) {
const logApi = getLogApi(kubeconfigPath);
const chunks = [];
const writable = new Writable({
write(chunk, _encoding, callback) {
const text = chunk.toString("utf-8");
chunks.push(text);
void onLog("stdout", text).then(() => callback(), callback);
const emitted = dedup ? dedup.filter(text) : text;
if (!emitted) {
callback();
return;
}
void onLog("stdout", emitted).then(() => callback(), callback);
},
});
// When the job completion signal fires, destroy the writable to abort the
// in-flight follow stream. Without this, logApi.log can hang indefinitely
// when the pod terminates without closing the HTTP connection cleanly.
let stopPoller = null;
if (stopSignal) {
stopPoller = setInterval(() => {
if (stopSignal.stopped && !writable.destroyed) {
writable.destroy();
}
}, 200);
}
try {
await logApi.log(namespace, podName, "claude", writable, {
follow: true,
@@ -122,8 +244,12 @@ async function streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinc
});
}
catch {
// follow may fail if the container already exited or the API
// connection dropped — not fatal, caller decides whether to retry.
// follow may fail if the container already exited, the API connection
// dropped, or we aborted via writable.destroy() — not fatal.
}
finally {
if (stopPoller)
clearInterval(stopPoller);
}
return chunks.join("");
}
@@ -143,6 +269,9 @@ async function streamPodLogs(namespace, podName, onLog, kubeconfigPath, stopSign
// reconnects use a tight window instead of an ever-growing one anchored
// at stream start. This is the primary fix for FAR-105 duplicative logs.
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
// Shared across reconnects so replayed lines inside the `sinceSeconds`
// overlap window are dropped before they reach the streaming UI (FAR-123).
const dedup = new LogLineDedupFilter();
while (!stopSignal?.stopped) {
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
@@ -158,7 +287,7 @@ async function streamPodLogs(namespace, podName, onLog, kubeconfigPath, stopSign
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
}
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
@@ -177,6 +306,11 @@ async function streamPodLogs(namespace, podName, onLog, kubeconfigPath, stopSign
// Brief pause before reconnecting to avoid tight loops.
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
}
// Flush any buffered partial line so the final assistant/result chunk
// isn't dropped when the stream ends mid-line.
const tail = dedup.flush();
if (tail)
await onLog("stdout", tail);
return allChunks.join("");
}
/**
@@ -199,13 +333,27 @@ async function readPodLogs(namespace, podName, kubeconfigPath) {
}
/**
* Wait for the Job to reach a terminal state (Complete or Failed).
* Returns the Job's final status.
* Returns the Job's final status. A 404 (job deleted by TTL or externally)
* is treated as a soft terminal: succeeded=false, timedOut=false, jobGone=true.
* The caller should log this and fall through to stdout parsing.
*/
async function waitForJobCompletion(namespace, jobName, timeoutMs, kubeconfigPath) {
const batchApi = getBatchApi(kubeconfigPath);
const deadline = timeoutMs > 0 ? Date.now() + timeoutMs : 0;
while (deadline === 0 || Date.now() < deadline) {
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
let job;
try {
job = await batchApi.readNamespacedJob({ name: jobName, namespace });
}
catch (err) {
if (isK8s404(err)) {
// Job was deleted (TTL garbage collection or external deletion) before
// we detected its terminal condition. The container must have already
// exited for TTL to fire, so log streaming will have captured the output.
return { succeeded: false, timedOut: false, jobGone: true };
}
throw err;
}
const conditions = job.status?.conditions ?? [];
const complete = conditions.find((c) => c.type === "Complete" && c.status === "True");
if (complete)
@@ -261,10 +409,18 @@ export async function execute(ctx) {
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
// still be running. We detect those by comparing the Job's run-id label against
// the current runId and clean them up so this execution can proceed.
// the current runId. When reattachOrphanedJobs is enabled and the orphan matches
// the current agent+task+session, we attach to it instead of deleting it (FAR-124).
const agentId = ctx.agent.id;
const selfPod = await getSelfPodInfo(kubeconfigPath);
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true);
const runtimeSessionParams = parseObject(runtime.sessionParams);
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const currentSessionLabel = currentSessionIdRaw ? sanitizeLabelValue(currentSessionIdRaw) : null;
const currentTaskIdRaw = asString(ctx.context.taskId, "") || asString(ctx.context.issueId, "");
const currentTaskLabel = currentTaskIdRaw ? sanitizeLabelValue(currentTaskIdRaw) : null;
let reattachTarget = null;
try {
const batchApi = getBatchApi(kubeconfigPath);
const existing = await batchApi.listNamespacedJob({
@@ -277,10 +433,37 @@ export async function execute(ctx) {
// concurrent jobs (same runId — shouldn't happen but guard defensively).
const orphaned = running.filter((j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") !== runId);
const samRun = running.filter((j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") === runId);
if (orphaned.length > 0) {
const orphanNames = orphaned.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Cleaning up ${orphaned.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
for (const j of orphaned) {
// Pick the most recent reattachable orphan — same agent + task + session,
// not terminal. Only one target is chosen; any other orphans get
// cleaned up as before.
if (reattachOrphanedJobs && orphaned.length > 0) {
const candidates = orphaned
.filter((j) => isReattachableOrphan(j, {
agentId,
taskId: currentTaskLabel,
sessionId: currentSessionLabel,
}))
.sort((a, b) => {
const at = new Date(a.metadata?.creationTimestamp ?? 0).getTime();
const bt = new Date(b.metadata?.creationTimestamp ?? 0).getTime();
return bt - at;
});
const chosen = candidates[0];
const chosenName = chosen?.metadata?.name;
if (chosen && chosenName) {
reattachTarget = {
jobName: chosenName,
namespace: chosen.metadata?.namespace ?? guardNamespace,
priorRunId: chosen.metadata?.labels?.["paperclip.io/run-id"] ?? "",
image: chosen.spec?.template?.spec?.containers?.[0]?.image ?? "unknown",
};
}
}
const toDelete = orphaned.filter((j) => !reattachTarget || j.metadata?.name !== reattachTarget.jobName);
if (toDelete.length > 0) {
const orphanNames = toDelete.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Cleaning up ${toDelete.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
for (const j of toDelete) {
const name = j.metadata?.name;
if (name) {
await cleanupJob(guardNamespace, name, onLog, kubeconfigPath);
@@ -317,81 +500,114 @@ export async function execute(ctx) {
errorCode: "k8s_concurrency_guard_unreachable",
};
}
// Build Job manifest
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret } = buildJobManifest({
ctx,
selfPod,
});
// Report invocation metadata
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: claudeArgs,
commandNotes: [
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
context: ctx.context,
});
}
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
const coreApi = getCoreApi(kubeconfigPath);
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
},
},
stringData: promptSecret.data,
},
const batchApi = getBatchApi(kubeconfigPath);
let jobName;
let namespace;
let promptSecret = null;
if (reattachTarget) {
jobName = reattachTarget.jobName;
namespace = reattachTarget.namespace;
// Announce reattach metadata. Prompt and args aren't known here — they
// belong to the prior run that created this pod and are already present
// on the running container.
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: [],
commandNotes: [
`Image: ${reattachTarget.image}`,
`Namespace: ${namespace}`,
`Reattached from prior run: ${reattachTarget.priorRunId || "unknown"}`,
`Timeout: ${timeoutSec}s`,
],
prompt: "",
context: ctx.context,
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
}
await onLog("stdout", `[paperclip] Reattaching to in-flight K8s Job ${jobName} in namespace ${namespace} (prior run ${reattachTarget.priorRunId || "unknown"})\n`);
}
else {
// Build Job manifest
const built = buildJobManifest({ ctx, selfPod });
const job = built.job;
jobName = built.jobName;
namespace = built.namespace;
const prompt = built.prompt;
const claudeArgs = built.claudeArgs;
const promptMetrics = built.promptMetrics;
promptSecret = built.promptSecret;
// Report invocation metadata
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: claudeArgs,
commandNotes: [
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
context: ctx.context,
});
}
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
},
},
stringData: promptSecret.data,
},
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
}
catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
};
}
}
// Create the Job
try {
await batchApi.createNamespacedJob({ namespace, body: job });
}
catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
errorCode: "k8s_job_create_failed",
};
}
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
}
// Create the Job
const batchApi = getBatchApi(kubeconfigPath);
try {
await batchApi.createNamespacedJob({ namespace, body: job });
}
catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
errorCode: "k8s_job_create_failed",
};
}
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
let stdout = "";
let exitCode = null;
let jobTimedOut = false;
@@ -404,8 +620,24 @@ export async function execute(ctx) {
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
let podName;
try {
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
if (reattachTarget) {
// Pod is already running from the prior run — look it up directly.
const podList = await coreApi.listNamespacedPod({
namespace,
labelSelector: `job-name=${jobName}`,
});
const pod = podList.items[0];
const name = pod?.metadata?.name;
if (!name) {
throw new Error(`Reattach target Job ${jobName} has no pod`);
}
podName = name;
await onLog("stdout", `[paperclip] Reattached to pod ${podName}\n`);
}
else {
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
}
// Notify the server that execution has started. This sets
// processStartedAt and refreshes updatedAt in the DB, which the
// stale-run reaper (reapOrphanedRuns) uses to decide liveness.
@@ -419,13 +651,14 @@ export async function execute(ctx) {
}
catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Pod scheduling failed: ${msg}\n`);
const phase = reattachTarget ? "reattach" : "scheduling";
await onLog("stderr", `[paperclip] Pod ${phase} failed: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Pod scheduling failed: ${msg}`,
errorCode: "k8s_pod_schedule_failed",
errorMessage: `Pod ${phase} failed: ${msg}`,
errorCode: reattachTarget ? "k8s_pod_reattach_failed" : "k8s_pod_schedule_failed",
};
}
// Stream logs and wait for completion concurrently.
@@ -457,18 +690,32 @@ export async function execute(ctx) {
let lastLogAt = Date.now();
let keepaliveTick = 0;
let keepaliveJobTerminal = false;
let keepaliveJobTerminalAt = null;
keepaliveTimer = setInterval(() => {
// Fire-and-forget the async work; setInterval callbacks must be
// synchronous or the timer will drift.
void (async () => {
if (keepaliveJobTerminal)
if (keepaliveJobTerminal) {
// Post-terminal window: keep refreshing onSpawn during cleanup
// (job deletion, log parsing, K8s API calls) so the reaper doesn't
// fire a false process_lost while execute() is still running.
if (ctx.onSpawn &&
keepaliveJobTerminalAt !== null &&
Date.now() - keepaliveJobTerminalAt <= POST_TERMINAL_KEEPALIVE_MS) {
keepaliveTick++;
if (keepaliveTick % 6 === 0) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => { });
}
}
return;
}
// Verify the Job is still alive before announcing or refreshing.
try {
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
const terminal = job.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True");
if (terminal) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
return;
}
}
@@ -477,10 +724,9 @@ export async function execute(ctx) {
// connection resets should NOT permanently disable the keepalive —
// the next tick will re-check and the reaper uses the staleness
// window as a safety net.
const statusCode = err?.response?.statusCode
?? err?.statusCode;
if (statusCode === 404) {
if (isK8s404(err)) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
return;
}
// Log transient errors but leave keepaliveJobTerminal false so
@@ -525,23 +771,44 @@ export async function execute(ctx) {
if (logResult.status === "fulfilled") {
stdout = logResult.value;
}
// If the follow stream missed output (container exited quickly), do a
// one-shot log read as fallback before the pod is cleaned up.
if (!stdout.trim()) {
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
stdout = await readPodLogs(namespace, podName, kubeconfigPath);
if (stdout.trim()) {
// One-shot log fallback: handles two failure modes with a single read.
// Mode 1 — empty stream: the follow stream returned nothing (fast exit before connection).
// Mode 2 — partial stream: we have some output but no result event (follow stream raced
// with container exit and captured only the init line before the connection dropped).
// A one-shot readPodLogs is more reliable for already-terminated containers and reads
// from the beginning of the log, giving us the full output.
// We use a cheap string scan for the result-event guard (avoids a full JSON parse here;
// the authoritative parse happens once below after all fallbacks complete).
const hasResultEvent = stdout.includes('"type":"result"');
const needsOneShot = !stdout.trim() || (stdout.trim() && !hasResultEvent);
if (needsOneShot) {
if (!stdout.trim()) {
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
}
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
if (!stdout.trim() && oneShotLogs.trim()) {
stdout = oneShotLogs;
await onLog("stdout", stdout);
}
else if (oneShotLogs && oneShotLogs.length > stdout.length) {
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
stdout = oneShotLogs;
}
}
if (completionResult.status === "fulfilled") {
jobTimedOut = completionResult.value.timedOut;
if (completionResult.value.jobGone) {
// Job was deleted by TTL or externally before we observed the Complete/Failed
// condition. The container must have exited first (TTL only fires after
// completion), so log streaming has captured the full output — continue
// to stdout parsing rather than returning an error.
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
}
}
else {
// waitForJobCompletion threw — re-check job state to avoid returning
// while the job is still running (which would cause UI staleness and
// concurrency errors on retry). Use a bounded timeout (60s) so we
// don't hang the heartbeat indefinitely if the K8s API is degraded.
// waitForJobCompletion threw an unexpected error — re-check job state to
// avoid returning while the job is still running. Use a bounded timeout
// (60s) so we don't hang the heartbeat indefinitely if the K8s API is degraded.
jobTimedOut = false;
const RECHECK_TIMEOUT_MS = 60_000;
const actualState = await waitForJobCompletion(namespace, jobName, RECHECK_TIMEOUT_MS, kubeconfigPath);
@@ -550,6 +817,11 @@ export async function execute(ctx) {
// Return an error so the UI knows the run is not done.
jobTimedOut = true;
}
else if (actualState.jobGone) {
// Job was deleted before we could confirm terminal state — same as the
// fulfilled+jobGone case above: proceed with captured output.
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
}
else if (!actualState.succeeded) {
// Job still not terminal — the completion error was likely transient.
// Return an error so the UI knows the run is not done, rather than
@@ -615,16 +887,11 @@ export async function execute(ctx) {
};
}
if (!parsed) {
const stderrLine = stdout.split(/\r?\n/).map((l) => l.trim()).find(Boolean) ?? "";
return {
exitCode,
signal: null,
timedOut: false,
errorMessage: exitCode === 0
? "Failed to parse Claude JSON output"
: stderrLine
? `Claude exited with code ${exitCode ?? -1}: ${stderrLine}`
: `Claude exited with code ${exitCode ?? -1}`,
errorMessage: buildPartialRunError(exitCode, parsedStream.model, stdout),
resultJson: { stdout },
};
}
@@ -636,8 +903,7 @@ export async function execute(ctx) {
outputTokens: asNumber(usageObj.output_tokens, 0),
};
})();
const runtimeSessionParams = parseObject(runtime.sessionParams);
const fallbackSessionId = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const fallbackSessionId = currentSessionIdRaw;
const resolvedSessionId = parsedStream.sessionId
?? (asString(parsed.session_id, fallbackSessionId) || fallbackSessionId);
const model = asString(config.model, "");
+1 -1
View File
File diff suppressed because one or more lines are too long
+20
View File
@@ -1,5 +1,19 @@
import type * as k8s from "@kubernetes/client-node";
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
/**
* Build the shell command prefix that installs a native Node.js PostToolUse
* hook into Claude Code's settings. The hook truncates oversized tool outputs
* before they reach the model — replacing the RTK binary init-container
* approach with a self-contained Node.js implementation.
*
* Both scripts are base64-encoded so they can be embedded in a sh -c command
* string without any quoting or escaping issues.
*
* @param maxOutputBytes Byte threshold above which tool output is truncated.
* @returns A shell command string (suitable for "&&"-chaining
* before the claude invocation).
*/
export declare function buildRtkSetupCommands(maxOutputBytes: number): string;
import type { SelfPodInfo } from "./k8s-client.js";
export interface JobBuildInput {
ctx: AdapterExecutionContext;
@@ -24,5 +38,11 @@ export interface JobBuildResult {
* staged as a K8s Secret before creating the Job. */
promptSecret: PromptSecret | null;
}
/**
* Sanitize a string for use as a Kubernetes label value (RFC 1123 subset:
* `[a-zA-Z0-9]([-_.a-zA-Z0-9]*[a-zA-Z0-9])?`, max 63 chars). Returns `null`
* when no usable characters remain — the caller should omit the label.
*/
export declare function sanitizeLabelValue(value: string, maxLen?: number): string | null;
export declare function buildJobManifest(input: JobBuildInput): JobBuildResult;
//# sourceMappingURL=job-manifest.d.ts.map
+1 -1
View File
@@ -1 +1 @@
{"version":3,"file":"job-manifest.d.ts","sourceRoot":"","sources":["../../src/server/job-manifest.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,KAAK,GAAG,MAAM,yBAAyB,CAAC;AACpD,OAAO,KAAK,EAAE,uBAAuB,EAAE,MAAM,4BAA4B,CAAC;AAgD1E,OAAO,KAAK,EAAE,WAAW,EAAE,MAAM,iBAAiB,CAAC;AA6CnD,MAAM,WAAW,aAAa;IAC5B,GAAG,EAAE,uBAAuB,CAAC;IAC7B,OAAO,EAAE,WAAW,CAAC;CACtB;AAED;;+EAE+E;AAC/E,MAAM,WAAW,YAAY;IAC3B,IAAI,EAAE,MAAM,CAAC;IACb,SAAS,EAAE,MAAM,CAAC;IAClB,IAAI,EAAE,MAAM,CAAC,MAAM,EAAE,MAAM,CAAC,CAAC;CAC9B;AAED,MAAM,WAAW,cAAc;IAC7B,GAAG,EAAE,GAAG,CAAC,KAAK,CAAC;IACf,OAAO,EAAE,MAAM,CAAC;IAChB,SAAS,EAAE,MAAM,CAAC;IAClB,MAAM,EAAE,MAAM,CAAC;IACf,UAAU,EAAE,MAAM,EAAE,CAAC;IACrB,aAAa,EAAE,MAAM,CAAC,MAAM,EAAE,MAAM,CAAC,CAAC;IACtC;0DACsD;IACtD,YAAY,EAAE,YAAY,GAAG,IAAI,CAAC;CACnC;AAuHD,wBAAgB,gBAAgB,CAAC,KAAK,EAAE,aAAa,GAAG,cAAc,CAkRrE"}
{"version":3,"file":"job-manifest.d.ts","sourceRoot":"","sources":["../../src/server/job-manifest.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,KAAK,GAAG,MAAM,yBAAyB,CAAC;AACpD,OAAO,KAAK,EAAE,uBAAuB,EAAE,MAAM,4BAA4B,CAAC;AAY1E;;;;;;;;;;;;GAYG;AACH,wBAAgB,qBAAqB,CAAC,cAAc,EAAE,MAAM,GAAG,MAAM,CAiEpE;AAsCD,OAAO,KAAK,EAAE,WAAW,EAAE,MAAM,iBAAiB,CAAC;AA6CnD,MAAM,WAAW,aAAa;IAC5B,GAAG,EAAE,uBAAuB,CAAC;IAC7B,OAAO,EAAE,WAAW,CAAC;CACtB;AAED;;+EAE+E;AAC/E,MAAM,WAAW,YAAY;IAC3B,IAAI,EAAE,MAAM,CAAC;IACb,SAAS,EAAE,MAAM,CAAC;IAClB,IAAI,EAAE,MAAM,CAAC,MAAM,EAAE,MAAM,CAAC,CAAC;CAC9B;AAED,MAAM,WAAW,cAAc;IAC7B,GAAG,EAAE,GAAG,CAAC,KAAK,CAAC;IACf,OAAO,EAAE,MAAM,CAAC;IAChB,SAAS,EAAE,MAAM,CAAC;IAClB,MAAM,EAAE,MAAM,CAAC;IACf,UAAU,EAAE,MAAM,EAAE,CAAC;IACrB,aAAa,EAAE,MAAM,CAAC,MAAM,EAAE,MAAM,CAAC,CAAC;IACtC;0DACsD;IACtD,YAAY,EAAE,YAAY,GAAG,IAAI,CAAC;CACnC;AAMD;;;;GAIG;AACH,wBAAgB,kBAAkB,CAAC,KAAK,EAAE,MAAM,EAAE,MAAM,SAAK,GAAG,MAAM,GAAG,IAAI,CAI5E;AAmHD,wBAAgB,gBAAgB,CAAC,KAAK,EAAE,aAAa,GAAG,cAAc,CAmSrE"}
+106 -1
View File
@@ -1,5 +1,81 @@
import { asString, asNumber, asBoolean, asStringArray, parseObject, buildPaperclipEnv, renderTemplate, } from "@paperclipai/adapter-utils/server-utils";
import { createHash } from "node:crypto";
/**
* Build the shell command prefix that installs a native Node.js PostToolUse
* hook into Claude Code's settings. The hook truncates oversized tool outputs
* before they reach the model — replacing the RTK binary init-container
* approach with a self-contained Node.js implementation.
*
* Both scripts are base64-encoded so they can be embedded in a sh -c command
* string without any quoting or escaping issues.
*
* @param maxOutputBytes Byte threshold above which tool output is truncated.
* @returns A shell command string (suitable for "&&"-chaining
* before the claude invocation).
*/
export function buildRtkSetupCommands(maxOutputBytes) {
// --- Filter script ----------------------------------------------------------
// This script runs as the PostToolUse hook inside every K8s Job pod.
// Claude Code writes the hook event as JSON to the script's stdin; the script
// truncates the tool_response/tool_result content when it exceeds the
// threshold and writes the (possibly modified) JSON to stdout.
//
// Field-name coverage:
// • tool_response — documented hook event format for PostToolUse
// • tool_result — alternative name seen in some Claude Code versions
// Content may be a plain string or an array of typed blocks (text/image/…).
const filterScript = [
`const c=[];`,
`process.stdin.on('data',d=>c.push(d));`,
`process.stdin.on('end',()=>{`,
`const raw=Buffer.concat(c).toString('utf-8');`,
`let o;try{o=JSON.parse(raw);}catch{process.stdout.write(raw);return;}`,
`const MAX=${maxOutputBytes};`,
`function trunc(s){`,
`if(typeof s!=='string')return s;`,
`const b=Buffer.from(s,'utf-8');`,
`if(b.length<=MAX)return s;`,
`return b.slice(0,MAX).toString('utf-8')+'\\n[...'+(b.length-MAX)+' bytes truncated by paperclip-rtk]';`,
`}`,
`const tr=o&&(o.tool_response||o.tool_result);`,
`if(tr){`,
`if(typeof tr.content==='string'){tr.content=trunc(tr.content);}`,
`else if(Array.isArray(tr.content)){`,
`tr.content=tr.content.map(function(b){`,
`if(b&&typeof b==='object'&&typeof b.text==='string'){`,
`return Object.assign({},b,{text:trunc(b.text)});`,
`}return b;`,
`});`,
`}`,
`}`,
`process.stdout.write(JSON.stringify(o));`,
`});`,
].join("");
// --- Settings script --------------------------------------------------------
// Reads the existing ~/.claude/settings.json (if any), merges in the RTK
// PostToolUse hook, and writes the file back. All other settings sections
// are preserved; only PostToolUse is replaced so we own the full hook list
// for this run.
const settingsScript = [
`const fs=require('fs'),pt=require('path');`,
`const p=pt.join(process.env.HOME,'.claude','settings.json');`,
`let s={};try{s=JSON.parse(fs.readFileSync(p,'utf-8'));}catch(e){}`,
`s.hooks=s.hooks||{};`,
`s.hooks.PostToolUse=[{matcher:'.*',hooks:[{type:'command',command:'node /tmp/.rtk-filter.js'}]}];`,
`fs.mkdirSync(pt.dirname(p),{recursive:true});`,
`fs.writeFileSync(p,JSON.stringify(s));`,
].join("");
// Encode as base64 so the strings can be embedded directly in a shell command
// without any quoting concerns (base64 alphabet: A-Za-z0-9+/=).
const filterB64 = Buffer.from(filterScript, "utf-8").toString("base64");
const settingsB64 = Buffer.from(settingsScript, "utf-8").toString("base64");
return [
// Write the filter script
`node -e "require('fs').writeFileSync('/tmp/.rtk-filter.js',Buffer.from('${filterB64}','base64').toString('utf-8'))"`,
// Install the Claude Code PostToolUse hook (merge into existing settings)
`node -e "eval(Buffer.from('${settingsB64}','base64').toString('utf-8'))"`,
].join(" && ");
}
/** Prompts above this size (bytes) are staged via a Secret instead of an
* init container env var, protecting against the ~1 MiB PodSpec limit. */
const LARGE_PROMPT_THRESHOLD_BYTES = 256 * 1024;
@@ -91,6 +167,16 @@ function parseKeyValueConfig(raw) {
function sanitizeForK8sName(value, maxLen = 16) {
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen);
}
/**
* Sanitize a string for use as a Kubernetes label value (RFC 1123 subset:
* `[a-zA-Z0-9]([-_.a-zA-Z0-9]*[a-zA-Z0-9])?`, max 63 chars). Returns `null`
* when no usable characters remain — the caller should omit the label.
*/
export function sanitizeLabelValue(value, maxLen = 63) {
const cleaned = value.replace(/[^a-zA-Z0-9._-]/g, "").slice(0, maxLen);
const trimmed = cleaned.replace(/^[^a-zA-Z0-9]+/, "").replace(/[^a-zA-Z0-9]+$/, "");
return trimmed.length > 0 ? trimmed : null;
}
/**
* Build a short deterministic hash suffix from the raw inputs to avoid
* collisions when sanitized slugs happen to be identical.
@@ -202,6 +288,8 @@ export function buildJobManifest(input) {
const nodeSelector = parseKeyValueConfig(config.nodeSelector);
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
const extraLabels = parseKeyValueConfig(config.labels);
const enableRtk = asBoolean(config.enableRtk, false);
const rtkMaxOutputBytes = asNumber(config.rtkMaxOutputBytes, 50000);
// Resolve working directory — use workspace cwd, fall back to /paperclip
const workspaceContext = parseObject(context.paperclipWorkspace);
const workspaceCwd = asString(workspaceContext.cwd, "");
@@ -289,6 +377,17 @@ export function buildJobManifest(input) {
"paperclip.io/company-id": agent.companyId,
"paperclip.io/adapter-type": "claude_k8s",
};
// Reattach-target labels: let a future execute() identify this Job as the
// continuation of the same logical unit of work (same task + same resume
// session) so it can attach to the running pod across a Paperclip restart
// instead of deleting it and starting over (FAR-124).
const taskIdRaw = asString(context.taskId, "") || asString(context.issueId, "");
const taskLabel = taskIdRaw ? sanitizeLabelValue(taskIdRaw) : null;
if (taskLabel)
labels["paperclip.io/task-id"] = taskLabel;
const sessionLabel = runtimeSessionId ? sanitizeLabelValue(runtimeSessionId) : null;
if (sessionLabel)
labels["paperclip.io/session-id"] = sessionLabel;
for (const [key, value] of Object.entries(extraLabels)) {
labels[key] = value;
}
@@ -345,7 +444,13 @@ export function buildJobManifest(input) {
};
// Build the claude command string for the main container
const claudeArgsEscaped = claudeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
const mainCommand = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`;
const claudeInvocation = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`;
// When RTK output filtering is enabled, prepend the Node.js hook setup.
// This writes a filter script and a Claude Code settings file that installs
// it as a PostToolUse hook — no external binary or init container required.
const mainCommand = enableRtk
? `${buildRtkSetupCommands(rtkMaxOutputBytes)} && ${claudeInvocation}`
: claudeInvocation;
// Decide prompt delivery strategy: env var (small) or Secret volume (large).
const promptBytes = Buffer.byteLength(prompt, "utf-8");
const useLargePromptPath = promptBytes > LARGE_PROMPT_THRESHOLD_BYTES;
+1 -1
View File
File diff suppressed because one or more lines are too long
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.31",
"version": "0.1.35",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.31",
"version": "0.1.35",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.32",
"version": "0.1.36",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+5 -1
View File
@@ -15,7 +15,6 @@ Core fields:
- model (string, optional): Claude model id
- effort (string, optional): reasoning effort passed via --effort (low|medium|high)
- maxTurnsPerRun (number, optional): max turns for one run
- dangerouslySkipPermissions (boolean, optional): pass --dangerously-skip-permissions to claude
- instructionsFilePath (string, optional): absolute path to a markdown instructions file injected at runtime via --append-system-prompt-file
- extraArgs (string[], optional): additional CLI args appended to the claude command
- env (object, optional): KEY=VALUE environment variables; overrides inherited vars from the Deployment
@@ -31,6 +30,11 @@ Kubernetes fields:
- labels (object, optional): extra labels added to Job metadata
- ttlSecondsAfterFinished (number, optional): auto-cleanup delay; default 300
- retainJobs (boolean, optional): skip cleanup on completion for debugging
- reattachOrphanedJobs (boolean, optional): when true (default), attach to a running orphaned Job that matches the current agent/task/session instead of blocking; when false, any non-terminal orphan blocks the new run
Output filtering fields:
- enableRtk (boolean, optional): truncate oversized tool outputs before they reach the model via a PostToolUse hook; default false
- rtkMaxOutputBytes (number, optional): byte threshold for tool output truncation when enableRtk is true; default 50000
Operational fields:
- timeoutSec (number, optional): run timeout in seconds; 0 means no timeout
+2 -4
View File
@@ -34,12 +34,10 @@ describe("getConfigSchema", () => {
expect(field!.default).toBe(1000);
});
it("dangerouslySkipPermissions defaults to true", () => {
it("does not expose dangerouslySkipPermissions in UI schema", () => {
const schema = getConfigSchema();
const field = schema.fields.find((f: ConfigFieldSchema) => f.key === "dangerouslySkipPermissions");
expect(field).toBeDefined();
expect(field!.type).toBe("toggle");
expect(field!.default).toBe(true);
expect(field).toBeUndefined();
});
it("reattachOrphanedJobs defaults to true", () => {
+1 -8
View File
@@ -34,13 +34,6 @@ export function getConfigSchema(): AdapterConfigSchema {
hint: "Maximum number of agentic turns (tool calls) per heartbeat run. 0 means unlimited.",
default: 1000,
},
{
type: "toggle",
key: "dangerouslySkipPermissions",
label: "Skip Permissions",
hint: "Pass --dangerously-skip-permissions to Claude. Enabled by default for unattended K8s Jobs.",
default: true,
},
// Kubernetes
{
type: "text",
@@ -93,7 +86,7 @@ export function getConfigSchema(): AdapterConfigSchema {
type: "toggle",
key: "reattachOrphanedJobs",
label: "Reattach to Orphaned Jobs",
hint: "If a prior K8s Job for the same agent/task/session is still running (e.g. Paperclip restarted mid-run), attach to it and stream its output instead of deleting it and starting a new pod. Default: on.",
hint: "If a prior K8s Job for the same agent/task/session is still running (e.g. Paperclip restarted mid-run), attach to it and stream its output instead of blocking the new run. When false, any non-terminal orphan blocks the new run. Default: on.",
default: true,
},
// Resource Limits
File diff suppressed because it is too large Load Diff
+272 -69
View File
@@ -35,6 +35,13 @@ const POST_TERMINAL_KEEPALIVE_MS = 90_000;
// against the K8s client library not propagating writable.destroy() into an
// abort of the underlying HTTP request.
const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
// After the log stream exits (container stopped producing output), wait this
// long for the K8s Job condition to be confirmed before treating the job as
// done. K8s Job conditions can lag pod exit by several seconds or more under
// cluster load. Without this bound, waitForJobCompletion keeps polling while
// streamPodLogs keeps reconnecting — together they can hold execute() open for
// minutes, causing stale "running" status in the UI (FAR-23).
const LOG_EXIT_COMPLETION_GRACE_MS = 30_000;
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
@@ -89,30 +96,46 @@ export function buildPartialRunError(
: `Claude exited with code ${exitCode ?? -1}`;
}
export type OrphanClassification =
| "reattach"
| "block_session_mismatch"
| "block_task_mismatch"
| "block_task_unknown";
/**
* Evaluate an orphaned K8s Job (one whose `paperclip.io/run-id` label does
* not match the current runId) as a potential reattach target. A Job is
* reattachable when it belongs to the same agent, same task, and same resume
* session as the current run — meaning the previous Paperclip instance was
* mid-stream on the exact piece of work this new run was dispatched to do.
* Classify a non-terminal orphaned K8s Job (one whose `paperclip.io/run-id`
* label does not match the current runId but does belong to this agent) as a
* reattach candidate or a block reason.
*
* Decision matrix:
* - taskId mismatch (both present, different values) → block_task_mismatch
* - taskId missing on either side → block_task_unknown
* - taskId match + both have sessionId + sessionIds differ → block_session_mismatch
* - taskId match + one or both sides missing sessionId → reattach (reconcile)
* - taskId match + both have sessionId + sessionIds match → reattach (happy path)
*
* Exported for unit tests.
*/
export function isReattachableOrphan(
export function classifyOrphan(
job: k8s.V1Job,
expected: { agentId: string; taskId: string | null; sessionId: string | null },
): boolean {
if (!expected.taskId || !expected.sessionId) return false;
expected: { taskId: string | null; sessionId: string | null },
): OrphanClassification {
const labels = job.metadata?.labels ?? {};
if (labels["paperclip.io/adapter-type"] !== "claude_k8s") return false;
if (labels["paperclip.io/agent-id"] !== expected.agentId) return false;
if (labels["paperclip.io/task-id"] !== expected.taskId) return false;
if (labels["paperclip.io/session-id"] !== expected.sessionId) return false;
const conditions = job.status?.conditions ?? [];
const terminal = conditions.some(
(c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
);
if (terminal) return false;
return true;
const jobTaskId = labels["paperclip.io/task-id"] ?? null;
const jobSessionId = labels["paperclip.io/session-id"] ?? null;
// taskId missing on either side
if (!expected.taskId || !jobTaskId) return "block_task_unknown";
// taskId mismatch
if (expected.taskId !== jobTaskId) return "block_task_mismatch";
// taskId matches — check sessionId
if (expected.sessionId && jobSessionId && expected.sessionId !== jobSessionId) {
return "block_session_mismatch";
}
return "reattach";
}
/**
@@ -176,7 +199,7 @@ async function waitForPod(
const containerStatuses = pod.status?.containerStatuses ?? [];
// Log phase transitions
const statusKey = `${phase}:${initStatuses.map((s) => s.state?.waiting?.reason ?? s.state?.terminated?.reason ?? "ok").join(",")}:${containerStatuses.map((s) => s.state?.waiting?.reason ?? s.state?.running ? "running" : "waiting").join(",")}`;
const statusKey = `${phase}:${initStatuses.map((s) => s.state?.waiting?.reason ?? s.state?.terminated?.reason ?? "ok").join(",")}:${containerStatuses.map((s) => s.state?.waiting?.reason ?? (s.state?.running ? "running" : "waiting")).join(",")}`;
if (statusKey !== lastStatus) {
const details: string[] = [`phase=${phase}`];
for (const init of initStatuses) {
@@ -301,7 +324,10 @@ export async function streamPodLogsOnce(
if (stopSignal.stopped) {
if (!writable.destroyed) writable.destroy();
if (!bailTimer && bailResolve) {
bailTimer = setTimeout(bailResolve, LOG_STREAM_BAIL_TIMEOUT_MS);
bailTimer = setTimeout(() => {
onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {});
bailResolve!();
}, LOG_STREAM_BAIL_TIMEOUT_MS);
}
}
}, 200);
@@ -338,6 +364,11 @@ export async function streamPodLogsOnce(
*
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
* loops during sustained API partitions.
*
* onFirstStreamExit is called the first time streamPodLogsOnce returns
* (container has exited or stream disconnected). Used by execute() to
* start the LOG_EXIT_COMPLETION_GRACE_MS grace timer (FAR-23) without
* waiting for all reconnects to exhaust.
*/
async function streamPodLogs(
namespace: string,
@@ -345,6 +376,8 @@ async function streamPodLogs(
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
dedup?: LogLineDedupFilter,
onFirstStreamExit?: () => void,
): Promise<string> {
const allChunks: string[] = [];
let attempt = 0;
@@ -354,7 +387,7 @@ async function streamPodLogs(
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
// Shared across reconnects so replayed lines inside the `sinceSeconds`
// overlap window are dropped before they reach the streaming UI (FAR-123).
const dedup = new LogLineDedupFilter();
if (!dedup) dedup = new LogLineDedupFilter();
while (!stopSignal?.stopped) {
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
@@ -375,6 +408,9 @@ async function streamPodLogs(
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
// Signal first stream exit immediately so the grace-period timer in
// execute() can start without waiting for all reconnects to complete.
if (attempt === 0) onFirstStreamExit?.();
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
@@ -521,6 +557,17 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// the current runId. When reattachOrphanedJobs is enabled and the orphan matches
// the current agent+task+session, we attach to it instead of deleting it (FAR-124).
const agentId = ctx.agent.id;
const sanitizedAgentId = sanitizeLabelValue(agentId);
if (!sanitizedAgentId) {
await onLog("stderr", `[paperclip] Cannot create K8s Job: agent.id "${agentId}" produces no valid RFC 1123 label characters\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Agent ID "${agentId}" cannot be sanitized to a valid Kubernetes label`,
errorCode: "k8s_agent_id_invalid",
};
}
const selfPod = await getSelfPodInfo(kubeconfigPath);
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true);
@@ -534,7 +581,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const batchApi = getBatchApi(kubeconfigPath);
const existing = await batchApi.listNamespacedJob({
namespace: guardNamespace,
labelSelector: `paperclip.io/agent-id=${agentId},paperclip.io/adapter-type=claude_k8s`,
labelSelector: `paperclip.io/agent-id=${sanitizedAgentId},paperclip.io/adapter-type=claude_k8s`,
});
const running = existing.items.filter(
(j) => !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"),
@@ -549,45 +596,72 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
(j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") === runId,
);
// Pick the most recent reattachable orphan — same agent + task + session,
// not terminal. Only one target is chosen; any other orphans get
// cleaned up as before.
if (reattachOrphanedJobs && orphaned.length > 0) {
const candidates = orphaned
.filter((j) =>
isReattachableOrphan(j, {
agentId,
taskId: currentTaskLabel,
sessionId: currentSessionLabel,
}),
)
.sort((a, b) => {
const at = new Date(a.metadata?.creationTimestamp ?? 0).getTime();
const bt = new Date(b.metadata?.creationTimestamp ?? 0).getTime();
return bt - at;
});
const chosen = candidates[0];
const chosenName = chosen?.metadata?.name;
if (chosen && chosenName) {
reattachTarget = {
jobName: chosenName,
namespace: chosen.metadata?.namespace ?? guardNamespace,
priorRunId: chosen.metadata?.labels?.["paperclip.io/run-id"] ?? "",
image: chosen.spec?.template?.spec?.containers?.[0]?.image ?? "unknown",
if (orphaned.length > 0) {
if (!reattachOrphanedJobs) {
// When reattach is disabled, block on any non-terminal orphan.
const names = orphaned.map((j) => j.metadata?.name).join(", ");
await onLog("stderr", `[paperclip] Concurrent run blocked: orphaned Job(s) running and reattach disabled: ${names}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Concurrent run blocked: orphaned Job(s) still running for this agent (reattach disabled)`,
errorCode: "k8s_concurrent_run_blocked",
};
}
}
const toDelete = orphaned.filter(
(j) => !reattachTarget || j.metadata?.name !== reattachTarget.jobName,
);
if (toDelete.length > 0) {
const orphanNames = toDelete.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Cleaning up ${toDelete.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
for (const j of toDelete) {
const name = j.metadata?.name;
if (name) {
await cleanupJob(guardNamespace, name, onLog, kubeconfigPath);
// Apply the decision matrix to each orphan, newest-first. The first
// reattachable orphan becomes the target; any block classification
// stops the new run immediately. Orphans are never deleted here —
// terminal ones are cleaned up by TTL; live mismatches should not be
// killed because they may still be doing real work.
const sortedOrphans = [...orphaned].sort((a, b) => {
const at = new Date(a.metadata?.creationTimestamp ?? 0).getTime();
const bt = new Date(b.metadata?.creationTimestamp ?? 0).getTime();
return bt - at;
});
for (const orphan of sortedOrphans) {
const classification = classifyOrphan(orphan, {
taskId: currentTaskLabel,
sessionId: currentSessionLabel,
});
const orphanName = orphan.metadata?.name ?? "unknown";
if (classification === "reattach") {
if (!reattachTarget) {
reattachTarget = {
jobName: orphanName,
namespace: orphan.metadata?.namespace ?? guardNamespace,
priorRunId: orphan.metadata?.labels?.["paperclip.io/run-id"] ?? "",
image: orphan.spec?.template?.spec?.containers?.[0]?.image ?? "unknown",
};
}
} else if (classification === "block_task_unknown") {
await onLog("stderr", `[paperclip] Blocked: orphaned Job ${orphanName} has missing task label — cannot safely reattach\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Concurrent run blocked: orphaned Job ${orphanName} has unknown task context`,
errorCode: "k8s_orphan_task_unknown",
};
} else if (classification === "block_task_mismatch") {
await onLog("stderr", `[paperclip] Blocked: orphaned Job ${orphanName} belongs to a different task\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Concurrent run blocked: orphaned Job ${orphanName} is running a different task`,
errorCode: "k8s_concurrent_run_blocked",
};
} else if (classification === "block_session_mismatch") {
await onLog("stderr", `[paperclip] Blocked: orphaned Job ${orphanName} has a different session\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Concurrent run blocked: orphaned Job ${orphanName} has a mismatched session`,
errorCode: "k8s_orphan_session_mismatch",
};
}
}
}
@@ -686,6 +760,26 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
await onLog("stdout", `[paperclip] Reattaching to in-flight K8s Job ${jobName} in namespace ${namespace} (prior run ${reattachTarget.priorRunId || "unknown"})\n`);
// Relabel the reattached Job with the current run-id (and session-id if
// available) so the next concurrency guard sees it as owned by this run
// rather than an orphan from the prior run.
const labelPatch: Array<{ op: "add" | "replace"; path: string; value: string }> = [
{ op: "replace", path: "/metadata/labels/paperclip.io~1run-id", value: runId },
];
if (currentSessionLabel) {
labelPatch.push({ op: "replace", path: "/metadata/labels/paperclip.io~1session-id", value: currentSessionLabel });
}
try {
await batchApi.patchNamespacedJob({
name: jobName,
namespace,
body: labelPatch,
});
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Warning: failed to relabel reattached Job ${jobName}: ${msg}\n`);
}
} else {
// Build Job manifest
const built = buildJobManifest({ ctx, selfPod, promptBundle });
@@ -696,6 +790,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const claudeArgs = built.claudeArgs;
const promptMetrics = built.promptMetrics;
promptSecret = built.promptSecret;
if (built.skippedLabels.length > 0) {
await onLog("stderr", `[paperclip] Warning: skipped ${built.skippedLabels.length} extra label(s) with reserved prefix: ${built.skippedLabels.join(", ")}\n`);
}
// Report invocation metadata
if (onMeta) {
@@ -751,11 +848,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
// Create the Job
let createdJobUid: string | undefined;
try {
await batchApi.createNamespacedJob({ namespace, body: job });
const created = await batchApi.createNamespacedJob({ namespace, body: job });
createdJobUid = created.metadata?.uid;
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
if (promptSecret) {
try {
await coreApi.deleteNamespacedSecret({ name: promptSecret.name, namespace: promptSecret.namespace });
} catch { /* best-effort */ }
}
return {
exitCode: null,
signal: null,
@@ -765,6 +869,35 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
// Attach ownerReference so K8s GC cleans up the Secret if the process
// crashes before the finally block runs.
if (promptSecret && createdJobUid) {
try {
await coreApi.patchNamespacedSecret({
name: promptSecret.name,
namespace: promptSecret.namespace,
body: [
{
op: "add",
path: "/metadata/ownerReferences",
value: [
{
apiVersion: "batch/v1",
kind: "Job",
name: jobName,
uid: createdJobUid,
blockOwnerDeletion: false,
},
],
},
],
});
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Warning: failed to set ownerReference on prompt Secret: ${msg}\n`);
}
}
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
}
@@ -853,6 +986,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let keepaliveTick = 0;
let keepaliveJobTerminal = false;
let keepaliveJobTerminalAt: number | null = null;
let consecutiveTerminalReadings = 0;
keepaliveTimer = setInterval(() => {
// Fire-and-forget the async work; setInterval callbacks must be
// synchronous or the timer will drift.
@@ -875,16 +1009,32 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
// Verify the Job is still alive before announcing or refreshing.
// Require two consecutive terminal readings before latching to
// guard against a stale K8s API cache returning a false terminal
// status on a single read (finding #5, FAR-15).
try {
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
const terminal = job.status?.conditions?.some(
(c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
);
if (terminal) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
consecutiveTerminalReadings++;
if (consecutiveTerminalReadings >= 2) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
if (ctx.onSpawn) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
return;
}
// First terminal reading — do not latch yet; next tick confirms.
keepaliveTick++;
if (ctx.onSpawn && (keepaliveTick === 1 || keepaliveTick % 12 === 0)) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
return;
}
consecutiveTerminalReadings = 0;
} catch (err: unknown) {
// Only treat 404 (Job deleted) as terminal. Transient 5xx or
// connection resets should NOT permanently disable the keepalive —
@@ -893,6 +1043,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
if (isK8s404(err)) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
if (ctx.onSpawn) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
return;
}
// Log transient errors but leave keepaliveJobTerminal false so
@@ -922,13 +1075,60 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// Shared signal: when job completion resolves, tell the log
// streamer to stop reconnecting.
const logStopSignal = { stopped: false };
// Shared dedup filter: created here so the one-shot fallback can
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
const logDedup = new LogLineDedupFilter();
// Track when the log stream first exits so the grace-period can fire
// if the K8s Job condition lags behind container exit (FAR-23).
// Set via onFirstStreamExit callback (called after attempt=0 returns)
// rather than in .then() of streamPodLogs, which would create a
// deadlock: streamPodLogs only resolves after stopSignal is set, but
// stopSignal is set by the grace timer which needs logExitTime to be
// non-null.
let logExitTime: number | null = null;
const trackedLogStream = streamPodLogs(
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
() => { logExitTime = Date.now(); },
);
// completionWithGrace races waitForJobCompletion against a grace timer
// that fires LOG_EXIT_COMPLETION_GRACE_MS after the log stream exits.
// This bounds the stale-UI window when K8s Job conditions lag container
// exit (FAR-23): without it, waitForJobCompletion polls indefinitely
// while streamPodLogs reconnects, holding execute() open for minutes.
// logStopSignal.stopped is set on every settled path (fulfilled, rejected,
// or grace) so streamPodLogs stops reconnecting promptly.
type CompletionResult = { succeeded: boolean; timedOut: boolean; jobGone?: boolean };
let gracePoller: ReturnType<typeof setInterval> | null = null;
const completionWithGrace = new Promise<CompletionResult>((resolve, reject) => {
let settled = false;
const settleOk = (r: CompletionResult) => {
if (settled) return;
settled = true;
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
logStopSignal.stopped = true;
resolve(r);
};
const settleErr = (err: unknown) => {
if (settled) return;
settled = true;
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
logStopSignal.stopped = true;
reject(err);
};
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr);
gracePoller = setInterval(() => {
if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) {
void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output (FAR-23)\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, jobGone: true });
}
}, 1_000);
});
const [logResult, completionResult] = await Promise.allSettled([
streamPodLogs(namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal),
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then((r) => {
logStopSignal.stopped = true;
return r;
}),
trackedLogStream,
completionWithGrace,
]);
// Stop the keepalive immediately once the job has reached a terminal
@@ -952,7 +1152,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// from the beginning of the log, giving us the full output.
// We use a cheap string scan for the result-event guard (avoids a full JSON parse here;
// the authoritative parse happens once below after all fallbacks complete).
const hasResultEvent = stdout.includes('"type":"result"');
const hasResultEvent = stdout.split("\n").some((l) => { try { return JSON.parse(l).type === "result"; } catch { return false; } });
const needsOneShot = !stdout.trim() || (stdout.trim() && !hasResultEvent);
if (needsOneShot) {
if (!stdout.trim()) {
@@ -961,9 +1161,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
if (!stdout.trim() && oneShotLogs.trim()) {
stdout = oneShotLogs;
await onLog("stdout", stdout);
const deduped = logDedup.filter(stdout) + logDedup.flush();
if (deduped) await onLog("stdout", deduped);
} else if (oneShotLogs && oneShotLogs.length > stdout.length) {
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
const deduped = logDedup.filter(oneShotLogs) + logDedup.flush();
if (deduped) await onLog("stdout", deduped);
stdout = oneShotLogs;
}
}
+95 -4
View File
@@ -166,6 +166,75 @@ describe("buildJobManifest", () => {
expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBeUndefined();
expect(job.metadata?.labels?.["paperclip.io/session-id"]).toBeUndefined();
});
it("drops user label with paperclip.io/ prefix", () => {
ctx.config = { labels: { "paperclip.io/run-id": "hijacked" } };
const { job, skippedLabels } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/run-id"]).not.toBe("hijacked");
expect(skippedLabels).toContain("paperclip.io/run-id");
});
it("drops user label with app.kubernetes.io/ prefix", () => {
ctx.config = { labels: { "app.kubernetes.io/managed-by": "attacker" } };
const { job, skippedLabels } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["app.kubernetes.io/managed-by"]).toBe("paperclip");
expect(skippedLabels).toContain("app.kubernetes.io/managed-by");
});
it("passes through user label without reserved prefix", () => {
ctx.config = { labels: { "custom.io/team": "platform" } };
const { job, skippedLabels } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.["custom.io/team"]).toBe("platform");
expect(skippedLabels).not.toContain("custom.io/team");
});
it("populates skippedLabels with all dropped keys", () => {
ctx.config = {
labels: {
"paperclip.io/agent-id": "x",
"app.kubernetes.io/component": "y",
"safe": "z",
},
};
const { skippedLabels } = buildJobManifest({ ctx, selfPod });
expect(skippedLabels).toHaveLength(2);
expect(skippedLabels).toContain("paperclip.io/agent-id");
expect(skippedLabels).toContain("app.kubernetes.io/component");
});
});
describe("system label sanitization (N4)", () => {
it("sanitizes agent.id with @ to a valid RFC 1123 label", () => {
ctx.agent.id = "user@example.com";
const { job } = buildJobManifest({ ctx, selfPod });
const label = job.metadata?.labels?.["paperclip.io/agent-id"];
expect(label).toMatch(/^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$/);
expect(label).not.toContain("@");
});
it("sanitizes agent.id with spaces to a valid RFC 1123 label", () => {
ctx.agent.id = "my agent id";
const { job } = buildJobManifest({ ctx, selfPod });
const label = job.metadata?.labels?.["paperclip.io/agent-id"];
expect(label).toMatch(/^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$/);
});
it("omits paperclip.io/run-id when sanitized value is null (all-invalid runId)", () => {
// inject an all-special-chars runId via context override — buildJobManifest
// uses ctx.runId directly
const badCtx = makeCtx({ runId: "@@@" });
const { job, skippedLabels } = buildJobManifest({ ctx: badCtx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/run-id"]).toBeUndefined();
expect(skippedLabels).toContain("paperclip.io/run-id");
});
it("selector matches sanitized agent-id label", () => {
ctx.agent.id = "Agent@Test";
const { job } = buildJobManifest({ ctx, selfPod });
const agentLabel = job.metadata?.labels?.["paperclip.io/agent-id"];
// the label should equal what sanitizeLabelValue produces
expect(agentLabel).toBe("AgentTest");
});
});
describe("annotations", () => {
@@ -438,10 +507,10 @@ describe("buildJobManifest", () => {
it("uses configured resource overrides", () => {
ctx.config = {
resources: {
requests: { cpu: "500m", memory: "1Gi" },
limits: { cpu: "2000m", memory: "4Gi" },
},
"resources.requests.cpu": "500m",
"resources.requests.memory": "1Gi",
"resources.limits.cpu": "2000m",
"resources.limits.memory": "4Gi",
};
const { job } = buildJobManifest({ ctx, selfPod });
const resources = job.spec?.template?.spec?.containers[0]?.resources;
@@ -802,6 +871,28 @@ describe("buildJobManifest", () => {
expect(filterScript).toContain("tool_result");
});
it("filter script truncates without corrupting multi-byte UTF-8", () => {
// "中" is U+4E2D, 3 bytes in UTF-8: E4 B8 AD
// With MAX=5, two "中" (6 bytes) should truncate to one (3 bytes), not
// produce a replacement character from slicing mid-codepoint.
const setup = buildRtkSetupCommands(5);
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
// Extract the trunc function from the filter script and evaluate it
const fnMatch = filterScript.match(/(function trunc\(s\)\{.*\})(?=const tr=)/);
expect(fnMatch).toBeTruthy();
// eslint-disable-next-line no-eval
const trunc = eval(`(()=>{const MAX=5;${fnMatch![1]};return trunc;})()`);
const result = trunc("中中");
expect(result).not.toContain("");
expect(result).toContain("中");
expect(result).toContain("truncated by paperclip-rtk");
// Should report bytes from the actual truncation point, not MAX
expect(result).toContain("3 bytes truncated");
});
it("filter script handles array content (block format)", () => {
const setup = buildRtkSetupCommands(50000);
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
+30 -16
View File
@@ -47,7 +47,8 @@ export function buildRtkSetupCommands(maxOutputBytes: number): string {
`if(typeof s!=='string')return s;`,
`const b=Buffer.from(s,'utf-8');`,
`if(b.length<=MAX)return s;`,
`return b.slice(0,MAX).toString('utf-8')+'\\n[...'+(b.length-MAX)+' bytes truncated by paperclip-rtk]';`,
`let e=MAX;if(e>0){let p=e-1;while(p>0&&(b[p]&0xC0)===0x80)p--;const l=b[p];let n=1;if((l&0xE0)===0xC0)n=2;else if((l&0xF0)===0xE0)n=3;else if((l&0xF8)===0xF0)n=4;if(p+n>e)e=p;}`,
`return b.slice(0,e).toString('utf-8')+'\\n[...'+(b.length-e)+' bytes truncated by paperclip-rtk]';`,
`}`,
`const tr=o&&(o.tool_response||o.tool_result);`,
`if(tr){`,
@@ -199,10 +200,14 @@ export interface JobBuildResult {
/** Non-null when the prompt is too large for an env var and must be
* staged as a K8s Secret before creating the Job. */
promptSecret: PromptSecret | null;
/** User-supplied extra labels that were dropped because they used a reserved prefix. */
skippedLabels: string[];
}
function sanitizeForK8sName(value: string, maxLen = 16): string {
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen);
// Trim trailing hyphens after slicing so names don't end with `-` when
// truncation lands on a hyphen boundary (finding #16, FAR-15).
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen).replace(/-+$/, "");
}
/**
@@ -345,7 +350,6 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const extraArgs = asStringArray(config.extraArgs);
const timeoutSec = asNumber(config.timeoutSec, 0);
const ttlSeconds = asNumber(config.ttlSecondsAfterFinished, 300);
const resources = parseObject(config.resources);
const nodeSelector = parseKeyValueConfig(config.nodeSelector);
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
const extraLabels = parseKeyValueConfig(config.labels);
@@ -427,29 +431,35 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
// Build env vars
const envVars = buildEnvVars(ctx, selfPod, config);
// Resource defaults
const resourceRequests = parseObject(resources.requests);
const resourceLimits = parseObject(resources.limits);
// Resource defaults — UI stores dotted keys (e.g. "resources.requests.cpu")
// as flat config entries, so read them directly from config with the dotted key.
const containerResources: k8s.V1ResourceRequirements = {
requests: {
cpu: asString(resourceRequests.cpu, "1000m"),
memory: asString(resourceRequests.memory, "2Gi"),
cpu: asString(config["resources.requests.cpu"], "1000m"),
memory: asString(config["resources.requests.memory"], "2Gi"),
},
limits: {
cpu: asString(resourceLimits.cpu, "4000m"),
memory: asString(resourceLimits.memory, "8Gi"),
cpu: asString(config["resources.limits.cpu"], "4000m"),
memory: asString(config["resources.limits.memory"], "8Gi"),
},
};
// Labels
// Labels — system identifiers must pass RFC 1123 label value format.
const sanitizedAgentId = sanitizeLabelValue(agent.id);
const sanitizedRunId = sanitizeLabelValue(runId);
const sanitizedCompanyId = sanitizeLabelValue(agent.companyId);
const skippedLabels: string[] = [];
if (!sanitizedRunId) skippedLabels.push("paperclip.io/run-id");
if (!sanitizedCompanyId) skippedLabels.push("paperclip.io/company-id");
const labels: Record<string, string> = {
"app.kubernetes.io/managed-by": "paperclip",
"app.kubernetes.io/component": "agent-job",
"paperclip.io/agent-id": agent.id,
"paperclip.io/run-id": runId,
"paperclip.io/company-id": agent.companyId,
// sanitizedAgentId null-check is enforced in execute.ts before Job creation
"paperclip.io/agent-id": sanitizedAgentId ?? agent.id,
"paperclip.io/adapter-type": "claude_k8s",
};
if (sanitizedRunId) labels["paperclip.io/run-id"] = sanitizedRunId;
if (sanitizedCompanyId) labels["paperclip.io/company-id"] = sanitizedCompanyId;
// Reattach-target labels: let a future execute() identify this Job as the
// continuation of the same logical unit of work (same task + same resume
// session) so it can attach to the running pod across a Paperclip restart
@@ -460,7 +470,11 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const sessionLabel = runtimeSessionId ? sanitizeLabelValue(runtimeSessionId) : null;
if (sessionLabel) labels["paperclip.io/session-id"] = sessionLabel;
for (const [key, value] of Object.entries(extraLabels)) {
labels[key] = value;
if (key.startsWith("paperclip.io/") || key.startsWith("app.kubernetes.io/")) {
skippedLabels.push(key);
} else {
labels[key] = value;
}
}
// Volumes
@@ -627,5 +641,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
},
};
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret };
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret, skippedLabels };
}
+6 -1
View File
@@ -106,7 +106,12 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise<SelfPodIn
throw new Error(`claude_k8s: pod ${hostname} has no spec`);
}
const mainContainer = spec.containers[0];
// Match the Paperclip container by name ("paperclip") to avoid service-mesh
// sidecars or other injected containers being picked up as the source of
// truth for the Job spec (finding #9, FAR-15). Fall back to the first
// container if no name match is found (matches prior behavior).
const mainContainer =
spec.containers.find((c) => c.name === "paperclip") ?? spec.containers[0];
if (!mainContainer?.image) {
throw new Error(`claude_k8s: pod ${hostname} has no container image`);
}
+15 -6
View File
@@ -9,9 +9,12 @@ export function parseClaudeStreamJson(stdout: string) {
let model = "";
let finalResult: Record<string, unknown> | null = null;
const assistantTexts: string[] = [];
// Belt-and-braces dedup: track seen text blocks to filter duplicates
// caused by log stream reconnects replaying overlapping windows.
const seenTexts = new Set<string>();
// Belt-and-braces dedup: key by (message.id, textIndex) so a session that
// legitimately emits the same text twice in different turns isn't collapsed
// (finding #11, FAR-15). The log-dedup filter handles reconnect overlaps
// at the line level; this guard only needs to protect against the same
// message block being parsed twice.
const seenBlocks = new Set<string>();
for (const rawLine of stdout.split(/\r?\n/)) {
const line = rawLine.trim();
@@ -29,14 +32,20 @@ export function parseClaudeStreamJson(stdout: string) {
if (type === "assistant") {
sessionId = asString(event.session_id, sessionId ?? "") || sessionId;
const message = parseObject(event.message);
const messageId = asString(message.id, "");
const content = Array.isArray(message.content) ? message.content : [];
for (const entry of content) {
for (let i = 0; i < content.length; i++) {
const entry = content[i];
if (typeof entry !== "object" || entry === null || Array.isArray(entry)) continue;
const block = entry as Record<string, unknown>;
if (asString(block.type, "") === "text") {
const text = asString(block.text, "");
if (text && !seenTexts.has(text)) {
seenTexts.add(text);
if (!text) continue;
// Prefer (messageId, index) when the message has an id; fall back
// to text content when it doesn't (legacy/partial events).
const key = messageId ? `${messageId}:${i}` : `text:${text}`;
if (!seenBlocks.has(key)) {
seenBlocks.add(key);
assistantTexts.push(text);
}
}
+49
View File
@@ -0,0 +1,49 @@
import { describe, it, expect, vi } from "vitest";
import os from "node:os";
import path from "node:path";
import { prepareClaudePromptBundle } from "./prompt-cache.js";
const onLog = vi.fn();
describe("prepareClaudePromptBundle path traversal validation", () => {
const validArgs = {
skills: [],
instructionsContents: null,
onLog,
};
it("rejects companyId containing ..", async () => {
await expect(prepareClaudePromptBundle({ ...validArgs, companyId: ".." })).rejects.toThrow(/companyId/);
});
it("rejects companyId containing ../x", async () => {
await expect(prepareClaudePromptBundle({ ...validArgs, companyId: "../x" })).rejects.toThrow(/companyId/);
});
it("rejects companyId containing /", async () => {
await expect(prepareClaudePromptBundle({ ...validArgs, companyId: "a/b" })).rejects.toThrow(/companyId/);
});
it("rejects companyId containing backslash", async () => {
await expect(prepareClaudePromptBundle({ ...validArgs, companyId: "a\\b" })).rejects.toThrow(/companyId/);
});
it("rejects companyId containing null byte", async () => {
await expect(prepareClaudePromptBundle({ ...validArgs, companyId: "a\0b" })).rejects.toThrow(/companyId/);
});
it("rejects empty companyId", async () => {
await expect(prepareClaudePromptBundle({ ...validArgs, companyId: "" })).rejects.toThrow(/companyId/);
});
it("rejects whitespace-only companyId", async () => {
await expect(prepareClaudePromptBundle({ ...validArgs, companyId: " " })).rejects.toThrow(/companyId/);
});
it("accepts a valid companyId", async () => {
vi.stubEnv("PAPERCLIP_HOME", path.join(os.tmpdir(), `prompt-cache-test-${process.pid}`));
const result = await prepareClaudePromptBundle({ ...validArgs, companyId: "acme-co" });
expect(result.rootDir).toContain("acme-co");
vi.unstubAllEnvs();
});
});
+9
View File
@@ -21,6 +21,13 @@ export interface ClaudePromptBundle {
const DEFAULT_PAPERCLIP_INSTANCE_ID = "default";
function validatePathComponent(value: string, fieldName: string): void {
if (value.trim().length === 0) throw new Error(`Invalid ${fieldName}: must not be empty`);
if (value.includes("/") || value.includes("\\")) throw new Error(`Invalid ${fieldName}: must not contain path separators`);
if (value.includes("..")) throw new Error(`Invalid ${fieldName}: must not contain ".."`);
if (value.includes("\0")) throw new Error(`Invalid ${fieldName}: must not contain null bytes`);
}
function resolveManagedClaudePromptCacheRoot(companyId: string): string {
const paperclipHome =
(typeof process.env.PAPERCLIP_HOME === "string" && process.env.PAPERCLIP_HOME.trim().length > 0
@@ -31,6 +38,8 @@ function resolveManagedClaudePromptCacheRoot(companyId: string): string {
(typeof process.env.PAPERCLIP_INSTANCE_ID === "string" && process.env.PAPERCLIP_INSTANCE_ID.trim().length > 0
? process.env.PAPERCLIP_INSTANCE_ID.trim()
: null) ?? DEFAULT_PAPERCLIP_INSTANCE_ID;
validatePathComponent(companyId, "companyId");
validatePathComponent(instanceId, "instanceId");
return path.resolve(paperclipHome, "instances", instanceId, "companies", companyId, "claude-prompt-cache");
}
+16 -9
View File
@@ -85,8 +85,13 @@ async function checkRbac(
{ resource: "jobs", group: "batch", verb: "create", code: "k8s_rbac_job_create", label: "create Jobs" },
{ resource: "jobs", group: "batch", verb: "delete", code: "k8s_rbac_job_delete", label: "delete Jobs" },
{ resource: "jobs", group: "batch", verb: "get", code: "k8s_rbac_job_get", label: "get Jobs" },
{ resource: "jobs", group: "batch", verb: "list", code: "k8s_rbac_job_list", label: "list Jobs" },
{ resource: "pods", group: "", verb: "list", code: "k8s_rbac_pod_list", label: "list Pods" },
{ resource: "pods/log", group: "", verb: "get", code: "k8s_rbac_pod_log", label: "get Pod logs" },
{ resource: "secrets", group: "", verb: "create", code: "k8s_rbac_secret_create", label: "create Secrets" },
{ resource: "secrets", group: "", verb: "delete", code: "k8s_rbac_secret_delete", label: "delete Secrets" },
{ resource: "secrets", group: "", verb: "get", code: "k8s_rbac_secret_get", label: "get Secrets" },
{ resource: "persistentvolumeclaims", group: "", verb: "get", code: "k8s_rbac_pvc_get", label: "get PersistentVolumeClaims" },
];
for (const check of rbacChecks) {
@@ -221,16 +226,18 @@ export async function testEnvironment(
// 2. Target namespace exists
const nsOk = await checkNamespace(namespace, selfPod.namespace, checks, kubeconfigPath);
if (!nsOk) {
return { adapterType: ctx.adapterType, status: summarizeStatus(checks), checks, testedAt: new Date().toISOString() };
}
// 3-5. Run remaining checks in parallel
await Promise.all([
checkRbac(namespace, checks, kubeconfigPath),
checkSecret(namespace, secretRef, checks, kubeconfigPath),
checkPvc(selfPod, checks, kubeconfigPath),
]);
// 3-5. Run remaining checks even if namespace check failed so operators see
// all issues at once instead of fixing them one at a time.
if (nsOk) {
await Promise.all([
checkRbac(namespace, checks, kubeconfigPath),
checkSecret(namespace, secretRef, checks, kubeconfigPath),
checkPvc(selfPod, checks, kubeconfigPath),
]);
} else {
await checkRbac(namespace, checks, kubeconfigPath);
}
return {
adapterType: ctx.adapterType,