fix: P0+P1 correctness fixes (FAR-107 PR 1-2/3) #3

Merged
farhoodliquor-paperclip[bot] merged 3 commits from fix/p0-correctness-far107 into master 2026-04-20 19:41:17 +00:00
8 changed files with 445 additions and 66 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.24",
"version": "0.1.25",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.24",
"version": "0.1.25",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.24",
"version": "0.1.25",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+114 -22
View File
@@ -14,6 +14,7 @@ import { Writable } from "node:stream";
const POLL_INTERVAL_MS = 2000;
const KEEPALIVE_INTERVAL_MS = 15_000;
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
/**
* Wait for the Job's pod to reach a terminal or running state.
@@ -167,6 +168,9 @@ async function streamPodLogsOnce(
* stream until the stop signal fires (job completed) or the container
* exits normally. This handles silent K8s API connection drops that
* would otherwise cause the UI to stop receiving real output.
*
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
* loops during sustained API partitions.
*/
async function streamPodLogs(
namespace: string,
@@ -177,22 +181,40 @@ async function streamPodLogs(
): Promise<string> {
const allChunks: string[] = [];
let attempt = 0;
const streamStartedAt = Math.floor(Date.now() / 1000);
// Track the timestamp of the last successfully received log line so
// reconnects use a tight window instead of an ever-growing one anchored
// at stream start. This is the primary fix for FAR-105 duplicative logs.
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
while (!stopSignal?.stopped) {
// On reconnect, ask for logs since the stream originally started to
// avoid missing output during the reconnect gap. Duplicates are
// tolerable — the UI deduplicates log chunks.
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
break;
}
// On reconnect, ask for logs since the last received line (+5s buffer)
// instead of since stream start. This keeps the window tight and
// avoids ever-growing duplicate output.
const sinceSeconds = attempt > 0
? Math.max(1, Math.floor(Date.now() / 1000) - streamStartedAt + 5)
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
: undefined;
if (attempt > 0) {
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt})...\n`);
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
}
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
if (result) allChunks.push(result);
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
// so any log lines in `result` were received up to this moment).
lastLogReceivedAt = Math.floor(Date.now() / 1000);
} else if (attempt === 0) {
// First attempt returned nothing — update timestamp so reconnect
// window stays reasonable.
lastLogReceivedAt = preStreamTs;
}
attempt++;
// If the job is done or the container exited, no need to reconnect.
@@ -356,12 +378,24 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
}
} catch {
// If we can't check, proceed — the heartbeat service enforces concurrency too
} catch (err: unknown) {
// If we can't list jobs, fail closed — the K8s concurrency guard is the
// only thing preventing zombie Jobs on a shared PVC from corrupting
// sessions. 404 (namespace not found) is treated as a hard failure;
// other errors (5xx, network) are also surfaced.
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Concurrency guard failed: unable to list jobs: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Concurrency guard unreachable: ${msg}`,
errorCode: "k8s_concurrency_guard_unreachable",
};
}
// Build Job manifest
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics } = buildJobManifest({
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret } = buildJobManifest({
ctx,
selfPod,
});
@@ -384,6 +418,42 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
} as Parameters<typeof onMeta>[0]);
}
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
const coreApi = getCoreApi(kubeconfigPath);
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
},
},
stringData: promptSecret.data,
},
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
};
}
}
// Create the Job
const batchApi = getBatchApi(kubeconfigPath);
try {
@@ -486,21 +556,32 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
keepaliveJobTerminal = true;
return;
}
} catch {
// Job may have been deleted out from under us, or the API call
// transiently failed. Either way, do not refresh updatedAt
// either the Job really is gone, or the next tick will re-check.
keepaliveJobTerminal = true;
} catch (err: unknown) {
// Only treat 404 (Job deleted) as terminal. Transient 5xx or
// connection resets should NOT permanently disable the keepalive
// the next tick will re-check and the reaper uses the staleness
// window as a safety net.
const statusCode = (err as { response?: { statusCode?: number } })?.response?.statusCode
?? (err as { statusCode?: number })?.statusCode;
if (statusCode === 404) {
keepaliveJobTerminal = true;
return;
}
// Log transient errors but leave keepaliveJobTerminal false so
// the next tick retries.
const msg = err instanceof Error ? err.message : String(err);
void onLog("stderr", `[paperclip] keepalive: transient error checking job status: ${msg}\n`).catch(() => {});
return;
}
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`);
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {});
// Refresh updatedAt every ~4 minutes (16 ticks × 15s) to stay
// well within the 5-minute reaper staleness window.
// Refresh updatedAt every ~3 minutes (12 ticks × 15s = 180s) to
// stay well within the 5-minute reaper staleness window. Also
// fire on tick 1 for an early safety margin after job start.
keepaliveTick++;
if (ctx.onSpawn && keepaliveTick % 16 === 0) {
if (ctx.onSpawn && (keepaliveTick === 1 || keepaliveTick % 12 === 0)) {
void ctx.onSpawn({ pid: -1, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
})();
@@ -550,11 +631,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
} else {
// waitForJobCompletion threw — re-check job state to avoid returning
// while the job is still running (which would cause UI staleness and
// concurrency errors on retry).
// concurrency errors on retry). Use a bounded timeout (60s) so we
// don't hang the heartbeat indefinitely if the K8s API is degraded.
jobTimedOut = false;
const actualState = await waitForJobCompletion(namespace, jobName, 0, kubeconfigPath);
const RECHECK_TIMEOUT_MS = 60_000;
const actualState = await waitForJobCompletion(namespace, jobName, RECHECK_TIMEOUT_MS, kubeconfigPath);
if (actualState.timedOut) {
// Truly a timeout after re-check — treat as timed out.
// Re-check itself timed out — the job may still be running.
// Return an error so the UI knows the run is not done.
jobTimedOut = true;
} else if (!actualState.succeeded) {
// Job still not terminal — the completion error was likely transient.
@@ -582,6 +666,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
} else {
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
}
// Clean up prompt Secret if one was created
if (promptSecret) {
try {
await coreApi.deleteNamespacedSecret({ name: promptSecret.name, namespace: promptSecret.namespace });
} catch {
// Best-effort cleanup — TTL or manual deletion will catch stragglers
}
}
}
// Parse Claude output (reuse claude_local parsing)
+144 -11
View File
@@ -24,6 +24,8 @@ function makeSelfPod(overrides: Partial<SelfPodInfo> = {}): SelfPodInfo {
pvcClaimName: "paperclip-data",
secretVolumes: [],
inheritedEnv: {},
inheritedEnvValueFrom: [],
inheritedEnvFrom: [],
...overrides,
};
}
@@ -38,25 +40,44 @@ describe("buildJobManifest", () => {
});
describe("job naming", () => {
it("uses agent-claude- prefix", () => {
it("uses ac- prefix", () => {
const { jobName } = buildJobManifest({ ctx, selfPod });
expect(jobName).toMatch(/^agent-claude-/);
expect(jobName).toMatch(/^ac-/);
});
it("includes sanitized agent id slug", () => {
it("includes sanitized agent id slug (up to 16 chars)", () => {
ctx.agent.id = "Agent-ABC!@#";
const { jobName } = buildJobManifest({ ctx, selfPod });
// sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-8
// "Agent-ABC!@#" -> "agent-abc" (strips !@#, slice to 8 = "agent-ab")
expect(jobName).toContain("agent-ab");
// sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-16
expect(jobName).toContain("agent-abc");
});
it("includes sanitized run id slug", () => {
it("includes sanitized run id slug (up to 16 chars)", () => {
ctx.runId = "RUN-ABC-12345";
const { jobName } = buildJobManifest({ ctx, selfPod });
// sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-8
// "RUN-ABC-12345" -> "run-abc-12345" (slice to 8 = "run-abc-")
expect(jobName).toContain("run-abc-");
expect(jobName).toContain("run-abc-12345");
});
it("includes a deterministic hash suffix", () => {
const result1 = buildJobManifest({ ctx, selfPod });
const result2 = buildJobManifest({ ctx, selfPod });
expect(result1.jobName).toBe(result2.jobName);
// Hash suffix is 6 hex chars at the end
expect(result1.jobName).toMatch(/-[0-9a-f]{6}$/);
});
it("different agent+run pairs produce different names", () => {
const result1 = buildJobManifest({ ctx, selfPod });
ctx.runId = "run-different";
const result2 = buildJobManifest({ ctx, selfPod });
expect(result1.jobName).not.toBe(result2.jobName);
});
it("stays within 63-char DNS label limit", () => {
ctx.agent.id = "a".repeat(100);
ctx.runId = "r".repeat(100);
const { jobName } = buildJobManifest({ ctx, selfPod });
expect(jobName.length).toBeLessThanOrEqual(63);
});
});
@@ -331,6 +352,50 @@ describe("buildJobManifest", () => {
const apiUrl = job.spec?.template?.spec?.containers[0]?.env?.find((e) => e.name === "PAPERCLIP_API_URL");
expect(apiUrl?.value).toBe("http://paperclip:8080");
});
it("includes valueFrom env vars from selfPod", () => {
selfPod.inheritedEnvValueFrom = [
{ name: "ANTHROPIC_API_KEY", valueFrom: { secretKeyRef: { name: "api-keys", key: "anthropic" } } },
];
const { job } = buildJobManifest({ ctx, selfPod });
const envList = job.spec?.template?.spec?.containers[0]?.env ?? [];
const apiKeyEntry = envList.find((e) => e.name === "ANTHROPIC_API_KEY");
expect(apiKeyEntry?.valueFrom?.secretKeyRef?.name).toBe("api-keys");
expect(apiKeyEntry?.valueFrom?.secretKeyRef?.key).toBe("anthropic");
expect(apiKeyEntry?.value).toBeUndefined();
});
it("literal env overrides valueFrom with the same name", () => {
selfPod.inheritedEnv = { MY_VAR: "literal-value" };
selfPod.inheritedEnvValueFrom = [
{ name: "MY_VAR", valueFrom: { secretKeyRef: { name: "sec", key: "k" } } },
];
const { job } = buildJobManifest({ ctx, selfPod });
const envList = job.spec?.template?.spec?.containers[0]?.env ?? [];
const myVar = envList.filter((e) => e.name === "MY_VAR");
expect(myVar).toHaveLength(1);
expect(myVar[0]?.value).toBe("literal-value");
expect(myVar[0]?.valueFrom).toBeUndefined();
});
it("includes envFrom sources from selfPod on the container", () => {
selfPod.inheritedEnvFrom = [
{ secretRef: { name: "api-secrets" } },
{ configMapRef: { name: "app-config" } },
];
const { job } = buildJobManifest({ ctx, selfPod });
const container = job.spec?.template?.spec?.containers[0];
expect(container?.envFrom).toHaveLength(2);
expect(container?.envFrom?.[0]?.secretRef?.name).toBe("api-secrets");
expect(container?.envFrom?.[1]?.configMapRef?.name).toBe("app-config");
});
it("omits envFrom when selfPod has none", () => {
selfPod.inheritedEnvFrom = [];
const { job } = buildJobManifest({ ctx, selfPod });
const container = job.spec?.template?.spec?.containers[0];
expect(container?.envFrom).toBeUndefined();
});
});
describe("resources", () => {
@@ -498,7 +563,7 @@ describe("buildJobManifest", () => {
});
describe("return value", () => {
it("returns job, jobName, namespace, prompt, claudeArgs, promptMetrics", () => {
it("returns job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret", () => {
const result = buildJobManifest({ ctx, selfPod });
expect(result.job).toBeDefined();
expect(result.jobName).toBeDefined();
@@ -506,6 +571,74 @@ describe("buildJobManifest", () => {
expect(result.prompt).toBeDefined();
expect(result.claudeArgs).toBeDefined();
expect(result.promptMetrics).toBeDefined();
expect(result.promptSecret).toBeNull();
});
});
describe("nodeSelector key=value parsing", () => {
it("parses key=value multiline text", () => {
ctx.config = { nodeSelector: "disktype=ssd\ntopology.kubernetes.io/zone=us-east-1a" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.spec?.template?.spec?.nodeSelector).toEqual({
disktype: "ssd",
"topology.kubernetes.io/zone": "us-east-1a",
});
});
it("still accepts JSON objects", () => {
ctx.config = { nodeSelector: { disktype: "ssd" } };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" });
});
it("parses JSON string format", () => {
ctx.config = { nodeSelector: '{"disktype":"ssd"}' };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" });
});
it("skips comment lines and blank lines", () => {
ctx.config = { nodeSelector: "# comment\n\ndisktype=ssd\n" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" });
});
});
describe("labels key=value parsing", () => {
it("parses key=value multiline text for extra labels", () => {
ctx.config = { labels: "env=prod\nteam=platform" };
const { job } = buildJobManifest({ ctx, selfPod });
expect(job.metadata?.labels?.env).toBe("prod");
expect(job.metadata?.labels?.team).toBe("platform");
});
});
describe("large prompt Secret fallback", () => {
it("returns null promptSecret for small prompts", () => {
const { promptSecret } = buildJobManifest({ ctx, selfPod });
expect(promptSecret).toBeNull();
});
it("returns promptSecret for prompts >256 KiB", () => {
// Build a prompt >256 KiB via a custom template
const largePrompt = "x".repeat(300 * 1024);
ctx.config = { promptTemplate: largePrompt };
const { promptSecret, job } = buildJobManifest({ ctx, selfPod });
expect(promptSecret).not.toBeNull();
expect(promptSecret!.data["prompt.txt"]).toBe(largePrompt);
// Init container should copy from secret volume, not use PROMPT_CONTENT env
const init = job.spec?.template?.spec?.initContainers?.[0];
expect(init?.command).toContainEqual(expect.stringContaining("cp"));
expect(init?.env).toBeUndefined();
// Should have prompt-secret volume
const secretVol = job.spec?.template?.spec?.volumes?.find((v) => v.name === "prompt-secret");
expect(secretVol?.secret?.secretName).toBe(promptSecret!.name);
});
it("uses env var init container for small prompts", () => {
const { job } = buildJobManifest({ ctx, selfPod });
const init = job.spec?.template?.spec?.initContainers?.[0];
expect(init?.env?.[0]?.name).toBe("PROMPT_CONTENT");
});
});
});
+146 -26
View File
@@ -9,6 +9,11 @@ import {
buildPaperclipEnv,
renderTemplate,
} from "@paperclipai/adapter-utils/server-utils";
import { createHash } from "node:crypto";
/** Prompts above this size (bytes) are staged via a Secret instead of an
* init container env var, protecting against the ~1 MiB PodSpec limit. */
const LARGE_PROMPT_THRESHOLD_BYTES = 256 * 1024;
// Inline prompt assembly — these functions are not yet in the published adapter-utils
function joinPromptSections(sections: string[], separator = "\n\n"): string {
@@ -44,11 +49,63 @@ function renderPaperclipWakePrompt(wake: unknown, _opts?: { resumedSession?: boo
}
import type { SelfPodInfo } from "./k8s-client.js";
/**
* Parse a config value that may be either a JSON object or multiline
* `key=value` text (one pair per line). This fixes the config-hint
* parity issue where textarea hints promise `key=value` per line but
* `parseObject` only handles JSON.
*/
function parseKeyValueConfig(raw: unknown): Record<string, string> {
if (typeof raw === "object" && raw !== null && !Array.isArray(raw)) {
// Already an object (JSON was parsed upstream)
const result: Record<string, string> = {};
for (const [k, v] of Object.entries(raw as Record<string, unknown>)) {
if (typeof v === "string") result[k] = v;
}
return result;
}
if (typeof raw !== "string" || !raw.trim()) return {};
// Try JSON parse first
try {
const parsed = JSON.parse(raw);
if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
const result: Record<string, string> = {};
for (const [k, v] of Object.entries(parsed as Record<string, unknown>)) {
if (typeof v === "string") result[k] = v;
}
return result;
}
} catch {
// Not JSON — fall through to key=value parsing
}
// Parse key=value lines
const result: Record<string, string> = {};
for (const line of raw.split(/\r?\n/)) {
const trimmed = line.trim();
if (!trimmed || trimmed.startsWith("#")) continue;
const eqIdx = trimmed.indexOf("=");
if (eqIdx <= 0) continue;
const key = trimmed.slice(0, eqIdx).trim();
const value = trimmed.slice(eqIdx + 1).trim();
if (key) result[key] = value;
}
return result;
}
export interface JobBuildInput {
ctx: AdapterExecutionContext;
selfPod: SelfPodInfo;
}
/** When the prompt exceeds the env-var size limit, the manifest uses a
* Secret-backed volume instead of the init container's PROMPT_CONTENT env.
* The caller must create this Secret before the Job and clean it up after. */
export interface PromptSecret {
name: string;
namespace: string;
data: Record<string, string>;
}
export interface JobBuildResult {
job: k8s.V1Job;
jobName: string;
@@ -56,10 +113,21 @@ export interface JobBuildResult {
prompt: string;
claudeArgs: string[];
promptMetrics: Record<string, number>;
/** Non-null when the prompt is too large for an env var and must be
* staged as a K8s Secret before creating the Job. */
promptSecret: PromptSecret | null;
}
function sanitizeForK8sName(value: string): string {
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, 8);
function sanitizeForK8sName(value: string, maxLen = 16): string {
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen);
}
/**
* Build a short deterministic hash suffix from the raw inputs to avoid
* collisions when sanitized slugs happen to be identical.
*/
function shortHash(input: string, len = 6): string {
return createHash("sha256").update(input).digest("hex").slice(0, len);
}
function buildEnvVars(
@@ -148,12 +216,22 @@ function buildEnvVars(
// HOME must be /paperclip to match PVC mount and enable session resume
merged.HOME = "/paperclip";
// Convert to V1EnvVar array
// Convert literal env to V1EnvVar array
const envVars: k8s.V1EnvVar[] = Object.entries(merged).map(([name, value]) => ({
name,
value,
}));
// Append valueFrom entries from the Deployment container (secretKeyRef,
// configMapKeyRef, fieldRef, etc.). Skip any whose name was already set
// by a literal value — the literal value wins (same precedence as above).
const literalNames = new Set(Object.keys(merged));
for (const entry of selfPod.inheritedEnvValueFrom) {
if (!literalNames.has(entry.name)) {
envVars.push(entry);
}
}
return envVars;
}
@@ -174,9 +252,9 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const timeoutSec = asNumber(config.timeoutSec, 0);
const ttlSeconds = asNumber(config.ttlSecondsAfterFinished, 300);
const resources = parseObject(config.resources);
const nodeSelector = parseObject(config.nodeSelector);
const nodeSelector = parseKeyValueConfig(config.nodeSelector);
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
const extraLabels = parseObject(config.labels);
const extraLabels = parseKeyValueConfig(config.labels);
// Resolve working directory — use workspace cwd, fall back to /paperclip
const workspaceContext = parseObject(context.paperclipWorkspace);
@@ -184,9 +262,13 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const configuredCwd = asString(config.cwd, "");
const workingDir = workspaceCwd || configuredCwd || "/paperclip";
const agentSlug = sanitizeForK8sName(agent.id);
const runSlug = sanitizeForK8sName(runId);
const jobName = `agent-claude-${agentSlug}-${runSlug}`;
// Build a deterministic, collision-resistant job name within the 63-char
// DNS label limit. Layout: "ac-{agentSlug}-{runSlug}-{hash}" where the
// hash is derived from the raw (un-truncated) agent+run IDs.
const agentSlug = sanitizeForK8sName(agent.id, 16);
const runSlug = sanitizeForK8sName(runId, 16);
const hash = shortHash(`${agent.id}:${runId}`);
const jobName = `ac-${agentSlug}-${runSlug}-${hash}`;
// Build prompt (same logic as claude_local)
const promptTemplate = asString(
@@ -265,7 +347,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
"paperclip.io/adapter-type": "claude_k8s",
};
for (const [key, value] of Object.entries(extraLabels)) {
if (typeof value === "string") labels[key] = value;
labels[key] = value;
}
// Volumes
@@ -328,6 +410,57 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const claudeArgsEscaped = claudeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
const mainCommand = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`;
// Decide prompt delivery strategy: env var (small) or Secret volume (large).
const promptBytes = Buffer.byteLength(prompt, "utf-8");
const useLargePromptPath = promptBytes > LARGE_PROMPT_THRESHOLD_BYTES;
let promptSecret: PromptSecret | null = null;
const promptSecretName = `${jobName}-prompt`;
if (useLargePromptPath) {
// Stage prompt as a Secret; the init container copies from the mounted
// secret volume to the emptyDir so the main container reads it the
// same way regardless of prompt size.
promptSecret = {
name: promptSecretName,
namespace,
data: { "prompt.txt": prompt },
};
volumes.push({
name: "prompt-secret",
secret: { secretName: promptSecretName, optional: false },
});
}
const initContainer: k8s.V1Container = useLargePromptPath
? {
name: "write-prompt",
image: "busybox:1.36",
imagePullPolicy: "IfNotPresent",
command: ["sh", "-c", "cp /tmp/prompt-secret/prompt.txt /tmp/prompt/prompt.txt"],
volumeMounts: [
{ name: "prompt", mountPath: "/tmp/prompt" },
{ name: "prompt-secret", mountPath: "/tmp/prompt-secret", readOnly: true },
],
securityContext,
resources: {
requests: { cpu: "10m", memory: "16Mi" },
limits: { cpu: "100m", memory: "64Mi" },
},
}
: {
name: "write-prompt",
image: "busybox:1.36",
imagePullPolicy: "IfNotPresent",
command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
env: [{ name: "PROMPT_CONTENT", value: prompt }],
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
securityContext,
resources: {
requests: { cpu: "10m", memory: "16Mi" },
limits: { cpu: "100m", memory: "64Mi" },
},
};
const job: k8s.V1Job = {
apiVersion: "batch/v1",
kind: "Job",
@@ -352,23 +485,9 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
securityContext: podSecurityContext,
...(selfPod.imagePullSecrets.length > 0 ? { imagePullSecrets: selfPod.imagePullSecrets } : {}),
...(selfPod.dnsConfig ? { dnsConfig: selfPod.dnsConfig } : {}),
...(Object.keys(nodeSelector).length > 0 ? { nodeSelector: nodeSelector as Record<string, string> } : {}),
...(Object.keys(nodeSelector).length > 0 ? { nodeSelector } : {}),
...(tolerations.length > 0 ? { tolerations: tolerations as k8s.V1Toleration[] } : {}),
initContainers: [
{
name: "write-prompt",
image: "busybox:1.36",
imagePullPolicy: "IfNotPresent",
command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
env: [{ name: "PROMPT_CONTENT", value: prompt }],
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
securityContext,
resources: {
requests: { cpu: "10m", memory: "16Mi" },
limits: { cpu: "100m", memory: "64Mi" },
},
},
],
initContainers: [initContainer],
containers: [
{
name: "claude",
@@ -377,6 +496,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
workingDir,
command: ["sh", "-c", mainCommand],
env: envVars,
...(selfPod.inheritedEnvFrom.length > 0 ? { envFrom: selfPod.inheritedEnvFrom } : {}),
volumeMounts,
securityContext,
resources: containerResources,
@@ -388,5 +508,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
},
};
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics };
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret };
}
+18 -3
View File
@@ -20,8 +20,12 @@ export interface SelfPodInfo {
dnsConfig: k8s.V1PodDNSConfig | undefined;
pvcClaimName: string | null;
secretVolumes: SelfPodSecretVolume[];
/** Env vars inherited from the Deployment container. */
/** Env vars inherited from the Deployment container (literal name/value pairs). */
inheritedEnv: Record<string, string>;
/** Env vars with valueFrom (secretKeyRef, configMapKeyRef, etc.) from the Deployment container. */
inheritedEnvValueFrom: k8s.V1EnvVar[];
/** envFrom sources (secretRef, configMapRef) from the Deployment container. */
inheritedEnvFrom: k8s.V1EnvFromSource[];
}
let cachedSelfPod: SelfPodInfo | null = null;
@@ -134,12 +138,21 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise<SelfPodIn
// Collect env vars from the pod spec's container definition.
// Agent config env (set in buildEnvVars) will override these.
const inheritedEnv: Record<string, string> = {};
const inheritedEnvValueFrom: k8s.V1EnvVar[] = [];
for (const envItem of mainContainer.env ?? []) {
if (!envItem.name) continue;
const value = envItem.value ?? "";
if (value) inheritedEnv[envItem.name] = value;
if (envItem.valueFrom) {
// Preserve valueFrom entries (secretKeyRef, configMapKeyRef, fieldRef, etc.)
inheritedEnvValueFrom.push({ name: envItem.name, valueFrom: envItem.valueFrom });
} else {
const value = envItem.value ?? "";
if (value) inheritedEnv[envItem.name] = value;
}
}
// Capture envFrom sources (secretRef, configMapRef) from the container spec
const inheritedEnvFrom: k8s.V1EnvFromSource[] = mainContainer.envFrom ?? [];
cachedSelfPod = {
namespace,
image: mainContainer.image,
@@ -150,6 +163,8 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise<SelfPodIn
pvcClaimName,
secretVolumes,
inheritedEnv,
inheritedEnvValueFrom,
inheritedEnvFrom,
};
return cachedSelfPod;
+13
View File
@@ -141,6 +141,19 @@ more raw output`;
expect(result.summary).toContain("JSON output");
expect(result.summary).not.toContain("some raw output");
});
it("deduplicates identical assistant text blocks from reconnect replays", () => {
const assistantEvent = JSON.stringify({
type: "assistant",
message: { content: [{ type: "text", text: "Hello world" }] },
});
// Simulate the same assistant event appearing twice (log stream reconnect replay)
const stdout = `${assistantEvent}\n${assistantEvent}\n`;
const result = parseClaudeStreamJson(stdout);
expect(result.summary).toBe("Hello world");
// Should not be "Hello world\n\nHello world"
expect(result.summary.split("Hello world").length).toBe(2);
});
});
describe("extractClaudeLoginUrl", () => {
+7 -1
View File
@@ -9,6 +9,9 @@ export function parseClaudeStreamJson(stdout: string) {
let model = "";
let finalResult: Record<string, unknown> | null = null;
const assistantTexts: string[] = [];
// Belt-and-braces dedup: track seen text blocks to filter duplicates
// caused by log stream reconnects replaying overlapping windows.
const seenTexts = new Set<string>();
for (const rawLine of stdout.split(/\r?\n/)) {
const line = rawLine.trim();
@@ -32,7 +35,10 @@ export function parseClaudeStreamJson(stdout: string) {
const block = entry as Record<string, unknown>;
if (asString(block.type, "") === "text") {
const text = asString(block.text, "");
if (text) assistantTexts.push(text);
if (text && !seenTexts.has(text)) {
seenTexts.add(text);
assistantTexts.push(text);
}
}
}
continue;