fix: P1 correctness and operational fixes from FAR-104/FAR-105 analysis
5. Cap log stream reconnect attempts at 50 — prevents infinite
reconnect loops during sustained API partitions.
6. Fire keepalive refresh earlier — tick 1 + every 12 ticks (~3min)
instead of every 16 ticks (~4min), providing better safety margin
under the 5-minute reaper window.
7. Catch rejections from onLog inside keepalive — add .catch(() => {})
to prevent unhandledRejection on SSE backpressure.
8. Prevent sanitized-name collisions — extend slugs to 16 chars each,
add a 6-char SHA-256 hash suffix, shorten prefix to `ac-` to stay
well within the 63-char DNS label limit.
10. Fix config-hint parity for nodeSelector and labels — parse both
`key=value` multiline text and JSON objects, matching what the
textarea hint promises.
11. Large-prompt fallback via Secret — prompts >256 KiB are staged as a
K8s Secret and mounted as a volume instead of passed via env var,
protecting against the ~1 MiB PodSpec limit.
13. Track last-seen log timestamp on reconnect — anchor sinceSeconds at
the last received log line instead of stream start, fixing FAR-105
duplicative logs. Belt-and-braces: dedupe assistantTexts at the
parser boundary in parse.ts.
Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
+79
-12
@@ -14,6 +14,7 @@ import { Writable } from "node:stream";
|
|||||||
const POLL_INTERVAL_MS = 2000;
|
const POLL_INTERVAL_MS = 2000;
|
||||||
const KEEPALIVE_INTERVAL_MS = 15_000;
|
const KEEPALIVE_INTERVAL_MS = 15_000;
|
||||||
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
|
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
|
||||||
|
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait for the Job's pod to reach a terminal or running state.
|
* Wait for the Job's pod to reach a terminal or running state.
|
||||||
@@ -167,6 +168,9 @@ async function streamPodLogsOnce(
|
|||||||
* stream until the stop signal fires (job completed) or the container
|
* stream until the stop signal fires (job completed) or the container
|
||||||
* exits normally. This handles silent K8s API connection drops that
|
* exits normally. This handles silent K8s API connection drops that
|
||||||
* would otherwise cause the UI to stop receiving real output.
|
* would otherwise cause the UI to stop receiving real output.
|
||||||
|
*
|
||||||
|
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
|
||||||
|
* loops during sustained API partitions.
|
||||||
*/
|
*/
|
||||||
async function streamPodLogs(
|
async function streamPodLogs(
|
||||||
namespace: string,
|
namespace: string,
|
||||||
@@ -177,22 +181,40 @@ async function streamPodLogs(
|
|||||||
): Promise<string> {
|
): Promise<string> {
|
||||||
const allChunks: string[] = [];
|
const allChunks: string[] = [];
|
||||||
let attempt = 0;
|
let attempt = 0;
|
||||||
const streamStartedAt = Math.floor(Date.now() / 1000);
|
// Track the timestamp of the last successfully received log line so
|
||||||
|
// reconnects use a tight window instead of an ever-growing one anchored
|
||||||
|
// at stream start. This is the primary fix for FAR-105 duplicative logs.
|
||||||
|
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||||
|
|
||||||
while (!stopSignal?.stopped) {
|
while (!stopSignal?.stopped) {
|
||||||
// On reconnect, ask for logs since the stream originally started to
|
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
|
||||||
// avoid missing output during the reconnect gap. Duplicates are
|
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
|
||||||
// tolerable — the UI deduplicates log chunks.
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// On reconnect, ask for logs since the last received line (+5s buffer)
|
||||||
|
// instead of since stream start. This keeps the window tight and
|
||||||
|
// avoids ever-growing duplicate output.
|
||||||
const sinceSeconds = attempt > 0
|
const sinceSeconds = attempt > 0
|
||||||
? Math.max(1, Math.floor(Date.now() / 1000) - streamStartedAt + 5)
|
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
|
||||||
: undefined;
|
: undefined;
|
||||||
|
|
||||||
if (attempt > 0) {
|
if (attempt > 0) {
|
||||||
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt})...\n`);
|
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
|
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
|
||||||
if (result) allChunks.push(result);
|
if (result) {
|
||||||
|
allChunks.push(result);
|
||||||
|
// Update last-received timestamp to now (the stream just ended,
|
||||||
|
// so any log lines in `result` were received up to this moment).
|
||||||
|
lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||||
|
} else if (attempt === 0) {
|
||||||
|
// First attempt returned nothing — update timestamp so reconnect
|
||||||
|
// window stays reasonable.
|
||||||
|
lastLogReceivedAt = preStreamTs;
|
||||||
|
}
|
||||||
attempt++;
|
attempt++;
|
||||||
|
|
||||||
// If the job is done or the container exited, no need to reconnect.
|
// If the job is done or the container exited, no need to reconnect.
|
||||||
@@ -373,7 +395,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Build Job manifest
|
// Build Job manifest
|
||||||
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics } = buildJobManifest({
|
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret } = buildJobManifest({
|
||||||
ctx,
|
ctx,
|
||||||
selfPod,
|
selfPod,
|
||||||
});
|
});
|
||||||
@@ -396,6 +418,42 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||||||
} as Parameters<typeof onMeta>[0]);
|
} as Parameters<typeof onMeta>[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
|
||||||
|
// PodSpec limit). The Secret is cleaned up in the finally block.
|
||||||
|
const coreApi = getCoreApi(kubeconfigPath);
|
||||||
|
if (promptSecret) {
|
||||||
|
try {
|
||||||
|
await coreApi.createNamespacedSecret({
|
||||||
|
namespace: promptSecret.namespace,
|
||||||
|
body: {
|
||||||
|
apiVersion: "v1",
|
||||||
|
kind: "Secret",
|
||||||
|
metadata: {
|
||||||
|
name: promptSecret.name,
|
||||||
|
namespace: promptSecret.namespace,
|
||||||
|
labels: {
|
||||||
|
"app.kubernetes.io/managed-by": "paperclip",
|
||||||
|
"paperclip.io/adapter-type": "claude_k8s",
|
||||||
|
"paperclip.io/run-id": runId,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
stringData: promptSecret.data,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
|
||||||
|
} catch (err) {
|
||||||
|
const msg = err instanceof Error ? err.message : String(err);
|
||||||
|
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
|
||||||
|
return {
|
||||||
|
exitCode: null,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
errorMessage: `Failed to create prompt Secret: ${msg}`,
|
||||||
|
errorCode: "k8s_prompt_secret_create_failed",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create the Job
|
// Create the Job
|
||||||
const batchApi = getBatchApi(kubeconfigPath);
|
const batchApi = getBatchApi(kubeconfigPath);
|
||||||
try {
|
try {
|
||||||
@@ -517,12 +575,13 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||||||
}
|
}
|
||||||
|
|
||||||
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
|
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
|
||||||
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`);
|
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {});
|
||||||
|
|
||||||
// Refresh updatedAt every ~4 minutes (16 ticks × 15s) to stay
|
// Refresh updatedAt every ~3 minutes (12 ticks × 15s = 180s) to
|
||||||
// well within the 5-minute reaper staleness window.
|
// stay well within the 5-minute reaper staleness window. Also
|
||||||
|
// fire on tick 1 for an early safety margin after job start.
|
||||||
keepaliveTick++;
|
keepaliveTick++;
|
||||||
if (ctx.onSpawn && keepaliveTick % 16 === 0) {
|
if (ctx.onSpawn && (keepaliveTick === 1 || keepaliveTick % 12 === 0)) {
|
||||||
void ctx.onSpawn({ pid: -1, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
|
void ctx.onSpawn({ pid: -1, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
|
||||||
}
|
}
|
||||||
})();
|
})();
|
||||||
@@ -607,6 +666,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||||||
} else {
|
} else {
|
||||||
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
|
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
|
||||||
}
|
}
|
||||||
|
// Clean up prompt Secret if one was created
|
||||||
|
if (promptSecret) {
|
||||||
|
try {
|
||||||
|
await coreApi.deleteNamespacedSecret({ name: promptSecret.name, namespace: promptSecret.namespace });
|
||||||
|
} catch {
|
||||||
|
// Best-effort cleanup — TTL or manual deletion will catch stragglers
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse Claude output (reuse claude_local parsing)
|
// Parse Claude output (reuse claude_local parsing)
|
||||||
|
|||||||
@@ -40,25 +40,44 @@ describe("buildJobManifest", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe("job naming", () => {
|
describe("job naming", () => {
|
||||||
it("uses agent-claude- prefix", () => {
|
it("uses ac- prefix", () => {
|
||||||
const { jobName } = buildJobManifest({ ctx, selfPod });
|
const { jobName } = buildJobManifest({ ctx, selfPod });
|
||||||
expect(jobName).toMatch(/^agent-claude-/);
|
expect(jobName).toMatch(/^ac-/);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("includes sanitized agent id slug", () => {
|
it("includes sanitized agent id slug (up to 16 chars)", () => {
|
||||||
ctx.agent.id = "Agent-ABC!@#";
|
ctx.agent.id = "Agent-ABC!@#";
|
||||||
const { jobName } = buildJobManifest({ ctx, selfPod });
|
const { jobName } = buildJobManifest({ ctx, selfPod });
|
||||||
// sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-8
|
// sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-16
|
||||||
// "Agent-ABC!@#" -> "agent-abc" (strips !@#, slice to 8 = "agent-ab")
|
expect(jobName).toContain("agent-abc");
|
||||||
expect(jobName).toContain("agent-ab");
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("includes sanitized run id slug", () => {
|
it("includes sanitized run id slug (up to 16 chars)", () => {
|
||||||
ctx.runId = "RUN-ABC-12345";
|
ctx.runId = "RUN-ABC-12345";
|
||||||
const { jobName } = buildJobManifest({ ctx, selfPod });
|
const { jobName } = buildJobManifest({ ctx, selfPod });
|
||||||
// sanitizeForK8sName: lowercase, strip non-alphanumeric (not dashes), slice 0-8
|
expect(jobName).toContain("run-abc-12345");
|
||||||
// "RUN-ABC-12345" -> "run-abc-12345" (slice to 8 = "run-abc-")
|
});
|
||||||
expect(jobName).toContain("run-abc-");
|
|
||||||
|
it("includes a deterministic hash suffix", () => {
|
||||||
|
const result1 = buildJobManifest({ ctx, selfPod });
|
||||||
|
const result2 = buildJobManifest({ ctx, selfPod });
|
||||||
|
expect(result1.jobName).toBe(result2.jobName);
|
||||||
|
// Hash suffix is 6 hex chars at the end
|
||||||
|
expect(result1.jobName).toMatch(/-[0-9a-f]{6}$/);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("different agent+run pairs produce different names", () => {
|
||||||
|
const result1 = buildJobManifest({ ctx, selfPod });
|
||||||
|
ctx.runId = "run-different";
|
||||||
|
const result2 = buildJobManifest({ ctx, selfPod });
|
||||||
|
expect(result1.jobName).not.toBe(result2.jobName);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("stays within 63-char DNS label limit", () => {
|
||||||
|
ctx.agent.id = "a".repeat(100);
|
||||||
|
ctx.runId = "r".repeat(100);
|
||||||
|
const { jobName } = buildJobManifest({ ctx, selfPod });
|
||||||
|
expect(jobName.length).toBeLessThanOrEqual(63);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -544,7 +563,7 @@ describe("buildJobManifest", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe("return value", () => {
|
describe("return value", () => {
|
||||||
it("returns job, jobName, namespace, prompt, claudeArgs, promptMetrics", () => {
|
it("returns job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret", () => {
|
||||||
const result = buildJobManifest({ ctx, selfPod });
|
const result = buildJobManifest({ ctx, selfPod });
|
||||||
expect(result.job).toBeDefined();
|
expect(result.job).toBeDefined();
|
||||||
expect(result.jobName).toBeDefined();
|
expect(result.jobName).toBeDefined();
|
||||||
@@ -552,6 +571,74 @@ describe("buildJobManifest", () => {
|
|||||||
expect(result.prompt).toBeDefined();
|
expect(result.prompt).toBeDefined();
|
||||||
expect(result.claudeArgs).toBeDefined();
|
expect(result.claudeArgs).toBeDefined();
|
||||||
expect(result.promptMetrics).toBeDefined();
|
expect(result.promptMetrics).toBeDefined();
|
||||||
|
expect(result.promptSecret).toBeNull();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("nodeSelector key=value parsing", () => {
|
||||||
|
it("parses key=value multiline text", () => {
|
||||||
|
ctx.config = { nodeSelector: "disktype=ssd\ntopology.kubernetes.io/zone=us-east-1a" };
|
||||||
|
const { job } = buildJobManifest({ ctx, selfPod });
|
||||||
|
expect(job.spec?.template?.spec?.nodeSelector).toEqual({
|
||||||
|
disktype: "ssd",
|
||||||
|
"topology.kubernetes.io/zone": "us-east-1a",
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("still accepts JSON objects", () => {
|
||||||
|
ctx.config = { nodeSelector: { disktype: "ssd" } };
|
||||||
|
const { job } = buildJobManifest({ ctx, selfPod });
|
||||||
|
expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("parses JSON string format", () => {
|
||||||
|
ctx.config = { nodeSelector: '{"disktype":"ssd"}' };
|
||||||
|
const { job } = buildJobManifest({ ctx, selfPod });
|
||||||
|
expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("skips comment lines and blank lines", () => {
|
||||||
|
ctx.config = { nodeSelector: "# comment\n\ndisktype=ssd\n" };
|
||||||
|
const { job } = buildJobManifest({ ctx, selfPod });
|
||||||
|
expect(job.spec?.template?.spec?.nodeSelector).toEqual({ disktype: "ssd" });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("labels key=value parsing", () => {
|
||||||
|
it("parses key=value multiline text for extra labels", () => {
|
||||||
|
ctx.config = { labels: "env=prod\nteam=platform" };
|
||||||
|
const { job } = buildJobManifest({ ctx, selfPod });
|
||||||
|
expect(job.metadata?.labels?.env).toBe("prod");
|
||||||
|
expect(job.metadata?.labels?.team).toBe("platform");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("large prompt Secret fallback", () => {
|
||||||
|
it("returns null promptSecret for small prompts", () => {
|
||||||
|
const { promptSecret } = buildJobManifest({ ctx, selfPod });
|
||||||
|
expect(promptSecret).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns promptSecret for prompts >256 KiB", () => {
|
||||||
|
// Build a prompt >256 KiB via a custom template
|
||||||
|
const largePrompt = "x".repeat(300 * 1024);
|
||||||
|
ctx.config = { promptTemplate: largePrompt };
|
||||||
|
const { promptSecret, job } = buildJobManifest({ ctx, selfPod });
|
||||||
|
expect(promptSecret).not.toBeNull();
|
||||||
|
expect(promptSecret!.data["prompt.txt"]).toBe(largePrompt);
|
||||||
|
// Init container should copy from secret volume, not use PROMPT_CONTENT env
|
||||||
|
const init = job.spec?.template?.spec?.initContainers?.[0];
|
||||||
|
expect(init?.command).toContainEqual(expect.stringContaining("cp"));
|
||||||
|
expect(init?.env).toBeUndefined();
|
||||||
|
// Should have prompt-secret volume
|
||||||
|
const secretVol = job.spec?.template?.spec?.volumes?.find((v) => v.name === "prompt-secret");
|
||||||
|
expect(secretVol?.secret?.secretName).toBe(promptSecret!.name);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("uses env var init container for small prompts", () => {
|
||||||
|
const { job } = buildJobManifest({ ctx, selfPod });
|
||||||
|
const init = job.spec?.template?.spec?.initContainers?.[0];
|
||||||
|
expect(init?.env?.[0]?.name).toBe("PROMPT_CONTENT");
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
+134
-25
@@ -9,6 +9,11 @@ import {
|
|||||||
buildPaperclipEnv,
|
buildPaperclipEnv,
|
||||||
renderTemplate,
|
renderTemplate,
|
||||||
} from "@paperclipai/adapter-utils/server-utils";
|
} from "@paperclipai/adapter-utils/server-utils";
|
||||||
|
import { createHash } from "node:crypto";
|
||||||
|
|
||||||
|
/** Prompts above this size (bytes) are staged via a Secret instead of an
|
||||||
|
* init container env var, protecting against the ~1 MiB PodSpec limit. */
|
||||||
|
const LARGE_PROMPT_THRESHOLD_BYTES = 256 * 1024;
|
||||||
|
|
||||||
// Inline prompt assembly — these functions are not yet in the published adapter-utils
|
// Inline prompt assembly — these functions are not yet in the published adapter-utils
|
||||||
function joinPromptSections(sections: string[], separator = "\n\n"): string {
|
function joinPromptSections(sections: string[], separator = "\n\n"): string {
|
||||||
@@ -44,11 +49,63 @@ function renderPaperclipWakePrompt(wake: unknown, _opts?: { resumedSession?: boo
|
|||||||
}
|
}
|
||||||
import type { SelfPodInfo } from "./k8s-client.js";
|
import type { SelfPodInfo } from "./k8s-client.js";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse a config value that may be either a JSON object or multiline
|
||||||
|
* `key=value` text (one pair per line). This fixes the config-hint
|
||||||
|
* parity issue where textarea hints promise `key=value` per line but
|
||||||
|
* `parseObject` only handles JSON.
|
||||||
|
*/
|
||||||
|
function parseKeyValueConfig(raw: unknown): Record<string, string> {
|
||||||
|
if (typeof raw === "object" && raw !== null && !Array.isArray(raw)) {
|
||||||
|
// Already an object (JSON was parsed upstream)
|
||||||
|
const result: Record<string, string> = {};
|
||||||
|
for (const [k, v] of Object.entries(raw as Record<string, unknown>)) {
|
||||||
|
if (typeof v === "string") result[k] = v;
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
if (typeof raw !== "string" || !raw.trim()) return {};
|
||||||
|
// Try JSON parse first
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(raw);
|
||||||
|
if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
|
||||||
|
const result: Record<string, string> = {};
|
||||||
|
for (const [k, v] of Object.entries(parsed as Record<string, unknown>)) {
|
||||||
|
if (typeof v === "string") result[k] = v;
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Not JSON — fall through to key=value parsing
|
||||||
|
}
|
||||||
|
// Parse key=value lines
|
||||||
|
const result: Record<string, string> = {};
|
||||||
|
for (const line of raw.split(/\r?\n/)) {
|
||||||
|
const trimmed = line.trim();
|
||||||
|
if (!trimmed || trimmed.startsWith("#")) continue;
|
||||||
|
const eqIdx = trimmed.indexOf("=");
|
||||||
|
if (eqIdx <= 0) continue;
|
||||||
|
const key = trimmed.slice(0, eqIdx).trim();
|
||||||
|
const value = trimmed.slice(eqIdx + 1).trim();
|
||||||
|
if (key) result[key] = value;
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
export interface JobBuildInput {
|
export interface JobBuildInput {
|
||||||
ctx: AdapterExecutionContext;
|
ctx: AdapterExecutionContext;
|
||||||
selfPod: SelfPodInfo;
|
selfPod: SelfPodInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** When the prompt exceeds the env-var size limit, the manifest uses a
|
||||||
|
* Secret-backed volume instead of the init container's PROMPT_CONTENT env.
|
||||||
|
* The caller must create this Secret before the Job and clean it up after. */
|
||||||
|
export interface PromptSecret {
|
||||||
|
name: string;
|
||||||
|
namespace: string;
|
||||||
|
data: Record<string, string>;
|
||||||
|
}
|
||||||
|
|
||||||
export interface JobBuildResult {
|
export interface JobBuildResult {
|
||||||
job: k8s.V1Job;
|
job: k8s.V1Job;
|
||||||
jobName: string;
|
jobName: string;
|
||||||
@@ -56,10 +113,21 @@ export interface JobBuildResult {
|
|||||||
prompt: string;
|
prompt: string;
|
||||||
claudeArgs: string[];
|
claudeArgs: string[];
|
||||||
promptMetrics: Record<string, number>;
|
promptMetrics: Record<string, number>;
|
||||||
|
/** Non-null when the prompt is too large for an env var and must be
|
||||||
|
* staged as a K8s Secret before creating the Job. */
|
||||||
|
promptSecret: PromptSecret | null;
|
||||||
}
|
}
|
||||||
|
|
||||||
function sanitizeForK8sName(value: string): string {
|
function sanitizeForK8sName(value: string, maxLen = 16): string {
|
||||||
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, 8);
|
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build a short deterministic hash suffix from the raw inputs to avoid
|
||||||
|
* collisions when sanitized slugs happen to be identical.
|
||||||
|
*/
|
||||||
|
function shortHash(input: string, len = 6): string {
|
||||||
|
return createHash("sha256").update(input).digest("hex").slice(0, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
function buildEnvVars(
|
function buildEnvVars(
|
||||||
@@ -184,9 +252,9 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
|||||||
const timeoutSec = asNumber(config.timeoutSec, 0);
|
const timeoutSec = asNumber(config.timeoutSec, 0);
|
||||||
const ttlSeconds = asNumber(config.ttlSecondsAfterFinished, 300);
|
const ttlSeconds = asNumber(config.ttlSecondsAfterFinished, 300);
|
||||||
const resources = parseObject(config.resources);
|
const resources = parseObject(config.resources);
|
||||||
const nodeSelector = parseObject(config.nodeSelector);
|
const nodeSelector = parseKeyValueConfig(config.nodeSelector);
|
||||||
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
|
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
|
||||||
const extraLabels = parseObject(config.labels);
|
const extraLabels = parseKeyValueConfig(config.labels);
|
||||||
|
|
||||||
// Resolve working directory — use workspace cwd, fall back to /paperclip
|
// Resolve working directory — use workspace cwd, fall back to /paperclip
|
||||||
const workspaceContext = parseObject(context.paperclipWorkspace);
|
const workspaceContext = parseObject(context.paperclipWorkspace);
|
||||||
@@ -194,9 +262,13 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
|||||||
const configuredCwd = asString(config.cwd, "");
|
const configuredCwd = asString(config.cwd, "");
|
||||||
const workingDir = workspaceCwd || configuredCwd || "/paperclip";
|
const workingDir = workspaceCwd || configuredCwd || "/paperclip";
|
||||||
|
|
||||||
const agentSlug = sanitizeForK8sName(agent.id);
|
// Build a deterministic, collision-resistant job name within the 63-char
|
||||||
const runSlug = sanitizeForK8sName(runId);
|
// DNS label limit. Layout: "ac-{agentSlug}-{runSlug}-{hash}" where the
|
||||||
const jobName = `agent-claude-${agentSlug}-${runSlug}`;
|
// hash is derived from the raw (un-truncated) agent+run IDs.
|
||||||
|
const agentSlug = sanitizeForK8sName(agent.id, 16);
|
||||||
|
const runSlug = sanitizeForK8sName(runId, 16);
|
||||||
|
const hash = shortHash(`${agent.id}:${runId}`);
|
||||||
|
const jobName = `ac-${agentSlug}-${runSlug}-${hash}`;
|
||||||
|
|
||||||
// Build prompt (same logic as claude_local)
|
// Build prompt (same logic as claude_local)
|
||||||
const promptTemplate = asString(
|
const promptTemplate = asString(
|
||||||
@@ -275,7 +347,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
|||||||
"paperclip.io/adapter-type": "claude_k8s",
|
"paperclip.io/adapter-type": "claude_k8s",
|
||||||
};
|
};
|
||||||
for (const [key, value] of Object.entries(extraLabels)) {
|
for (const [key, value] of Object.entries(extraLabels)) {
|
||||||
if (typeof value === "string") labels[key] = value;
|
labels[key] = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Volumes
|
// Volumes
|
||||||
@@ -338,6 +410,57 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
|||||||
const claudeArgsEscaped = claudeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
|
const claudeArgsEscaped = claudeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
|
||||||
const mainCommand = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`;
|
const mainCommand = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`;
|
||||||
|
|
||||||
|
// Decide prompt delivery strategy: env var (small) or Secret volume (large).
|
||||||
|
const promptBytes = Buffer.byteLength(prompt, "utf-8");
|
||||||
|
const useLargePromptPath = promptBytes > LARGE_PROMPT_THRESHOLD_BYTES;
|
||||||
|
let promptSecret: PromptSecret | null = null;
|
||||||
|
const promptSecretName = `${jobName}-prompt`;
|
||||||
|
|
||||||
|
if (useLargePromptPath) {
|
||||||
|
// Stage prompt as a Secret; the init container copies from the mounted
|
||||||
|
// secret volume to the emptyDir so the main container reads it the
|
||||||
|
// same way regardless of prompt size.
|
||||||
|
promptSecret = {
|
||||||
|
name: promptSecretName,
|
||||||
|
namespace,
|
||||||
|
data: { "prompt.txt": prompt },
|
||||||
|
};
|
||||||
|
volumes.push({
|
||||||
|
name: "prompt-secret",
|
||||||
|
secret: { secretName: promptSecretName, optional: false },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const initContainer: k8s.V1Container = useLargePromptPath
|
||||||
|
? {
|
||||||
|
name: "write-prompt",
|
||||||
|
image: "busybox:1.36",
|
||||||
|
imagePullPolicy: "IfNotPresent",
|
||||||
|
command: ["sh", "-c", "cp /tmp/prompt-secret/prompt.txt /tmp/prompt/prompt.txt"],
|
||||||
|
volumeMounts: [
|
||||||
|
{ name: "prompt", mountPath: "/tmp/prompt" },
|
||||||
|
{ name: "prompt-secret", mountPath: "/tmp/prompt-secret", readOnly: true },
|
||||||
|
],
|
||||||
|
securityContext,
|
||||||
|
resources: {
|
||||||
|
requests: { cpu: "10m", memory: "16Mi" },
|
||||||
|
limits: { cpu: "100m", memory: "64Mi" },
|
||||||
|
},
|
||||||
|
}
|
||||||
|
: {
|
||||||
|
name: "write-prompt",
|
||||||
|
image: "busybox:1.36",
|
||||||
|
imagePullPolicy: "IfNotPresent",
|
||||||
|
command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
|
||||||
|
env: [{ name: "PROMPT_CONTENT", value: prompt }],
|
||||||
|
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
|
||||||
|
securityContext,
|
||||||
|
resources: {
|
||||||
|
requests: { cpu: "10m", memory: "16Mi" },
|
||||||
|
limits: { cpu: "100m", memory: "64Mi" },
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
const job: k8s.V1Job = {
|
const job: k8s.V1Job = {
|
||||||
apiVersion: "batch/v1",
|
apiVersion: "batch/v1",
|
||||||
kind: "Job",
|
kind: "Job",
|
||||||
@@ -362,23 +485,9 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
|||||||
securityContext: podSecurityContext,
|
securityContext: podSecurityContext,
|
||||||
...(selfPod.imagePullSecrets.length > 0 ? { imagePullSecrets: selfPod.imagePullSecrets } : {}),
|
...(selfPod.imagePullSecrets.length > 0 ? { imagePullSecrets: selfPod.imagePullSecrets } : {}),
|
||||||
...(selfPod.dnsConfig ? { dnsConfig: selfPod.dnsConfig } : {}),
|
...(selfPod.dnsConfig ? { dnsConfig: selfPod.dnsConfig } : {}),
|
||||||
...(Object.keys(nodeSelector).length > 0 ? { nodeSelector: nodeSelector as Record<string, string> } : {}),
|
...(Object.keys(nodeSelector).length > 0 ? { nodeSelector } : {}),
|
||||||
...(tolerations.length > 0 ? { tolerations: tolerations as k8s.V1Toleration[] } : {}),
|
...(tolerations.length > 0 ? { tolerations: tolerations as k8s.V1Toleration[] } : {}),
|
||||||
initContainers: [
|
initContainers: [initContainer],
|
||||||
{
|
|
||||||
name: "write-prompt",
|
|
||||||
image: "busybox:1.36",
|
|
||||||
imagePullPolicy: "IfNotPresent",
|
|
||||||
command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
|
|
||||||
env: [{ name: "PROMPT_CONTENT", value: prompt }],
|
|
||||||
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
|
|
||||||
securityContext,
|
|
||||||
resources: {
|
|
||||||
requests: { cpu: "10m", memory: "16Mi" },
|
|
||||||
limits: { cpu: "100m", memory: "64Mi" },
|
|
||||||
},
|
|
||||||
},
|
|
||||||
],
|
|
||||||
containers: [
|
containers: [
|
||||||
{
|
{
|
||||||
name: "claude",
|
name: "claude",
|
||||||
@@ -399,5 +508,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics };
|
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret };
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -141,6 +141,19 @@ more raw output`;
|
|||||||
expect(result.summary).toContain("JSON output");
|
expect(result.summary).toContain("JSON output");
|
||||||
expect(result.summary).not.toContain("some raw output");
|
expect(result.summary).not.toContain("some raw output");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("deduplicates identical assistant text blocks from reconnect replays", () => {
|
||||||
|
const assistantEvent = JSON.stringify({
|
||||||
|
type: "assistant",
|
||||||
|
message: { content: [{ type: "text", text: "Hello world" }] },
|
||||||
|
});
|
||||||
|
// Simulate the same assistant event appearing twice (log stream reconnect replay)
|
||||||
|
const stdout = `${assistantEvent}\n${assistantEvent}\n`;
|
||||||
|
const result = parseClaudeStreamJson(stdout);
|
||||||
|
expect(result.summary).toBe("Hello world");
|
||||||
|
// Should not be "Hello world\n\nHello world"
|
||||||
|
expect(result.summary.split("Hello world").length).toBe(2);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("extractClaudeLoginUrl", () => {
|
describe("extractClaudeLoginUrl", () => {
|
||||||
|
|||||||
+7
-1
@@ -9,6 +9,9 @@ export function parseClaudeStreamJson(stdout: string) {
|
|||||||
let model = "";
|
let model = "";
|
||||||
let finalResult: Record<string, unknown> | null = null;
|
let finalResult: Record<string, unknown> | null = null;
|
||||||
const assistantTexts: string[] = [];
|
const assistantTexts: string[] = [];
|
||||||
|
// Belt-and-braces dedup: track seen text blocks to filter duplicates
|
||||||
|
// caused by log stream reconnects replaying overlapping windows.
|
||||||
|
const seenTexts = new Set<string>();
|
||||||
|
|
||||||
for (const rawLine of stdout.split(/\r?\n/)) {
|
for (const rawLine of stdout.split(/\r?\n/)) {
|
||||||
const line = rawLine.trim();
|
const line = rawLine.trim();
|
||||||
@@ -32,7 +35,10 @@ export function parseClaudeStreamJson(stdout: string) {
|
|||||||
const block = entry as Record<string, unknown>;
|
const block = entry as Record<string, unknown>;
|
||||||
if (asString(block.type, "") === "text") {
|
if (asString(block.type, "") === "text") {
|
||||||
const text = asString(block.text, "");
|
const text = asString(block.text, "");
|
||||||
if (text) assistantTexts.push(text);
|
if (text && !seenTexts.has(text)) {
|
||||||
|
seenTexts.add(text);
|
||||||
|
assistantTexts.push(text);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
|||||||
Reference in New Issue
Block a user