fix: reattach to orphaned K8s Jobs across Paperclip restarts (FAR-124)

When the Paperclip pod restarts mid-run, the in-process setInterval
keepalive dies, `updatedAt` goes stale, and the server's orphan reaper
fails the run with the (misleading) "child pid 1 is no longer running"
message.  Paperclip then dispatches a continuation run, whose execute()
finds the previous run's K8s Job still happily running and deletes it
as an "orphan" — throwing away work and producing the transcript/run
cascade reported on FAR-124.

Changes:

- job-manifest: add `paperclip.io/task-id` and `paperclip.io/session-id`
  labels (sanitized via new `sanitizeLabelValue` helper) so a later
  execute() can identify an orphan as the continuation of the same
  logical unit of work.
- execute: in the concurrency guard, when `reattachOrphanedJobs` is on
  (default) and an orphan matches agent + task + session + is not
  terminal, pick it as the reattach target; delete only the other
  orphans.  Branch the build/create/waitForPod block so the reattach
  path skips manifest building, Secret creation, Job creation, and
  scheduling wait — it jumps straight to streaming logs and waiting
  for the existing pod's completion.
- config-schema: expose `reattachOrphanedJobs` toggle (default true).
- Tests: `sanitizeLabelValue`, `isReattachableOrphan`, new label
  presence/absence, config default.

No server-side changes; the misleading reaper message and lack of a
non-local retry path will be addressed in a follow-up upstream PR.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Test User
2026-04-22 21:59:25 +00:00
parent a4631ac756
commit c8968598e4
6 changed files with 370 additions and 79 deletions
+194 -77
View File
@@ -7,7 +7,7 @@ import {
isClaudeUnknownSessionError,
} from "./parse.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
import { buildJobManifest } from "./job-manifest.js";
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
import { LogLineDedupFilter } from "./log-dedup.js";
import type * as k8s from "@kubernetes/client-node";
import { Writable } from "node:stream";
@@ -70,6 +70,32 @@ export function buildPartialRunError(
: `Claude exited with code ${exitCode ?? -1}`;
}
/**
* Evaluate an orphaned K8s Job (one whose `paperclip.io/run-id` label does
* not match the current runId) as a potential reattach target. A Job is
* reattachable when it belongs to the same agent, same task, and same resume
* session as the current run — meaning the previous Paperclip instance was
* mid-stream on the exact piece of work this new run was dispatched to do.
* Exported for unit tests.
*/
export function isReattachableOrphan(
job: k8s.V1Job,
expected: { agentId: string; taskId: string | null; sessionId: string | null },
): boolean {
if (!expected.taskId || !expected.sessionId) return false;
const labels = job.metadata?.labels ?? {};
if (labels["paperclip.io/adapter-type"] !== "claude_k8s") return false;
if (labels["paperclip.io/agent-id"] !== expected.agentId) return false;
if (labels["paperclip.io/task-id"] !== expected.taskId) return false;
if (labels["paperclip.io/session-id"] !== expected.sessionId) return false;
const conditions = job.status?.conditions ?? [];
const terminal = conditions.some(
(c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
);
if (terminal) return false;
return true;
}
/**
* Wait for the Job's pod to reach a terminal or running state.
* Returns the pod name once logs can be streamed, or throws on failure.
@@ -411,10 +437,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
// still be running. We detect those by comparing the Job's run-id label against
// the current runId and clean them up so this execution can proceed.
// the current runId. When reattachOrphanedJobs is enabled and the orphan matches
// the current agent+task+session, we attach to it instead of deleting it (FAR-124).
const agentId = ctx.agent.id;
const selfPod = await getSelfPodInfo(kubeconfigPath);
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true);
const runtimeSessionParams = parseObject(runtime.sessionParams);
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const currentSessionLabel = currentSessionIdRaw ? sanitizeLabelValue(currentSessionIdRaw) : null;
const currentTaskIdRaw = asString(ctx.context.taskId, "") || asString(ctx.context.issueId, "");
const currentTaskLabel = currentTaskIdRaw ? sanitizeLabelValue(currentTaskIdRaw) : null;
let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null;
try {
const batchApi = getBatchApi(kubeconfigPath);
const existing = await batchApi.listNamespacedJob({
@@ -434,10 +468,42 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
(j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") === runId,
);
if (orphaned.length > 0) {
const orphanNames = orphaned.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Cleaning up ${orphaned.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
for (const j of orphaned) {
// Pick the most recent reattachable orphan — same agent + task + session,
// not terminal. Only one target is chosen; any other orphans get
// cleaned up as before.
if (reattachOrphanedJobs && orphaned.length > 0) {
const candidates = orphaned
.filter((j) =>
isReattachableOrphan(j, {
agentId,
taskId: currentTaskLabel,
sessionId: currentSessionLabel,
}),
)
.sort((a, b) => {
const at = new Date(a.metadata?.creationTimestamp ?? 0).getTime();
const bt = new Date(b.metadata?.creationTimestamp ?? 0).getTime();
return bt - at;
});
const chosen = candidates[0];
const chosenName = chosen?.metadata?.name;
if (chosen && chosenName) {
reattachTarget = {
jobName: chosenName,
namespace: chosen.metadata?.namespace ?? guardNamespace,
priorRunId: chosen.metadata?.labels?.["paperclip.io/run-id"] ?? "",
image: chosen.spec?.template?.spec?.containers?.[0]?.image ?? "unknown",
};
}
}
const toDelete = orphaned.filter(
(j) => !reattachTarget || j.metadata?.name !== reattachTarget.jobName,
);
if (toDelete.length > 0) {
const orphanNames = toDelete.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Cleaning up ${toDelete.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
for (const j of toDelete) {
const name = j.metadata?.name;
if (name) {
await cleanupJob(guardNamespace, name, onLog, kubeconfigPath);
@@ -475,84 +541,120 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
// Build Job manifest
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret } = buildJobManifest({
ctx,
selfPod,
});
// Report invocation metadata
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: claudeArgs,
commandNotes: [
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
context: ctx.context,
} as Parameters<typeof onMeta>[0]);
}
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
const coreApi = getCoreApi(kubeconfigPath);
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
const batchApi = getBatchApi(kubeconfigPath);
let jobName: string;
let namespace: string;
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
if (reattachTarget) {
jobName = reattachTarget.jobName;
namespace = reattachTarget.namespace;
// Announce reattach metadata. Prompt and args aren't known here — they
// belong to the prior run that created this pod and are already present
// on the running container.
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: [],
commandNotes: [
`Image: ${reattachTarget.image}`,
`Namespace: ${namespace}`,
`Reattached from prior run: ${reattachTarget.priorRunId || "unknown"}`,
`Timeout: ${timeoutSec}s`,
],
prompt: "",
context: ctx.context,
} as Parameters<typeof onMeta>[0]);
}
await onLog("stdout", `[paperclip] Reattaching to in-flight K8s Job ${jobName} in namespace ${namespace} (prior run ${reattachTarget.priorRunId || "unknown"})\n`);
} else {
// Build Job manifest
const built = buildJobManifest({ ctx, selfPod });
const job = built.job;
jobName = built.jobName;
namespace = built.namespace;
const prompt = built.prompt;
const claudeArgs = built.claudeArgs;
const promptMetrics = built.promptMetrics;
promptSecret = built.promptSecret;
// Report invocation metadata
if (onMeta) {
await onMeta({
adapterType: "claude_k8s",
command: `kubectl job/${jobName}`,
cwd: namespace,
commandArgs: claudeArgs,
commandNotes: [
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
`Namespace: ${namespace}`,
`Timeout: ${timeoutSec}s`,
],
prompt,
...(promptMetrics ? { promptMetrics } : {}),
context: ctx.context,
} as Parameters<typeof onMeta>[0]);
}
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
// PodSpec limit). The Secret is cleaned up in the finally block.
if (promptSecret) {
try {
await coreApi.createNamespacedSecret({
namespace: promptSecret.namespace,
body: {
apiVersion: "v1",
kind: "Secret",
metadata: {
name: promptSecret.name,
namespace: promptSecret.namespace,
labels: {
"app.kubernetes.io/managed-by": "paperclip",
"paperclip.io/adapter-type": "claude_k8s",
"paperclip.io/run-id": runId,
},
},
stringData: promptSecret.data,
},
stringData: promptSecret.data,
},
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
});
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
};
}
}
// Create the Job
try {
await batchApi.createNamespacedJob({ namespace, body: job });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_prompt_secret_create_failed",
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
errorCode: "k8s_job_create_failed",
};
}
}
// Create the Job
const batchApi = getBatchApi(kubeconfigPath);
try {
await batchApi.createNamespacedJob({ namespace, body: job });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
errorCode: "k8s_job_create_failed",
};
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
}
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
let stdout = "";
let exitCode: number | null = null;
let jobTimedOut = false;
@@ -566,8 +668,23 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
let podName: string;
try {
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
if (reattachTarget) {
// Pod is already running from the prior run — look it up directly.
const podList = await coreApi.listNamespacedPod({
namespace,
labelSelector: `job-name=${jobName}`,
});
const pod = podList.items[0];
const name = pod?.metadata?.name;
if (!name) {
throw new Error(`Reattach target Job ${jobName} has no pod`);
}
podName = name;
await onLog("stdout", `[paperclip] Reattached to pod ${podName}\n`);
} else {
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
}
// Notify the server that execution has started. This sets
// processStartedAt and refreshes updatedAt in the DB, which the
@@ -581,13 +698,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Pod scheduling failed: ${msg}\n`);
const phase = reattachTarget ? "reattach" : "scheduling";
await onLog("stderr", `[paperclip] Pod ${phase} failed: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Pod scheduling failed: ${msg}`,
errorCode: "k8s_pod_schedule_failed",
errorMessage: `Pod ${phase} failed: ${msg}`,
errorCode: reattachTarget ? "k8s_pod_reattach_failed" : "k8s_pod_schedule_failed",
};
}
@@ -826,8 +944,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
})();
const runtimeSessionParams = parseObject(runtime.sessionParams);
const fallbackSessionId = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
const fallbackSessionId = currentSessionIdRaw;
const resolvedSessionId = parsedStream.sessionId
?? (asString(parsed.session_id as string, fallbackSessionId) || fallbackSessionId);
const model = asString(config.model, "");