Compare commits

...

8 Commits

Author SHA1 Message Date
Test User 3fe4721da6 fix: eliminate reconnect duplicate logs with KMP-based dedup (FAR-105)
The prior fix (sinceSeconds window anchored to lastLogReceivedAt) still
caused duplicates: on reconnect, the K8s API re-streams N seconds of
already-seen content and those bytes were forwarded to onLog verbatim.

New approach:
- First stream attempt: emit chunks to onLog in real-time as before.
- Reconnect attempts: buffer all incoming chunks rather than emitting
  immediately, then call findNewLogContent() to strip any prefix of the
  buffered data that overlaps with content already sent, and forward only
  the genuinely new suffix.
- findNewLogContent uses the KMP failure-function algorithm (O(N)) to
  find the longest prefix of the reconnect data that matches a suffix of
  the accumulated content, guaranteeing correct deduplication even for
  large sinceSeconds windows or rapidly repeated lines.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-21 20:13:58 +00:00
farhoodliquor-paperclip[bot] 0660749c1f Merge pull request #3 from farhoodliquor/fix/p0-correctness-far107
fix: P0+P1 correctness fixes (FAR-107 PR 1-2/3)
2026-04-20 19:41:16 +00:00
Test User b45cc29787 chore: bump version to 0.1.25 for PR #3
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-20 19:40:26 +00:00
Test User 1e517bb9bb fix: P1 correctness and operational fixes from FAR-104/FAR-105 analysis
5. Cap log stream reconnect attempts at 50 — prevents infinite
   reconnect loops during sustained API partitions.

6. Fire keepalive refresh earlier — tick 1 + every 12 ticks (~3min)
   instead of every 16 ticks (~4min), providing better safety margin
   under the 5-minute reaper window.

7. Catch rejections from onLog inside keepalive — add .catch(() => {})
   to prevent unhandledRejection on SSE backpressure.

8. Prevent sanitized-name collisions — extend slugs to 16 chars each,
   add a 6-char SHA-256 hash suffix, shorten prefix to `ac-` to stay
   well within the 63-char DNS label limit.

10. Fix config-hint parity for nodeSelector and labels — parse both
    `key=value` multiline text and JSON objects, matching what the
    textarea hint promises.

11. Large-prompt fallback via Secret — prompts >256 KiB are staged as a
    K8s Secret and mounted as a volume instead of passed via env var,
    protecting against the ~1 MiB PodSpec limit.

13. Track last-seen log timestamp on reconnect — anchor sinceSeconds at
    the last received log line instead of stream start, fixing FAR-105
    duplicative logs. Belt-and-braces: dedupe assistantTexts at the
    parser boundary in parse.ts.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-20 19:05:07 +00:00
Test User d74b6d34b3 fix: P0 correctness fixes from FAR-104/FAR-105 analysis
1. Inherit envFrom and env.valueFrom from self pod — secrets wired via
   valueFrom.secretKeyRef or envFrom.secretRef are now forwarded to Job
   pods, fixing credentials silently dropped for K8s-idiomatic secret
   patterns (e.g. ANTHROPIC_API_KEY via Secret).

2. Distinguish 404 vs transient errors in keepalive — only mark the
   keepalive as terminal on 404 (Job deleted). Transient 5xx/connection
   errors are logged and retried on the next tick, preventing premature
   reaper kills during API instability.

3. Fail closed on concurrency-guard read failure — a failing
   listNamespacedJob now returns k8s_concurrency_guard_unreachable
   instead of silently proceeding, protecting against zombie Jobs on
   shared PVCs.

4. Bound the waitForJobCompletion re-check — pass a 60s timeout instead
   of polling forever, preventing indefinite hangs when the K8s API is
   degraded.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-20 18:57:16 +00:00
Test User c35253ddd4 0.1.24 2026-04-20 18:03:53 +00:00
Test User 5f358b2a26 chore: update package-lock.json
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-20 18:03:50 +00:00
Test User 5c28e6c191 fix: use printf instead of echo in init container to prevent prompt corruption
Busybox echo interprets escape sequences by default (\c, \n, \t, \0NNN, etc.).
If the prompt contains \c (common in file paths or shell references), echo
silently stops output at that point, truncating the prompt file. This can
leave Claude CLI with an empty or garbled stdin, causing it to hang with
zero output — manifesting as endless keepalive messages in the UI.

printf '%s' passes content through verbatim, avoiding the issue.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-20 18:03:37 +00:00
8 changed files with 508 additions and 70 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.22",
"version": "0.1.26",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.22",
"version": "0.1.26",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.23",
"version": "0.1.26",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+176 -25
View File
@@ -14,6 +14,7 @@ import { Writable } from "node:stream";
const POLL_INTERVAL_MS = 2000;
const KEEPALIVE_INTERVAL_MS = 15_000;
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
/**
* Wait for the Job's pod to reach a terminal or running state.
@@ -162,11 +163,56 @@ async function streamPodLogsOnce(
return chunks.join("");
}
/**
* Given content already sent to the UI and new content from a reconnect
* stream, return only the portion that is genuinely new.
*
* The K8s sinceSeconds parameter causes reconnect streams to begin with
* content that was already sent during a prior stream attempt. We find
* the longest prefix of reconnectContent that overlaps with the tail of
* existingContent (i.e. the largest block of already-seen bytes at the
* start of the reconnect data) and skip it.
*
* Uses the KMP failure-function algorithm for O(|reconnectContent|) time.
*/
function findNewLogContent(existingContent: string, reconnectContent: string): string {
if (!reconnectContent) return "";
if (!existingContent) return reconnectContent;
// The overlap can be at most reconnectContent.length bytes long, so we
// only need to inspect the matching-length tail of existingContent.
const limit = Math.min(reconnectContent.length, existingContent.length);
const pattern = reconnectContent.slice(0, limit); // potential overlap prefix
const text = existingContent.slice(-limit); // tail to search in
// Build KMP failure function on (pattern + sentinel + text).
// The value of f at the final position equals the longest prefix of
// `pattern` that is simultaneously a suffix of `text` — the overlap length.
const sep = "\x00"; // sentinel unlikely to appear in log bytes
const s = pattern + sep + text;
const f = new Int32Array(s.length);
let k = 0;
for (let i = 1; i < s.length; i++) {
while (k > 0 && s.charCodeAt(i) !== s.charCodeAt(k)) k = f[k - 1];
if (s.charCodeAt(i) === s.charCodeAt(k)) k++;
f[i] = k;
}
return reconnectContent.slice(f[s.length - 1]);
}
/**
* Stream pod logs with automatic reconnection. Keeps retrying the log
* stream until the stop signal fires (job completed) or the container
* exits normally. This handles silent K8s API connection drops that
* would otherwise cause the UI to stop receiving real output.
*
* On reconnects, content is buffered rather than emitted directly.
* findNewLogContent then strips any overlap introduced by sinceSeconds,
* and only the genuinely new bytes are forwarded to onLog. This is the
* definitive fix for FAR-105 duplicate log output.
*
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
* loops during sustained API partitions.
*/
async function streamPodLogs(
namespace: string,
@@ -175,24 +221,59 @@ async function streamPodLogs(
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
): Promise<string> {
const allChunks: string[] = [];
let accumulated = ""; // all log content received and emitted so far
let attempt = 0;
const streamStartedAt = Math.floor(Date.now() / 1000);
// Updated per-chunk (inside callbacks) so it reflects when the last
// byte of log data actually arrived, not when the stream object closed.
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
while (!stopSignal?.stopped) {
// On reconnect, ask for logs since the stream originally started to
// avoid missing output during the reconnect gap. Duplicates are
// tolerable — the UI deduplicates log chunks.
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
break;
}
// On reconnect, ask for logs since the last received chunk (+5s buffer)
// to avoid missing output produced during the reconnect gap.
const sinceSeconds = attempt > 0
? Math.max(1, Math.floor(Date.now() / 1000) - streamStartedAt + 5)
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
: undefined;
if (attempt > 0) {
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt})...\n`);
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
}
if (attempt === 0) {
// First attempt: emit chunks to onLog in real-time and track content.
const trackingOnLog: typeof onLog = async (stream, chunk) => {
accumulated += chunk;
lastLogReceivedAt = Math.floor(Date.now() / 1000);
return onLog(stream, chunk);
};
await streamPodLogsOnce(namespace, podName, trackingOnLog, kubeconfigPath, undefined);
} else {
// Reconnect: buffer all chunks received from this stream attempt.
// The sinceSeconds window means the stream begins with content that
// was already emitted — we must not forward those bytes again.
const reconnectChunks: string[] = [];
const bufferingLog: typeof onLog = async (_stream, chunk) => {
reconnectChunks.push(chunk);
lastLogReceivedAt = Math.floor(Date.now() / 1000);
};
await streamPodLogsOnce(namespace, podName, bufferingLog, kubeconfigPath, sinceSeconds);
const reconnectContent = reconnectChunks.join("");
if (reconnectContent) {
// Strip the overlapping prefix (already-sent content) and emit only
// what extends beyond what the UI has already seen.
const newContent = findNewLogContent(accumulated, reconnectContent);
accumulated += newContent;
if (newContent) {
await onLog("stdout", newContent);
}
}
}
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
if (result) allChunks.push(result);
attempt++;
// If the job is done or the container exited, no need to reconnect.
@@ -202,7 +283,7 @@ async function streamPodLogs(
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
}
return allChunks.join("");
return accumulated;
}
/**
@@ -356,12 +437,24 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
}
} catch {
// If we can't check, proceed — the heartbeat service enforces concurrency too
} catch (err: unknown) {
// If we can't list jobs, fail closed — the K8s concurrency guard is the
// only thing preventing zombie Jobs on a shared PVC from corrupting
// sessions. 404 (namespace not found) is treated as a hard failure;
// other errors (5xx, network) are also surfaced.
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Concurrency guard failed: unable to list jobs: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Concurrency guard unreachable: ${msg}`,
errorCode: "k8s_concurrency_guard_unreachable",
};
}
// Build Job manifest
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics } = buildJobManifest({
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret } = buildJobManifest({
ctx,
selfPod,
});
@@ -384,6 +477,42 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
} as Parameters<typeof onMeta>[0]);
}
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
const coreApi = getCoreApi(kubeconfigPath);
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
},
},
stringData: promptSecret.data,
},
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
};
}
}
// Create the Job
const batchApi = getBatchApi(kubeconfigPath);
try {
@@ -486,21 +615,32 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
keepaliveJobTerminal = true;
return;
}
} catch {
// Job may have been deleted out from under us, or the API call
// transiently failed. Either way, do not refresh updatedAt
// either the Job really is gone, or the next tick will re-check.
keepaliveJobTerminal = true;
} catch (err: unknown) {
// Only treat 404 (Job deleted) as terminal. Transient 5xx or
// connection resets should NOT permanently disable the keepalive
// the next tick will re-check and the reaper uses the staleness
// window as a safety net.
const statusCode = (err as { response?: { statusCode?: number } })?.response?.statusCode
?? (err as { statusCode?: number })?.statusCode;
if (statusCode === 404) {
keepaliveJobTerminal = true;
return;
}
// Log transient errors but leave keepaliveJobTerminal false so
// the next tick retries.
const msg = err instanceof Error ? err.message : String(err);
void onLog("stderr", `[paperclip] keepalive: transient error checking job status: ${msg}\n`).catch(() => {});
return;
}
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`);
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {});
// Refresh updatedAt every ~4 minutes (16 ticks × 15s) to stay
// well within the 5-minute reaper staleness window.
// Refresh updatedAt every ~3 minutes (12 ticks × 15s = 180s) to
// stay well within the 5-minute reaper staleness window. Also
// fire on tick 1 for an early safety margin after job start.
keepaliveTick++;
if (ctx.onSpawn && keepaliveTick % 16 === 0) {
if (ctx.onSpawn && (keepaliveTick === 1 || keepaliveTick % 12 === 0)) {
void ctx.onSpawn({ pid: -1, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
})();
@@ -550,11 +690,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
} else {
// waitForJobCompletion threw — re-check job state to avoid returning
// while the job is still running (which would cause UI staleness and
// concurrency errors on retry).
// concurrency errors on retry). Use a bounded timeout (60s) so we
// don't hang the heartbeat indefinitely if the K8s API is degraded.
jobTimedOut = false;
const actualState = await waitForJobCompletion(namespace, jobName, 0, kubeconfigPath);
const RECHECK_TIMEOUT_MS = 60_000;
const actualState = await waitForJobCompletion(namespace, jobName, RECHECK_TIMEOUT_MS, kubeconfigPath);
if (actualState.timedOut) {
// Truly a timeout after re-check — treat as timed out.
// Re-check itself timed out — the job may still be running.
// Return an error so the UI knows the run is not done.
jobTimedOut = true;
} else if (!actualState.succeeded) {
// Job still not terminal — the completion error was likely transient.
@@ -582,6 +725,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
} else {
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
}
// Clean up prompt Secret if one was created
if (promptSecret) {
try {
await coreApi.deleteNamespacedSecret({ name: promptSecret.name, namespace: promptSecret.namespace });
} catch {
// Best-effort cleanup — TTL or manual deletion will catch stragglers
}
}
}
// Parse Claude output (reuse claude_local parsing)
+145 -12
View File
@@ -24,6 +24,8 @@ function makeSelfPod(overrides: Partial<SelfPodInfo> = {}): SelfPodInfo {
pvcClaimName: "paperclip-data",
secretVolumes: [],
inheritedEnv: {},
inheritedEnvValueFrom: [],
inheritedEnvFrom: [],
...overrides,
};
}
@@ -38,25 +40,44 @@ describe("buildJobManifest", () => {
});
describe("job naming", () => {
it("uses agent-claude- prefix", () => {
it("uses ac- prefix", () => {
const { jobName } = buildJobManifest({ ctx, selfPod });
expect(jobName).toMatch(/^agent-claude-/);
expect(jobName).toMatch(/^ac-/);
});
it("includes sanitized agent id slug", () => {
it("includes sanitized agent id slug (up to 16 chars)", () => {
ctx.agent.id = "Agent-ABC!@#";
const { jobName } = buildJobManifest({ ctx, selfPod });
// sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-8
// "Agent-ABC!@#" -> "agent-abc" (strips !@#, slice to 8 = "agent-ab")
expect(jobName).toContain("agent-ab");
// sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-16
expect(jobName).toContain("agent-abc");
});
it("includes sanitized run id slug", () => {
it("includes sanitized run id slug (up to 16 chars)", () => {
ctx.runId = "RUN-ABC-12345";
const { jobName } = buildJobManifest({ ctx, selfPod });
// sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-8
// "RUN-ABC-12345" -> "run-abc-12345" (slice to 8 = "run-abc-")
expect(jobName).toContain("run-abc-");
expect(jobName).toContain("run-abc-12345");
});
it("includes a deterministic hash suffix", () => {
const result1 = buildJobManifest({ ctx, selfPod });
const result2 = buildJobManifest({ ctx, selfPod });
expect(result1.jobName).toBe(result2.jobName);
// Hash suffix is 6 hex chars at the end
expect(result1.jobName).toMatch(/-[0-9a-f]{6}$/);
});
it("different agent+run pairs produce different names", () => {
const result1 = buildJobManifest({ ctx, selfPod });
ctx.runId = "run-different";
const result2 = buildJobManifest({ ctx, selfPod });
expect(result1.jobName).not.toBe(result2.jobName);
});
it("stays within 63-char DNS label limit", () => {
ctx.agent.id = "a".repeat(100);
ctx.runId = "r".repeat(100);
const { jobName } = buildJobManifest({ ctx, selfPod });
expect(jobName.length).toBeLessThanOrEqual(63);
});
});
@@ -181,7 +202,7 @@ describe("buildJobManifest", () => {
it("write-prompt writes PROMPT_CONTENT to /tmp/prompt/prompt.txt", () => {
const { job } = buildJobManifest({ ctx, selfPod });
const init = job.spec?.template?.spec?.initContainers?.[0];
expect(init?.command).toEqual(["sh", "-c", "echo \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"]);
expect(init?.command).toEqual(["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"]);
});
it("write-prompt mounts prompt volume", () => {
@@ -331,6 +352,50 @@ describe("buildJobManifest", () => {
const apiUrl = job.spec?.template?.spec?.containers[0]?.env?.find((e) => e.name === "PAPERCLIP_API_URL");
expect(apiUrl?.value).toBe("http://paperclip:8080");
});
it("includes valueFrom env vars from selfPod", () => {
selfPod.inheritedEnvValueFrom = [
{ name: "ANTHROPIC_API_KEY", valueFrom: { secretKeyRef: { name: "api-keys", key: "anthropic" } } },
];
const { job } = buildJobManifest({ ctx, selfPod });
const envList = job.spec?.template?.spec?.containers[0]?.env ?? [];
const apiKeyEntry = envList.find((e) => e.name === "ANTHROPIC_API_KEY");
expect(apiKeyEntry?.valueFrom?.secretKeyRef?.name).toBe("api-keys");
expect(apiKeyEntry?.valueFrom?.secretKeyRef?.key).toBe("anthropic");
expect(apiKeyEntry?.value).toBeUndefined();
});
it("literal env overrides valueFrom with the same name", () => {
selfPod.inheritedEnv = { MY_VAR: "literal-value" };
selfPod.inheritedEnvValueFrom = [
{ name: "MY_VAR", valueFrom: { secretKeyRef: { name: "sec", key: "k" } } },
];
const { job } = buildJobManifest({ ctx, selfPod });
const envList = job.spec?.template?.spec?.containers[0]?.env ?? [];
const myVar = envList.filter((e) => e.name === "MY_VAR");
expect(myVar).toHaveLength(1);
expect(myVar[0]?.value).toBe("literal-value");
expect(myVar[0]?.valueFrom).toBeUndefined();
});
it("includes envFrom sources from selfPod on the container", () => {
selfPod.inheritedEnvFrom = [
{ secretRef: { name: "api-secrets" } },
{ configMapRef: { name: "app-config" } },
];
const { job } = buildJobManifest({ ctx, selfPod });
const container = job.spec?.template?.spec?.containers[0];
expect(container?.envFrom).toHaveLength(2);
expect(container?.envFrom?.[0]?.secretRef?.name).toBe("api-secrets");
expect(container?.envFrom?.[1]?.configMapRef?.name).toBe("app-config");
});
it("omits envFrom when selfPod has none", () => {
selfPod.inheritedEnvFrom = [];
const { job } = buildJobManifest({ ctx, selfPod });
const container = job.spec?.template?.spec?.containers[0];
expect(container?.envFrom).toBeUndefined();
});
});
describe("resources", () => {
@@ -498,7 +563,7 @@ describe("buildJobManifest", () => {
});
describe("return value", () => {
it("returns job, jobName, namespace, prompt, claudeArgs, promptMetrics", () => {
it("returns job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret", () => {
const result = buildJobManifest({ ctx, selfPod });
expect(result.job).toBeDefined();
expect(result.jobName).toBeDefined();
@@ -506,6 +571,74 @@ describe("buildJobManifest", () => {
expect(result.prompt).toBeDefined();
expect(result.claudeArgs).toBeDefined();
expect(result.promptMetrics).toBeDefined();
expect(result.promptSecret).toBeNull();
});
});
describe("nodeSelector key=value parsing", () => {
it("parses key=value multiline text", () => {
ctx.config = { nodeSelector: "disktype=ssd\ntopology.kubernetes.io/zone=us-east-1a" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.spec?.template?.spec?.nodeSelector).toEqual({
disktype: "ssd",
"topology.kubernetes.io/zone": "us-east-1a",
});
});
it("still accepts JSON objects", () => {
ctx.config = { nodeSelector: { disktype: "ssd" } };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" });
});
it("parses JSON string format", () => {
ctx.config = { nodeSelector: '{"disktype":"ssd"}' };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" });
});
it("skips comment lines and blank lines", () => {
ctx.config = { nodeSelector: "# comment\n\ndisktype=ssd\n" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" });
});
});
describe("labels key=value parsing", () => {
it("parses key=value multiline text for extra labels", () => {
ctx.config = { labels: "env=prod\nteam=platform" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.env).toBe("prod");
expect(job.metadata?.labels?.team).toBe("platform");
});
});
describe("large prompt Secret fallback", () => {
it("returns null promptSecret for small prompts", () => {
const { promptSecret } = buildJobManifest({ ctx, selfPod });
expect(promptSecret).toBeNull();
});
it("returns promptSecret for prompts >256 KiB", () => {
// Build a prompt >256 KiB via a custom template
const largePrompt = "x".repeat(300 * 1024);
ctx.config = { promptTemplate: largePrompt };
const { promptSecret, job } = buildJobManifest({ ctx, selfPod });
expect(promptSecret).not.toBeNull();
expect(promptSecret!.data["prompt.txt"]).toBe(largePrompt);
// Init container should copy from secret volume, not use PROMPT_CONTENT env
const init = job.spec?.template?.spec?.initContainers?.[0];
expect(init?.command).toContainEqual(expect.stringContaining("cp"));
expect(init?.env).toBeUndefined();
// Should have prompt-secret volume
const secretVol = job.spec?.template?.spec?.volumes?.find((v) => v.name === "prompt-secret");
expect(secretVol?.secret?.secretName).toBe(promptSecret!.name);
});
it("uses env var init container for small prompts", () => {
const { job } = buildJobManifest({ ctx, selfPod });
const init = job.spec?.template?.spec?.initContainers?.[0];
expect(init?.env?.[0]?.name).toBe("PROMPT_CONTENT");
});
});
});
+146 -26
View File
@@ -9,6 +9,11 @@ import {
buildPaperclipEnv,
renderTemplate,
} from "@paperclipai/adapter-utils/server-utils";
import { createHash } from "node:crypto";
/** Prompts above this size (bytes) are staged via a Secret instead of an
* init container env var, protecting against the ~1 MiB PodSpec limit. */
const LARGE_PROMPT_THRESHOLD_BYTES = 256 * 1024;
// Inline prompt assembly — these functions are not yet in the published adapter-utils
function joinPromptSections(sections: string[], separator = "\n\n"): string {
@@ -44,11 +49,63 @@ function renderPaperclipWakePrompt(wake: unknown, _opts?: { resumedSession?: boo
}
import type { SelfPodInfo } from "./k8s-client.js";
/**
* Parse a config value that may be either a JSON object or multiline
* `key=value` text (one pair per line). This fixes the config-hint
* parity issue where textarea hints promise `key=value` per line but
* `parseObject` only handles JSON.
*/
function parseKeyValueConfig(raw: unknown): Record<string, string> {
if (typeof raw === "object" && raw !== null && !Array.isArray(raw)) {
// Already an object (JSON was parsed upstream)
const result: Record<string, string> = {};
for (const [k, v] of Object.entries(raw as Record<string, unknown>)) {
if (typeof v === "string") result[k] = v;
}
return result;
}
if (typeof raw !== "string" || !raw.trim()) return {};
// Try JSON parse first
try {
const parsed = JSON.parse(raw);
if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
const result: Record<string, string> = {};
for (const [k, v] of Object.entries(parsed as Record<string, unknown>)) {
if (typeof v === "string") result[k] = v;
}
return result;
}
} catch {
// Not JSON — fall through to key=value parsing
}
// Parse key=value lines
const result: Record<string, string> = {};
for (const line of raw.split(/\r?\n/)) {
const trimmed = line.trim();
if (!trimmed || trimmed.startsWith("#")) continue;
const eqIdx = trimmed.indexOf("=");
if (eqIdx <= 0) continue;
const key = trimmed.slice(0, eqIdx).trim();
const value = trimmed.slice(eqIdx + 1).trim();
if (key) result[key] = value;
}
return result;
}
export interface JobBuildInput {
ctx: AdapterExecutionContext;
selfPod: SelfPodInfo;
}
/** When the prompt exceeds the env-var size limit, the manifest uses a
* Secret-backed volume instead of the init container's PROMPT_CONTENT env.
* The caller must create this Secret before the Job and clean it up after. */
export interface PromptSecret {
name: string;
namespace: string;
data: Record<string, string>;
}
export interface JobBuildResult {
job: k8s.V1Job;
jobName: string;
@@ -56,10 +113,21 @@ export interface JobBuildResult {
prompt: string;
claudeArgs: string[];
promptMetrics: Record<string, number>;
/** Non-null when the prompt is too large for an env var and must be
* staged as a K8s Secret before creating the Job. */
promptSecret: PromptSecret | null;
}
function sanitizeForK8sName(value: string): string {
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, 8);
function sanitizeForK8sName(value: string, maxLen = 16): string {
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen);
}
/**
* Build a short deterministic hash suffix from the raw inputs to avoid
* collisions when sanitized slugs happen to be identical.
*/
function shortHash(input: string, len = 6): string {
return createHash("sha256").update(input).digest("hex").slice(0, len);
}
function buildEnvVars(
@@ -148,12 +216,22 @@ function buildEnvVars(
// HOME must be /paperclip to match PVC mount and enable session resume
merged.HOME = "/paperclip";
// Convert to V1EnvVar array
// Convert literal env to V1EnvVar array
const envVars: k8s.V1EnvVar[] = Object.entries(merged).map(([name, value]) => ({
name,
value,
}));
// Append valueFrom entries from the Deployment container (secretKeyRef,
// configMapKeyRef, fieldRef, etc.). Skip any whose name was already set
// by a literal value — the literal value wins (same precedence as above).
const literalNames = new Set(Object.keys(merged));
for (const entry of selfPod.inheritedEnvValueFrom) {
if (!literalNames.has(entry.name)) {
envVars.push(entry);
}
}
return envVars;
}
@@ -174,9 +252,9 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const timeoutSec = asNumber(config.timeoutSec, 0);
const ttlSeconds = asNumber(config.ttlSecondsAfterFinished, 300);
const resources = parseObject(config.resources);
const nodeSelector = parseObject(config.nodeSelector);
const nodeSelector = parseKeyValueConfig(config.nodeSelector);
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
const extraLabels = parseObject(config.labels);
const extraLabels = parseKeyValueConfig(config.labels);
// Resolve working directory — use workspace cwd, fall back to /paperclip
const workspaceContext = parseObject(context.paperclipWorkspace);
@@ -184,9 +262,13 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const configuredCwd = asString(config.cwd, "");
const workingDir = workspaceCwd || configuredCwd || "/paperclip";
const agentSlug = sanitizeForK8sName(agent.id);
const runSlug = sanitizeForK8sName(runId);
const jobName = `agent-claude-${agentSlug}-${runSlug}`;
// Build a deterministic, collision-resistant job name within the 63-char
// DNS label limit. Layout: "ac-{agentSlug}-{runSlug}-{hash}" where the
// hash is derived from the raw (un-truncated) agent+run IDs.
const agentSlug = sanitizeForK8sName(agent.id, 16);
const runSlug = sanitizeForK8sName(runId, 16);
const hash = shortHash(`${agent.id}:${runId}`);
const jobName = `ac-${agentSlug}-${runSlug}-${hash}`;
// Build prompt (same logic as claude_local)
const promptTemplate = asString(
@@ -265,7 +347,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
"paperclip.io/adapter-type": "claude_k8s",
};
for (const [key, value] of Object.entries(extraLabels)) {
if (typeof value === "string") labels[key] = value;
labels[key] = value;
}
// Volumes
@@ -328,6 +410,57 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const claudeArgsEscaped = claudeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
const mainCommand = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`;
// Decide prompt delivery strategy: env var (small) or Secret volume (large).
const promptBytes = Buffer.byteLength(prompt, "utf-8");
const useLargePromptPath = promptBytes > LARGE_PROMPT_THRESHOLD_BYTES;
let promptSecret: PromptSecret | null = null;
const promptSecretName = `${jobName}-prompt`;
if (useLargePromptPath) {
// Stage prompt as a Secret; the init container copies from the mounted
// secret volume to the emptyDir so the main container reads it the
// same way regardless of prompt size.
promptSecret = {
name: promptSecretName,
namespace,
data: { "prompt.txt": prompt },
};
volumes.push({
name: "prompt-secret",
secret: { secretName: promptSecretName, optional: false },
});
}
const initContainer: k8s.V1Container = useLargePromptPath
? {
name: "write-prompt",
image: "busybox:1.36",
imagePullPolicy: "IfNotPresent",
command: ["sh", "-c", "cp /tmp/prompt-secret/prompt.txt /tmp/prompt/prompt.txt"],
volumeMounts: [
{ name: "prompt", mountPath: "/tmp/prompt" },
{ name: "prompt-secret", mountPath: "/tmp/prompt-secret", readOnly: true },
],
securityContext,
resources: {
requests: { cpu: "10m", memory: "16Mi" },
limits: { cpu: "100m", memory: "64Mi" },
},
}
: {
name: "write-prompt",
image: "busybox:1.36",
imagePullPolicy: "IfNotPresent",
command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
env: [{ name: "PROMPT_CONTENT", value: prompt }],
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
securityContext,
resources: {
requests: { cpu: "10m", memory: "16Mi" },
limits: { cpu: "100m", memory: "64Mi" },
},
};
const job: k8s.V1Job = {
apiVersion: "batch/v1",
kind: "Job",
@@ -352,23 +485,9 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
securityContext: podSecurityContext,
...(selfPod.imagePullSecrets.length > 0 ? { imagePullSecrets: selfPod.imagePullSecrets } : {}),
...(selfPod.dnsConfig ? { dnsConfig: selfPod.dnsConfig } : {}),
...(Object.keys(nodeSelector).length > 0 ? { nodeSelector: nodeSelector as Record<string, string> } : {}),
...(Object.keys(nodeSelector).length > 0 ? { nodeSelector } : {}),
...(tolerations.length > 0 ? { tolerations: tolerations as k8s.V1Toleration[] } : {}),
initContainers: [
{
name: "write-prompt",
image: "busybox:1.36",
imagePullPolicy: "IfNotPresent",
command: ["sh", "-c", "echo \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
env: [{ name: "PROMPT_CONTENT", value: prompt }],
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
securityContext,
resources: {
requests: { cpu: "10m", memory: "16Mi" },
limits: { cpu: "100m", memory: "64Mi" },
},
},
],
initContainers: [initContainer],
containers: [
{
name: "claude",
@@ -377,6 +496,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
workingDir,
command: ["sh", "-c", mainCommand],
env: envVars,
...(selfPod.inheritedEnvFrom.length > 0 ? { envFrom: selfPod.inheritedEnvFrom } : {}),
volumeMounts,
securityContext,
resources: containerResources,
@@ -388,5 +508,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
},
};
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics };
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret };
}
+18 -3
View File
@@ -20,8 +20,12 @@ export interface SelfPodInfo {
dnsConfig: k8s.V1PodDNSConfig | undefined;
pvcClaimName: string | null;
secretVolumes: SelfPodSecretVolume[];
/** Env vars inherited from the Deployment container. */
/** Env vars inherited from the Deployment container (literal name/value pairs). */
inheritedEnv: Record<string, string>;
/** Env vars with valueFrom (secretKeyRef, configMapKeyRef, etc.) from the Deployment container. */
inheritedEnvValueFrom: k8s.V1EnvVar[];
/** envFrom sources (secretRef, configMapRef) from the Deployment container. */
inheritedEnvFrom: k8s.V1EnvFromSource[];
}
let cachedSelfPod: SelfPodInfo | null = null;
@@ -134,12 +138,21 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise<SelfPodIn
// Collect env vars from the pod spec's container definition.
// Agent config env (set in buildEnvVars) will override these.
const inheritedEnv: Record<string, string> = {};
const inheritedEnvValueFrom: k8s.V1EnvVar[] = [];
for (const envItem of mainContainer.env ?? []) {
if (!envItem.name) continue;
const value = envItem.value ?? "";
if (value) inheritedEnv[envItem.name] = value;
if (envItem.valueFrom) {
// Preserve valueFrom entries (secretKeyRef, configMapKeyRef, fieldRef, etc.)
inheritedEnvValueFrom.push({ name: envItem.name, valueFrom: envItem.valueFrom });
} else {
const value = envItem.value ?? "";
if (value) inheritedEnv[envItem.name] = value;
}
}
// Capture envFrom sources (secretRef, configMapRef) from the container spec
const inheritedEnvFrom: k8s.V1EnvFromSource[] = mainContainer.envFrom ?? [];
cachedSelfPod = {
namespace,
image: mainContainer.image,
@@ -150,6 +163,8 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise<SelfPodIn
pvcClaimName,
secretVolumes,
inheritedEnv,
inheritedEnvValueFrom,
inheritedEnvFrom,
};
return cachedSelfPod;
+13
View File
@@ -141,6 +141,19 @@ more raw output`;
expect(result.summary).toContain("JSON output");
expect(result.summary).not.toContain("some raw output");
});
it("deduplicates identical assistant text blocks from reconnect replays", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: { content: [{ type: "text", text: "Hello world" }] },
});
// Simulate the same assistant event appearing twice (log stream reconnect replay)
const stdout = `${assistantEvent}\n${assistantEvent}\n`;
const result = parseClaudeStreamJson(stdout);
expect(result.summary).toBe("Hello world");
// Should not be "Hello world\n\nHello world"
expect(result.summary.split("Hello world").length).toBe(2);
});
});
describe("extractClaudeLoginUrl", () => {
+7 -1
View File
@@ -9,6 +9,9 @@ export function parseClaudeStreamJson(stdout: string) {
let model = "";
let finalResult: Record<string, unknown> | null = null;
const assistantTexts: string[] = [];
// Belt-and-braces dedup: track seen text blocks to filter duplicates
// caused by log stream reconnects replaying overlapping windows.
const seenTexts = new Set<string>();
for (const rawLine of stdout.split(/\r?\n/)) {
const line = rawLine.trim();
@@ -32,7 +35,10 @@ export function parseClaudeStreamJson(stdout: string) {
const block = entry as Record<string, unknown>;
if (asString(block.type, "") === "text") {
const text = asString(block.text, "");
if (text) assistantTexts.push(text);
if (text && !seenTexts.has(text)) {
seenTexts.add(text);
assistantTexts.push(text);
}
}
}
continue;