diff --git a/src/server/config-schema.test.ts b/src/server/config-schema.test.ts index 3f62027..c5aebfc 100644 --- a/src/server/config-schema.test.ts +++ b/src/server/config-schema.test.ts @@ -42,6 +42,14 @@ describe("getConfigSchema", () => { expect(field!.default).toBe(true); }); + it("reattachOrphanedJobs defaults to true", () => { + const schema = getConfigSchema(); + const field = schema.fields.find((f: ConfigFieldSchema) => f.key === "reattachOrphanedJobs"); + expect(field).toBeDefined(); + expect(field!.type).toBe("toggle"); + expect(field!.default).toBe(true); + }); + it("has imagePullPolicy as select with correct options", () => { const schema = getConfigSchema(); const field = schema.fields.find((f: ConfigFieldSchema) => f.key === "imagePullPolicy"); diff --git a/src/server/config-schema.ts b/src/server/config-schema.ts index 9e3de9a..5f8caa1 100644 --- a/src/server/config-schema.ts +++ b/src/server/config-schema.ts @@ -89,6 +89,13 @@ export function getConfigSchema(): AdapterConfigSchema { label: "Retain Jobs", hint: "Skip cleanup of completed Jobs for debugging purposes.", }, + { + type: "toggle", + key: "reattachOrphanedJobs", + label: "Reattach to Orphaned Jobs", + hint: "If a prior K8s Job for the same agent/task/session is still running (e.g. Paperclip restarted mid-run), attach to it and stream its output instead of deleting it and starting a new pod. 
/**
 * Test fixture: build a minimal `V1Job` carrying the Paperclip labels that
 * `isReattachableOrphan` inspects.
 *
 * - `adapterType` defaults to `"claude_k8s"`; the adapter-type label is always set.
 * - `agentId` / `runId` / `taskId` / `sessionId` labels are only added when a
 *   non-empty value is supplied, so a test can model a Job that lacks a label.
 * - `terminal: true` yields a Job whose status carries a `Complete=True`
 *   condition; otherwise the condition list is empty.
 */
function makeJob(opts: {
  runId?: string;
  agentId?: string;
  taskId?: string;
  sessionId?: string;
  adapterType?: string;
  terminal?: boolean;
}): k8s.V1Job {
  const labels: Record<string, string> = {
    "paperclip.io/adapter-type": opts.adapterType ?? "claude_k8s",
  };

  // Optional identity labels, listed in the order they are written.
  const optionalLabels: ReadonlyArray<readonly [string, string | undefined]> = [
    ["paperclip.io/agent-id", opts.agentId],
    ["paperclip.io/run-id", opts.runId],
    ["paperclip.io/task-id", opts.taskId],
    ["paperclip.io/session-id", opts.sessionId],
  ];
  for (const [key, value] of optionalLabels) {
    // Truthiness check on purpose: an empty string means "no label".
    if (value) labels[key] = value;
  }

  // Declared (not asserted) so the literal is checked against V1Job.
  const job: k8s.V1Job = {
    metadata: { name: "ac-job", namespace: "paperclip", labels },
    status: {
      conditions: opts.terminal ? [{ type: "Complete", status: "True" }] : [],
    },
  };
  return job;
}
/**
 * Evaluate an orphaned K8s Job (one whose `paperclip.io/run-id` label does
 * not match the current runId) as a potential reattach target.
 *
 * A Job is reattachable only when ALL of the following hold:
 *  - the caller supplied a non-empty agent id, task id and session id;
 *  - the Job is not already being deleted (`metadata.deletionTimestamp` unset);
 *  - its `paperclip.io/adapter-type` label is `claude_k8s`;
 *  - its agent-id, task-id and session-id labels equal the expected values
 *    exactly (a missing label never matches);
 *  - it has no `Complete` or `Failed` condition with status `"True"`.
 *
 * The comparison is a plain string equality against the label values, so the
 * caller must pass ids in the same form they were written to the labels.
 *
 * Exported for unit tests.
 *
 * @param job - The candidate Job as returned by the Kubernetes API.
 * @param expected - Identity of the current run; `null` task/session ids mean
 *   the caller could not derive one, which disables reattach.
 * @returns `true` when the Job may be attached to instead of deleted.
 */
export function isReattachableOrphan(
  job: k8s.V1Job,
  expected: { agentId: string; taskId: string | null; sessionId: string | null },
): boolean {
  // Without a complete identity there is nothing safe to match on.
  if (!expected.agentId || !expected.taskId || !expected.sessionId) return false;

  // A Job with a deletion timestamp is already being torn down; attaching to
  // it would only stream a pod that is about to disappear.
  if (job.metadata?.deletionTimestamp != null) return false;

  const labels = job.metadata?.labels ?? {};
  const requiredLabels: ReadonlyArray<readonly [string, string]> = [
    ["paperclip.io/adapter-type", "claude_k8s"],
    ["paperclip.io/agent-id", expected.agentId],
    ["paperclip.io/task-id", expected.taskId],
    ["paperclip.io/session-id", expected.sessionId],
  ];
  if (!requiredLabels.every(([key, want]) => labels[key] === want)) return false;

  // Finished Jobs (successfully or not) have no running pod to attach to.
  const finished = (job.status?.conditions ?? []).some(
    (c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
  );
  return !finished;
}
0).getTime(); + return bt - at; + }); + const chosen = candidates[0]; + const chosenName = chosen?.metadata?.name; + if (chosen && chosenName) { + reattachTarget = { + jobName: chosenName, + namespace: chosen.metadata?.namespace ?? guardNamespace, + priorRunId: chosen.metadata?.labels?.["paperclip.io/run-id"] ?? "", + image: chosen.spec?.template?.spec?.containers?.[0]?.image ?? "unknown", + }; + } + } + + const toDelete = orphaned.filter( + (j) => !reattachTarget || j.metadata?.name !== reattachTarget.jobName, + ); + if (toDelete.length > 0) { + const orphanNames = toDelete.map((j) => j.metadata?.name).join(", "); + await onLog("stdout", `[paperclip] Cleaning up ${toDelete.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`); + for (const j of toDelete) { const name = j.metadata?.name; if (name) { await cleanupJob(guardNamespace, name, onLog, kubeconfigPath); @@ -475,84 +541,120 @@ export async function execute(ctx: AdapterExecutionContext): Promise[0]); - } - - // If the prompt is large, create a Secret to hold it (avoids the ~1 MiB - // PodSpec limit). The Secret is cleaned up in the finally block. const coreApi = getCoreApi(kubeconfigPath); - if (promptSecret) { - try { - await coreApi.createNamespacedSecret({ - namespace: promptSecret.namespace, - body: { - apiVersion: "v1", - kind: "Secret", - metadata: { - name: promptSecret.name, - namespace: promptSecret.namespace, - labels: { - "app.kubernetes.io/managed-by": "paperclip", - "paperclip.io/adapter-type": "claude_k8s", - "paperclip.io/run-id": runId, + const batchApi = getBatchApi(kubeconfigPath); + + let jobName: string; + let namespace: string; + let promptSecret: { name: string; namespace: string; data: Record } | null = null; + + if (reattachTarget) { + jobName = reattachTarget.jobName; + namespace = reattachTarget.namespace; + + // Announce reattach metadata. 
Prompt and args aren't known here — they + // belong to the prior run that created this pod and are already present + // on the running container. + if (onMeta) { + await onMeta({ + adapterType: "claude_k8s", + command: `kubectl job/${jobName}`, + cwd: namespace, + commandArgs: [], + commandNotes: [ + `Image: ${reattachTarget.image}`, + `Namespace: ${namespace}`, + `Reattached from prior run: ${reattachTarget.priorRunId || "unknown"}`, + `Timeout: ${timeoutSec}s`, + ], + prompt: "", + context: ctx.context, + } as Parameters[0]); + } + + await onLog("stdout", `[paperclip] Reattaching to in-flight K8s Job ${jobName} in namespace ${namespace} (prior run ${reattachTarget.priorRunId || "unknown"})\n`); + } else { + // Build Job manifest + const built = buildJobManifest({ ctx, selfPod }); + const job = built.job; + jobName = built.jobName; + namespace = built.namespace; + const prompt = built.prompt; + const claudeArgs = built.claudeArgs; + const promptMetrics = built.promptMetrics; + promptSecret = built.promptSecret; + + // Report invocation metadata + if (onMeta) { + await onMeta({ + adapterType: "claude_k8s", + command: `kubectl job/${jobName}`, + cwd: namespace, + commandArgs: claudeArgs, + commandNotes: [ + `Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`, + `Namespace: ${namespace}`, + `Timeout: ${timeoutSec}s`, + ], + prompt, + ...(promptMetrics ? { promptMetrics } : {}), + context: ctx.context, + } as Parameters[0]); + } + + // If the prompt is large, create a Secret to hold it (avoids the ~1 MiB + // PodSpec limit). The Secret is cleaned up in the finally block. 
+ if (promptSecret) { + try { + await coreApi.createNamespacedSecret({ + namespace: promptSecret.namespace, + body: { + apiVersion: "v1", + kind: "Secret", + metadata: { + name: promptSecret.name, + namespace: promptSecret.namespace, + labels: { + "app.kubernetes.io/managed-by": "paperclip", + "paperclip.io/adapter-type": "claude_k8s", + "paperclip.io/run-id": runId, + }, }, + stringData: promptSecret.data, }, - stringData: promptSecret.data, - }, - }); - await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`); + }); + await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`); + return { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: `Failed to create prompt Secret: ${msg}`, + errorCode: "k8s_prompt_secret_create_failed", + }; + } + } + + // Create the Job + try { + await batchApi.createNamespacedJob({ namespace, body: job }); } catch (err) { const msg = err instanceof Error ? err.message : String(err); - await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`); + await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`); return { exitCode: null, signal: null, timedOut: false, - errorMessage: `Failed to create prompt Secret: ${msg}`, - errorCode: "k8s_prompt_secret_create_failed", + errorMessage: `Failed to create Kubernetes Job: ${msg}`, + errorCode: "k8s_job_create_failed", }; } - } - // Create the Job - const batchApi = getBatchApi(kubeconfigPath); - try { - await batchApi.createNamespacedJob({ namespace, body: job }); - } catch (err) { - const msg = err instanceof Error ? 
err.message : String(err); - await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`); - return { - exitCode: null, - signal: null, - timedOut: false, - errorMessage: `Failed to create Kubernetes Job: ${msg}`, - errorCode: "k8s_job_create_failed", - }; + await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`); } - await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`); - let stdout = ""; let exitCode: number | null = null; let jobTimedOut = false; @@ -566,8 +668,23 @@ export async function execute(ctx: AdapterExecutionContext): Promise = {}): AdapterExecutionContext { @@ -136,6 +136,36 @@ describe("buildJobManifest", () => { expect(job.metadata?.labels?.env).toBe("prod"); expect(job.metadata?.labels?.["paperclip.io/adapter-type"]).toBe("claude_k8s"); }); + + it("adds task-id label when context provides taskId", () => { + ctx.context = { taskId: "task-xyz-789" }; + const { job } = buildJobManifest({ ctx, selfPod }); + expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBe("task-xyz-789"); + }); + + it("falls back to issueId when taskId absent", () => { + ctx.context = { issueId: "issue-42" }; + const { job } = buildJobManifest({ ctx, selfPod }); + expect(job.metadata?.labels?.["paperclip.io/task-id"]).toBe("issue-42"); + }); + + it("adds session-id label when runtime provides sessionId", () => { + ctx.runtime = { ...ctx.runtime, sessionId: "sess-abc-1234" }; + const { job } = buildJobManifest({ ctx, selfPod }); + expect(job.metadata?.labels?.["paperclip.io/session-id"]).toBe("sess-abc-1234"); + }); + + it("reads sessionId from runtime.sessionParams when sessionId prop missing", () => { + ctx.runtime = { ...ctx.runtime, sessionParams: { sessionId: "sess-from-params" } }; + const { job } = buildJobManifest({ ctx, selfPod }); + 
/**
 * Sanitize a string for use as a Kubernetes label value.
 *
 * Kubernetes label values must match
 * `[a-zA-Z0-9]([-_.a-zA-Z0-9]*[a-zA-Z0-9])?` and be at most 63 characters.
 * This helper:
 *  1. drops every character outside `[a-zA-Z0-9._-]`;
 *  2. strips leading separators BEFORE truncating, so they do not consume
 *     part of the length budget;
 *  3. truncates to `maxLen`;
 *  4. strips trailing separators (truncation may have exposed one).
 *
 * @param value - Raw identifier to turn into a label value.
 * @param maxLen - Maximum length of the result. Defaults to 63, the
 *   Kubernetes limit; values above 63 are not clamped, so the result may
 *   then exceed what Kubernetes accepts.
 * @returns The sanitized value, or `null` when no usable characters remain
 *   (or `maxLen` is not positive) — the caller should omit the label.
 */
export function sanitizeLabelValue(value: string, maxLen = 63): string | null {
  // A non-positive (or NaN) budget can never hold a valid value; without this
  // guard a negative maxLen would make slice() drop trailing characters.
  if (!(maxLen > 0)) return null;

  const allowedOnly = value.replace(/[^a-zA-Z0-9._-]/g, "");
  const withoutLeading = allowedOnly.replace(/^[^a-zA-Z0-9]+/, "");
  const bounded = withoutLeading.slice(0, maxLen);
  const result = bounded.replace(/[^a-zA-Z0-9]+$/, "");

  return result.length > 0 ? result : null;
}