diff --git a/package-lock.json b/package-lock.json index af7b548..24e370a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@farhoodliquor/paperclip-adapter-claude-k8s", - "version": "0.1.24", + "version": "0.1.25", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@farhoodliquor/paperclip-adapter-claude-k8s", - "version": "0.1.24", + "version": "0.1.25", "license": "MIT", "dependencies": { "@kubernetes/client-node": "^1.0.0", diff --git a/package.json b/package.json index ef2e32e..e363980 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@farhoodliquor/paperclip-adapter-claude-k8s", - "version": "0.1.24", + "version": "0.1.25", "description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs", "license": "MIT", "repository": { diff --git a/src/server/execute.ts b/src/server/execute.ts index b5a4b91..78994f5 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -14,6 +14,7 @@ import { Writable } from "node:stream"; const POLL_INTERVAL_MS = 2000; const KEEPALIVE_INTERVAL_MS = 15_000; const LOG_STREAM_RECONNECT_DELAY_MS = 3_000; +const MAX_LOG_RECONNECT_ATTEMPTS = 50; /** * Wait for the Job's pod to reach a terminal or running state. @@ -167,6 +168,9 @@ async function streamPodLogsOnce( * stream until the stop signal fires (job completed) or the container * exits normally. This handles silent K8s API connection drops that * would otherwise cause the UI to stop receiving real output. + * + * Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect + * loops during sustained API partitions. */ async function streamPodLogs( namespace: string, @@ -177,22 +181,40 @@ async function streamPodLogs( ): Promise { const allChunks: string[] = []; let attempt = 0; - const streamStartedAt = Math.floor(Date.now() / 1000); + // Track the timestamp of the last successfully received log line so + // reconnects use a tight window instead of an ever-growing one anchored + // at stream start. This is the primary fix for FAR-105 duplicative logs. + let lastLogReceivedAt = Math.floor(Date.now() / 1000); while (!stopSignal?.stopped) { - // On reconnect, ask for logs since the stream originally started to - // avoid missing output during the reconnect gap. Duplicates are - // tolerable — the UI deduplicates log chunks. + if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) { + await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`); + break; + } + + // On reconnect, ask for logs since the last received line (+5s buffer) + // instead of since stream start. This keeps the window tight and + // avoids ever-growing duplicate output. const sinceSeconds = attempt > 0 - ? Math.max(1, Math.floor(Date.now() / 1000) - streamStartedAt + 5) + ? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5) : undefined; if (attempt > 0) { - await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt})...\n`); + await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`); } + const preStreamTs = Math.floor(Date.now() / 1000); const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds); - if (result) allChunks.push(result); + if (result) { + allChunks.push(result); + // Update last-received timestamp to now (the stream just ended, + // so any log lines in `result` were received up to this moment). + lastLogReceivedAt = Math.floor(Date.now() / 1000); + } else if (attempt === 0) { + // First attempt returned nothing — update timestamp so reconnect + // window stays reasonable. + lastLogReceivedAt = preStreamTs; + } attempt++; // If the job is done or the container exited, no need to reconnect. @@ -356,12 +378,24 @@ export async function execute(ctx: AdapterExecutionContext): Promise[0]); } + // If the prompt is large, create a Secret to hold it (avoids the ~1 MiB + // PodSpec limit). The Secret is cleaned up in the finally block. + const coreApi = getCoreApi(kubeconfigPath); + if (promptSecret) { + try { + await coreApi.createNamespacedSecret({ + namespace: promptSecret.namespace, + body: { + apiVersion: "v1", + kind: "Secret", + metadata: { + name: promptSecret.name, + namespace: promptSecret.namespace, + labels: { + "app.kubernetes.io/managed-by": "paperclip", + "paperclip.io/adapter-type": "claude_k8s", + "paperclip.io/run-id": runId, + }, + }, + stringData: promptSecret.data, + }, + }); + await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`); + return { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: `Failed to create prompt Secret: ${msg}`, + errorCode: "k8s_prompt_secret_create_failed", + }; + } + } + // Create the Job const batchApi = getBatchApi(kubeconfigPath); try { @@ -486,21 +556,32 @@ export async function execute(ctx: AdapterExecutionContext): Promise {}); return; } const silenceSec = Math.round((Date.now() - lastLogAt) / 1000); - void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`); + void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {}); - // Refresh updatedAt every ~4 minutes (16 ticks × 15s) to stay - // well within the 5-minute reaper staleness window. + // Refresh updatedAt every ~3 minutes (12 ticks × 15s = 180s) to + // stay well within the 5-minute reaper staleness window. Also + // fire on tick 1 for an early safety margin after job start. keepaliveTick++; - if (ctx.onSpawn && keepaliveTick % 16 === 0) { + if (ctx.onSpawn && (keepaliveTick === 1 || keepaliveTick % 12 === 0)) { void ctx.onSpawn({ pid: -1, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {}); } })(); @@ -550,11 +631,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise = {}): SelfPodInfo { pvcClaimName: "paperclip-data", secretVolumes: [], inheritedEnv: {}, + inheritedEnvValueFrom: [], + inheritedEnvFrom: [], ...overrides, }; } @@ -38,25 +40,44 @@ describe("buildJobManifest", () => { }); describe("job naming", () => { - it("uses agent-claude- prefix", () => { + it("uses ac- prefix", () => { const { jobName } = buildJobManifest({ ctx, selfPod }); - expect(jobName).toMatch(/^agent-claude-/); + expect(jobName).toMatch(/^ac-/); }); - it("includes sanitized agent id slug", () => { + it("includes sanitized agent id slug (up to 16 chars)", () => { ctx.agent.id = "Agent-ABC!@#"; const { jobName } = buildJobManifest({ ctx, selfPod }); - // sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-8 - // "Agent-ABC!@#" -> "agent-abc" (strips !@#, slice to 8 = "agent-ab") - expect(jobName).toContain("agent-ab"); + // sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-16 + expect(jobName).toContain("agent-abc"); }); - it("includes sanitized run id slug", () => { + it("includes sanitized run id slug (up to 16 chars)", () => { ctx.runId = "RUN-ABC-12345"; const { jobName } = buildJobManifest({ ctx, selfPod }); - // sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-8 - // "RUN-ABC-12345" -> "run-abc-12345" (slice to 8 = "run-abc-") - expect(jobName).toContain("run-abc-"); + expect(jobName).toContain("run-abc-12345"); + }); + + it("includes a deterministic hash suffix", () => { + const result1 = buildJobManifest({ ctx, selfPod }); + const result2 = buildJobManifest({ ctx, selfPod }); + expect(result1.jobName).toBe(result2.jobName); + // Hash suffix is 6 hex chars at the end + expect(result1.jobName).toMatch(/-[0-9a-f]{6}$/); + }); + + it("different agent+run pairs produce different names", () => { + const result1 = buildJobManifest({ ctx, selfPod }); + ctx.runId = "run-different"; + const result2 = buildJobManifest({ ctx, selfPod }); + expect(result1.jobName).not.toBe(result2.jobName); + }); + + it("stays within 63-char DNS label limit", () => { + ctx.agent.id = "a".repeat(100); + ctx.runId = "r".repeat(100); + const { jobName } = buildJobManifest({ ctx, selfPod }); + expect(jobName.length).toBeLessThanOrEqual(63); }); }); @@ -331,6 +352,50 @@ describe("buildJobManifest", () => { const apiUrl = job.spec?.template?.spec?.containers[0]?.env?.find((e) => e.name === "PAPERCLIP_API_URL"); expect(apiUrl?.value).toBe("http://paperclip:8080"); }); + + it("includes valueFrom env vars from selfPod", () => { + selfPod.inheritedEnvValueFrom = [ + { name: "ANTHROPIC_API_KEY", valueFrom: { secretKeyRef: { name: "api-keys", key: "anthropic" } } }, + ]; + const { job } = buildJobManifest({ ctx, selfPod }); + const envList = job.spec?.template?.spec?.containers[0]?.env ?? []; + const apiKeyEntry = envList.find((e) => e.name === "ANTHROPIC_API_KEY"); + expect(apiKeyEntry?.valueFrom?.secretKeyRef?.name).toBe("api-keys"); + expect(apiKeyEntry?.valueFrom?.secretKeyRef?.key).toBe("anthropic"); + expect(apiKeyEntry?.value).toBeUndefined(); + }); + + it("literal env overrides valueFrom with the same name", () => { + selfPod.inheritedEnv = { MY_VAR: "literal-value" }; + selfPod.inheritedEnvValueFrom = [ + { name: "MY_VAR", valueFrom: { secretKeyRef: { name: "sec", key: "k" } } }, + ]; + const { job } = buildJobManifest({ ctx, selfPod }); + const envList = job.spec?.template?.spec?.containers[0]?.env ?? []; + const myVar = envList.filter((e) => e.name === "MY_VAR"); + expect(myVar).toHaveLength(1); + expect(myVar[0]?.value).toBe("literal-value"); + expect(myVar[0]?.valueFrom).toBeUndefined(); + }); + + it("includes envFrom sources from selfPod on the container", () => { + selfPod.inheritedEnvFrom = [ + { secretRef: { name: "api-secrets" } }, + { configMapRef: { name: "app-config" } }, + ]; + const { job } = buildJobManifest({ ctx, selfPod }); + const container = job.spec?.template?.spec?.containers[0]; + expect(container?.envFrom).toHaveLength(2); + expect(container?.envFrom?.[0]?.secretRef?.name).toBe("api-secrets"); + expect(container?.envFrom?.[1]?.configMapRef?.name).toBe("app-config"); + }); + + it("omits envFrom when selfPod has none", () => { + selfPod.inheritedEnvFrom = []; + const { job } = buildJobManifest({ ctx, selfPod }); + const container = job.spec?.template?.spec?.containers[0]; + expect(container?.envFrom).toBeUndefined(); + }); }); describe("resources", () => { @@ -498,7 +563,7 @@ describe("buildJobManifest", () => { }); describe("return value", () => { - it("returns job, jobName, namespace, prompt, claudeArgs, promptMetrics", () => { + it("returns job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret", () => { const result = buildJobManifest({ ctx, selfPod }); expect(result.job).toBeDefined(); expect(result.jobName).toBeDefined(); @@ -506,6 +571,74 @@ describe("buildJobManifest", () => { expect(result.prompt).toBeDefined(); expect(result.claudeArgs).toBeDefined(); expect(result.promptMetrics).toBeDefined(); + expect(result.promptSecret).toBeNull(); + }); + }); + + describe("nodeSelector key=value parsing", () => { + it("parses key=value multiline text", () => { + ctx.config = { nodeSelector: "disktype=ssd\ntopology.kubernetes.io/zone=us-east-1a" }; + const { job } = buildJobManifest({ ctx, selfPod }); + expect(job.spec?.template?.spec?.nodeSelector).toEqual({ + disktype: "ssd", + "topology.kubernetes.io/zone": "us-east-1a", + }); + }); + + it("still accepts JSON objects", () => { + ctx.config = { nodeSelector: { disktype: "ssd" } }; + const { job } = buildJobManifest({ ctx, selfPod }); + expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" }); + }); + + it("parses JSON string format", () => { + ctx.config = { nodeSelector: '{"disktype":"ssd"}' }; + const { job } = buildJobManifest({ ctx, selfPod }); + expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" }); + }); + + it("skips comment lines and blank lines", () => { + ctx.config = { nodeSelector: "# comment\n\ndisktype=ssd\n" }; + const { job } = buildJobManifest({ ctx, selfPod }); + expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" }); + }); + }); + + describe("labels key=value parsing", () => { + it("parses key=value multiline text for extra labels", () => { + ctx.config = { labels: "env=prod\nteam=platform" }; + const { job } = buildJobManifest({ ctx, selfPod }); + expect(job.metadata?.labels?.env).toBe("prod"); + expect(job.metadata?.labels?.team).toBe("platform"); + }); + }); + + describe("large prompt Secret fallback", () => { + it("returns null promptSecret for small prompts", () => { + const { promptSecret } = buildJobManifest({ ctx, selfPod }); + expect(promptSecret).toBeNull(); + }); + + it("returns promptSecret for prompts >256 KiB", () => { + // Build a prompt >256 KiB via a custom template + const largePrompt = "x".repeat(300 * 1024); + ctx.config = { promptTemplate: largePrompt }; + const { promptSecret, job } = buildJobManifest({ ctx, selfPod }); + expect(promptSecret).not.toBeNull(); + expect(promptSecret!.data["prompt.txt"]).toBe(largePrompt); + // Init container should copy from secret volume, not use PROMPT_CONTENT env + const init = job.spec?.template?.spec?.initContainers?.[0]; + expect(init?.command).toContainEqual(expect.stringContaining("cp")); + expect(init?.env).toBeUndefined(); + // Should have prompt-secret volume + const secretVol = job.spec?.template?.spec?.volumes?.find((v) => v.name === "prompt-secret"); + expect(secretVol?.secret?.secretName).toBe(promptSecret!.name); + }); + + it("uses env var init container for small prompts", () => { + const { job } = buildJobManifest({ ctx, selfPod }); + const init = job.spec?.template?.spec?.initContainers?.[0]; + expect(init?.env?.[0]?.name).toBe("PROMPT_CONTENT"); }); }); }); diff --git a/src/server/job-manifest.ts b/src/server/job-manifest.ts index 74a4276..b72f272 100644 --- a/src/server/job-manifest.ts +++ b/src/server/job-manifest.ts @@ -9,6 +9,11 @@ import { buildPaperclipEnv, renderTemplate, } from "@paperclipai/adapter-utils/server-utils"; +import { createHash } from "node:crypto"; + +/** Prompts above this size (bytes) are staged via a Secret instead of an + * init container env var, protecting against the ~1 MiB PodSpec limit. */ +const LARGE_PROMPT_THRESHOLD_BYTES = 256 * 1024; // Inline prompt assembly — these functions are not yet in the published adapter-utils function joinPromptSections(sections: string[], separator = "\n\n"): string { @@ -44,11 +49,63 @@ function renderPaperclipWakePrompt(wake: unknown, _opts?: { resumedSession?: boo } import type { SelfPodInfo } from "./k8s-client.js"; +/** + * Parse a config value that may be either a JSON object or multiline + * `key=value` text (one pair per line). This fixes the config-hint + * parity issue where textarea hints promise `key=value` per line but + * `parseObject` only handles JSON. + */ +function parseKeyValueConfig(raw: unknown): Record { + if (typeof raw === "object" && raw !== null && !Array.isArray(raw)) { + // Already an object (JSON was parsed upstream) + const result: Record = {}; + for (const [k, v] of Object.entries(raw as Record)) { + if (typeof v === "string") result[k] = v; + } + return result; + } + if (typeof raw !== "string" || !raw.trim()) return {}; + // Try JSON parse first + try { + const parsed = JSON.parse(raw); + if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) { + const result: Record = {}; + for (const [k, v] of Object.entries(parsed as Record)) { + if (typeof v === "string") result[k] = v; + } + return result; + } + } catch { + // Not JSON — fall through to key=value parsing + } + // Parse key=value lines + const result: Record = {}; + for (const line of raw.split(/\r?\n/)) { + const trimmed = line.trim(); + if (!trimmed || trimmed.startsWith("#")) continue; + const eqIdx = trimmed.indexOf("="); + if (eqIdx <= 0) continue; + const key = trimmed.slice(0, eqIdx).trim(); + const value = trimmed.slice(eqIdx + 1).trim(); + if (key) result[key] = value; + } + return result; +} + export interface JobBuildInput { ctx: AdapterExecutionContext; selfPod: SelfPodInfo; } +/** When the prompt exceeds the env-var size limit, the manifest uses a + * Secret-backed volume instead of the init container's PROMPT_CONTENT env. + * The caller must create this Secret before the Job and clean it up after. */ +export interface PromptSecret { + name: string; + namespace: string; + data: Record; +} + export interface JobBuildResult { job: k8s.V1Job; jobName: string; @@ -56,10 +113,21 @@ export interface JobBuildResult { prompt: string; claudeArgs: string[]; promptMetrics: Record; + /** Non-null when the prompt is too large for an env var and must be + * staged as a K8s Secret before creating the Job. */ + promptSecret: PromptSecret | null; } -function sanitizeForK8sName(value: string): string { - return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, 8); +function sanitizeForK8sName(value: string, maxLen = 16): string { + return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen); +} + +/** + * Build a short deterministic hash suffix from the raw inputs to avoid + * collisions when sanitized slugs happen to be identical. + */ +function shortHash(input: string, len = 6): string { + return createHash("sha256").update(input).digest("hex").slice(0, len); } function buildEnvVars( @@ -148,12 +216,22 @@ function buildEnvVars( // HOME must be /paperclip to match PVC mount and enable session resume merged.HOME = "/paperclip"; - // Convert to V1EnvVar array + // Convert literal env to V1EnvVar array const envVars: k8s.V1EnvVar[] = Object.entries(merged).map(([name, value]) => ({ name, value, })); + // Append valueFrom entries from the Deployment container (secretKeyRef, + // configMapKeyRef, fieldRef, etc.). Skip any whose name was already set + // by a literal value — the literal value wins (same precedence as above). + const literalNames = new Set(Object.keys(merged)); + for (const entry of selfPod.inheritedEnvValueFrom) { + if (!literalNames.has(entry.name)) { + envVars.push(entry); + } + } + return envVars; } @@ -174,9 +252,9 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { const timeoutSec = asNumber(config.timeoutSec, 0); const ttlSeconds = asNumber(config.ttlSecondsAfterFinished, 300); const resources = parseObject(config.resources); - const nodeSelector = parseObject(config.nodeSelector); + const nodeSelector = parseKeyValueConfig(config.nodeSelector); const tolerations = Array.isArray(config.tolerations) ? config.tolerations : []; - const extraLabels = parseObject(config.labels); + const extraLabels = parseKeyValueConfig(config.labels); // Resolve working directory — use workspace cwd, fall back to /paperclip const workspaceContext = parseObject(context.paperclipWorkspace); @@ -184,9 +262,13 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { const configuredCwd = asString(config.cwd, ""); const workingDir = workspaceCwd || configuredCwd || "/paperclip"; - const agentSlug = sanitizeForK8sName(agent.id); - const runSlug = sanitizeForK8sName(runId); - const jobName = `agent-claude-${agentSlug}-${runSlug}`; + // Build a deterministic, collision-resistant job name within the 63-char + // DNS label limit. Layout: "ac-{agentSlug}-{runSlug}-{hash}" where the + // hash is derived from the raw (un-truncated) agent+run IDs. + const agentSlug = sanitizeForK8sName(agent.id, 16); + const runSlug = sanitizeForK8sName(runId, 16); + const hash = shortHash(`${agent.id}:${runId}`); + const jobName = `ac-${agentSlug}-${runSlug}-${hash}`; // Build prompt (same logic as claude_local) const promptTemplate = asString( @@ -265,7 +347,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { "paperclip.io/adapter-type": "claude_k8s", }; for (const [key, value] of Object.entries(extraLabels)) { - if (typeof value === "string") labels[key] = value; + labels[key] = value; } // Volumes @@ -328,6 +410,57 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { const claudeArgsEscaped = claudeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" "); const mainCommand = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`; + // Decide prompt delivery strategy: env var (small) or Secret volume (large). + const promptBytes = Buffer.byteLength(prompt, "utf-8"); + const useLargePromptPath = promptBytes > LARGE_PROMPT_THRESHOLD_BYTES; + let promptSecret: PromptSecret | null = null; + const promptSecretName = `${jobName}-prompt`; + + if (useLargePromptPath) { + // Stage prompt as a Secret; the init container copies from the mounted + // secret volume to the emptyDir so the main container reads it the + // same way regardless of prompt size. + promptSecret = { + name: promptSecretName, + namespace, + data: { "prompt.txt": prompt }, + }; + volumes.push({ + name: "prompt-secret", + secret: { secretName: promptSecretName, optional: false }, + }); + } + + const initContainer: k8s.V1Container = useLargePromptPath + ? { + name: "write-prompt", + image: "busybox:1.36", + imagePullPolicy: "IfNotPresent", + command: ["sh", "-c", "cp /tmp/prompt-secret/prompt.txt /tmp/prompt/prompt.txt"], + volumeMounts: [ + { name: "prompt", mountPath: "/tmp/prompt" }, + { name: "prompt-secret", mountPath: "/tmp/prompt-secret", readOnly: true }, + ], + securityContext, + resources: { + requests: { cpu: "10m", memory: "16Mi" }, + limits: { cpu: "100m", memory: "64Mi" }, + }, + } + : { + name: "write-prompt", + image: "busybox:1.36", + imagePullPolicy: "IfNotPresent", + command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"], + env: [{ name: "PROMPT_CONTENT", value: prompt }], + volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }], + securityContext, + resources: { + requests: { cpu: "10m", memory: "16Mi" }, + limits: { cpu: "100m", memory: "64Mi" }, + }, + }; + const job: k8s.V1Job = { apiVersion: "batch/v1", kind: "Job", @@ -352,23 +485,9 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { securityContext: podSecurityContext, ...(selfPod.imagePullSecrets.length > 0 ? { imagePullSecrets: selfPod.imagePullSecrets } : {}), ...(selfPod.dnsConfig ? { dnsConfig: selfPod.dnsConfig } : {}), - ...(Object.keys(nodeSelector).length > 0 ? { nodeSelector: nodeSelector as Record } : {}), + ...(Object.keys(nodeSelector).length > 0 ? { nodeSelector } : {}), ...(tolerations.length > 0 ? { tolerations: tolerations as k8s.V1Toleration[] } : {}), - initContainers: [ - { - name: "write-prompt", - image: "busybox:1.36", - imagePullPolicy: "IfNotPresent", - command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"], - env: [{ name: "PROMPT_CONTENT", value: prompt }], - volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }], - securityContext, - resources: { - requests: { cpu: "10m", memory: "16Mi" }, - limits: { cpu: "100m", memory: "64Mi" }, - }, - }, - ], + initContainers: [initContainer], containers: [ { name: "claude", @@ -377,6 +496,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { workingDir, command: ["sh", "-c", mainCommand], env: envVars, + ...(selfPod.inheritedEnvFrom.length > 0 ? { envFrom: selfPod.inheritedEnvFrom } : {}), volumeMounts, securityContext, resources: containerResources, @@ -388,5 +508,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { }, }; - return { job, jobName, namespace, prompt, claudeArgs, promptMetrics }; + return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret }; } diff --git a/src/server/k8s-client.ts b/src/server/k8s-client.ts index bd908ff..f2ea4ef 100644 --- a/src/server/k8s-client.ts +++ b/src/server/k8s-client.ts @@ -20,8 +20,12 @@ export interface SelfPodInfo { dnsConfig: k8s.V1PodDNSConfig | undefined; pvcClaimName: string | null; secretVolumes: SelfPodSecretVolume[]; - /** Env vars inherited from the Deployment container. */ + /** Env vars inherited from the Deployment container (literal name/value pairs). */ inheritedEnv: Record; + /** Env vars with valueFrom (secretKeyRef, configMapKeyRef, etc.) from the Deployment container. */ + inheritedEnvValueFrom: k8s.V1EnvVar[]; + /** envFrom sources (secretRef, configMapRef) from the Deployment container. */ + inheritedEnvFrom: k8s.V1EnvFromSource[]; } let cachedSelfPod: SelfPodInfo | null = null; @@ -134,12 +138,21 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise = {}; + const inheritedEnvValueFrom: k8s.V1EnvVar[] = []; for (const envItem of mainContainer.env ?? []) { if (!envItem.name) continue; - const value = envItem.value ?? ""; - if (value) inheritedEnv[envItem.name] = value; + if (envItem.valueFrom) { + // Preserve valueFrom entries (secretKeyRef, configMapKeyRef, fieldRef, etc.) + inheritedEnvValueFrom.push({ name: envItem.name, valueFrom: envItem.valueFrom }); + } else { + const value = envItem.value ?? ""; + if (value) inheritedEnv[envItem.name] = value; + } } + // Capture envFrom sources (secretRef, configMapRef) from the container spec + const inheritedEnvFrom: k8s.V1EnvFromSource[] = mainContainer.envFrom ?? []; + cachedSelfPod = { namespace, image: mainContainer.image, @@ -150,6 +163,8 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise { + const assistantEvent = JSON.stringify({ + type: "assistant", + message: { content: [{ type: "text", text: "Hello world" }] }, + }); + // Simulate the same assistant event appearing twice (log stream reconnect replay) + const stdout = `${assistantEvent}\n${assistantEvent}\n`; + const result = parseClaudeStreamJson(stdout); + expect(result.summary).toBe("Hello world"); + // Should not be "Hello world\n\nHello world" + expect(result.summary.split("Hello world").length).toBe(2); + }); }); describe("extractClaudeLoginUrl", () => { diff --git a/src/server/parse.ts b/src/server/parse.ts index c41e16c..12fed75 100644 --- a/src/server/parse.ts +++ b/src/server/parse.ts @@ -9,6 +9,9 @@ export function parseClaudeStreamJson(stdout: string) { let model = ""; let finalResult: Record | null = null; const assistantTexts: string[] = []; + // Belt-and-braces dedup: track seen text blocks to filter duplicates + // caused by log stream reconnects replaying overlapping windows. + const seenTexts = new Set(); for (const rawLine of stdout.split(/\r?\n/)) { const line = rawLine.trim(); @@ -32,7 +35,10 @@ export function parseClaudeStreamJson(stdout: string) { const block = entry as Record; if (asString(block.type, "") === "text") { const text = asString(block.text, ""); - if (text) assistantTexts.push(text); + if (text && !seenTexts.has(text)) { + seenTexts.add(text); + assistantTexts.push(text); + } } } continue;