chore: update lockfile
Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Vendored
+370
-104
@@ -1,12 +1,110 @@
|
||||
import { asString, asNumber, asBoolean, parseObject } from "@paperclipai/adapter-utils/server-utils";
|
||||
import { parseClaudeStreamJson, describeClaudeFailure, isClaudeMaxTurnsResult, isClaudeUnknownSessionError, } from "./parse.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
|
||||
import { buildJobManifest } from "./job-manifest.js";
|
||||
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
|
||||
import { LogLineDedupFilter } from "./log-dedup.js";
|
||||
import { Writable } from "node:stream";
|
||||
const POLL_INTERVAL_MS = 2000;
|
||||
const KEEPALIVE_INTERVAL_MS = 15_000;
|
||||
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
|
||||
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
|
||||
// How long to keep refreshing onSpawn after the Job reaches a terminal state.
|
||||
// Covers the cleanup path (delete job, parse stdout) so a slow K8s API call
|
||||
// doesn't trip the 5-minute reaper staleness window.
|
||||
const POST_TERMINAL_KEEPALIVE_MS = 90_000;
|
||||
/**
 * Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
 *
 * Only `Error` instances are considered; any other thrown value yields
 * `false`. An error counts as a 404 when any of the following holds:
 *  - `err.response.statusCode === 404` (v0.x client shape)
 *  - `err.response.status === 404` (v1.0+ client shape)
 *  - `err.statusCode === 404`
 *  - `err.code === 404` (numeric HTTP code carried on the error itself;
 *    NOTE(review): this is the field v1.x `ApiException` is documented to
 *    expose — confirm against the installed client version)
 *  - `err.message` contains `HTTP-Code: 404`
 *
 * Exported for unit tests.
 *
 * @param {unknown} err - Value caught from a Kubernetes client call.
 * @returns {boolean} `true` when the error represents an HTTP 404.
 */
export function isK8s404(err) {
    if (!(err instanceof Error)) {
        return false;
    }
    const response = err.response;
    // Structured status fields are checked first; strict comparison via
    // `includes` means a string code such as "ENOENT" or "404" never matches.
    const statusCandidates = [
        response?.statusCode,
        response?.status,
        err.statusCode,
        err.code,
    ];
    if (statusCandidates.includes(404)) {
        return true;
    }
    // Last resort: the status is only present in the formatted message text.
    return /HTTP-Code:\s*404\b/.test(err.message);
}
|
||||
/**
 * Build the error message when Claude's stdout contains no result event.
 *
 * Lines that parse as JSON objects with `type === "system"` are skipped so
 * the UI doesn't display the raw init JSON; the first remaining non-blank
 * line (JSON or not) is used as the error detail.
 *
 * Outcomes, in order:
 *  1. `exitCode === 0` → "Failed to parse Claude JSON output".
 *  2. stdout is non-blank, contains only system events, and `model` is not
 *     the empty string → a "started but did not produce a result" hint,
 *     including the model name when one is set.
 *  3. Otherwise → "Claude exited with code N", followed by the first content
 *     line when there is one (`N` is -1 when `exitCode` is null/undefined).
 *
 * Exported for unit tests.
 *
 * @param {number | null | undefined} exitCode - Container exit code, if known.
 * @param {string | null | undefined} model - Model name reported for the run.
 * @param {string | null | undefined} stdout - Captured stdout; a missing
 *   value is treated as empty output instead of throwing.
 * @returns {string} Human-readable error message.
 */
export function buildPartialRunError(exitCode, model, stdout) {
    if (exitCode === 0) {
        return "Failed to parse Claude JSON output";
    }
    // This helper runs on a failure path; a missing stdout must not turn the
    // original failure into a TypeError from `.split`.
    const text = stdout == null ? "" : String(stdout);
    const isSystemEventLine = (line) => {
        try {
            const parsed = JSON.parse(line);
            return typeof parsed === "object" && parsed !== null && parsed.type === "system";
        }
        catch {
            // not JSON — treat as content
            return false;
        }
    };
    // Walk stdout lines, skip blanks and system events, keep the first real content line.
    const firstContentLine = text
        .split(/\r?\n/)
        .map((line) => line.trim())
        .find((line) => line !== "" && !isSystemEventLine(line)) ?? "";
    // If we only have system/init events and nothing else, surface the model
    // name so the operator can diagnose missing credentials or unsupported model.
    // An empty-string model deliberately falls through to the generic message.
    const initOnlyOutput = text.trim() !== "" && model !== "" && !firstContentLine;
    if (initOnlyOutput) {
        const modelHint = model ? ` (model: ${model})` : "";
        return `Claude started but did not produce a result${modelHint} — check API credentials, model support, and adapter config`;
    }
    const base = `Claude exited with code ${exitCode ?? -1}`;
    return firstContentLine ? `${base}: ${firstContentLine}` : base;
}
|
||||
/**
 * Evaluate an orphaned K8s Job (one whose `paperclip.io/run-id` label does
 * not match the current runId) as a potential reattach target. A Job is
 * reattachable when it belongs to the same agent, same task, and same resume
 * session as the current run, and has not reached a terminal condition.
 *
 * Label values are compared verbatim against `expected`, so callers must pass
 * identifiers in the same form in which they were written to the labels.
 * All three identifiers are required: a missing one never matches, even if
 * the Job is also missing the corresponding label.
 *
 * Exported for unit tests.
 *
 * @param {object | null | undefined} job - Job object; only
 *   `metadata.labels` and `status.conditions` are read.
 * @param {{ agentId?: string | null, taskId?: string | null, sessionId?: string | null } | null | undefined} expected
 *   Identifiers of the current run.
 * @returns {boolean} `true` when the Job can be reattached to.
 */
export function isReattachableOrphan(job, expected) {
    // Without a full identity we cannot prove the Job is "ours". Guarding
    // agentId as well prevents `undefined !== undefined` from matching a Job
    // that simply lacks the agent-id label.
    if (!expected?.agentId || !expected.taskId || !expected.sessionId) {
        return false;
    }
    const labels = job?.metadata?.labels ?? {};
    const requiredLabels = [
        ["paperclip.io/adapter-type", "claude_k8s"],
        ["paperclip.io/agent-id", expected.agentId],
        ["paperclip.io/task-id", expected.taskId],
        ["paperclip.io/session-id", expected.sessionId],
    ];
    if (requiredLabels.some(([key, value]) => labels[key] !== value)) {
        return false;
    }
    // A Job that is already Complete or Failed has nothing left to attach to.
    const conditions = job?.status?.conditions ?? [];
    const isTerminal = conditions.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True");
    return !isTerminal;
}
|
||||
/**
 * Build an error message for a pod that reached a terminal phase before or
 * instead of streaming logs. When the container named "claude" has a
 * `state.terminated` entry, its exit code and reason (falling back to the
 * terminated message, then "no reason") are appended so operators can
 * diagnose crashes without needing kubectl. Exported for unit tests.
 *
 * @param {string} podName - Name of the pod.
 * @param {string} phase - Pod phase to report (e.g. "Failed").
 * @param {Array<object> | null | undefined} containerStatuses - Container
 *   status entries for the pod; a missing list is treated as empty.
 * @returns {string} Human-readable error message.
 */
export function describePodTerminatedError(podName, phase, containerStatuses) {
    const summary = `Pod ${podName} reached phase=${phase}`;
    // The status list can be absent on the pod object; fall back to the
    // plain summary rather than throwing while building an error message.
    const statuses = Array.isArray(containerStatuses) ? containerStatuses : [];
    const terminated = statuses.find((cs) => cs?.name === "claude")?.state?.terminated;
    if (!terminated) {
        return summary;
    }
    const code = terminated.exitCode ?? "unknown";
    const reason = terminated.reason ?? terminated.message ?? "no reason";
    return `${summary}: claude exited ${code} (${reason})`;
}
|
||||
/**
|
||||
* Wait for the Job's pod to reach a terminal or running state.
|
||||
* Returns the pod name once logs can be streamed, or throws on failure.
|
||||
@@ -52,14 +150,22 @@ async function waitForPod(namespace, jobName, timeoutMs, onLog, kubeconfigPath)
|
||||
details.push(`${cs.name}: waiting (${cs.state.waiting.reason ?? "unknown"})`);
|
||||
else if (cs.state?.running)
|
||||
details.push(`${cs.name}: running`);
|
||||
else if (cs.state?.terminated)
|
||||
details.push(`${cs.name}: terminated (exit ${cs.state.terminated.exitCode ?? "?"}, ${cs.state.terminated.reason ?? "no reason"})`);
|
||||
}
|
||||
await onLog("stdout", `[paperclip] Pod ${podName}: ${details.join(", ")}\n`);
|
||||
lastStatus = statusKey;
|
||||
}
|
||||
// Ready to stream logs
|
||||
if (phase === "Running" || phase === "Succeeded" || phase === "Failed") {
|
||||
if (phase === "Running" || phase === "Succeeded") {
|
||||
return podName;
|
||||
}
|
||||
// phase=Failed means the pod crashed before we could stream logs.
|
||||
// Throwing here routes the caller into the error path with a structured
|
||||
// message instead of entering the log-streaming path with a dead pod.
|
||||
if (phase === "Failed") {
|
||||
throw new Error(describePodTerminatedError(podName, phase, containerStatuses));
|
||||
}
|
||||
// Init containers done + main running (phase may still say Pending briefly)
|
||||
const allInitsDone = initStatuses.length > 0 && initStatuses.every((s) => s.state?.terminated?.exitCode === 0);
|
||||
const mainRunning = containerStatuses.some((s) => s.state?.running);
|
||||
@@ -104,16 +210,32 @@ async function waitForPod(namespace, jobName, timeoutMs, onLog, kubeconfigPath)
|
||||
* Stream pod logs once via follow. Returns accumulated stdout when the
|
||||
* stream ends (container exit, API disconnect, or abort signal).
|
||||
*/
|
||||
async function streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds) {
|
||||
async function streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal) {
|
||||
const logApi = getLogApi(kubeconfigPath);
|
||||
const chunks = [];
|
||||
const writable = new Writable({
|
||||
write(chunk, _encoding, callback) {
|
||||
const text = chunk.toString("utf-8");
|
||||
chunks.push(text);
|
||||
void onLog("stdout", text).then(() => callback(), callback);
|
||||
const emitted = dedup ? dedup.filter(text) : text;
|
||||
if (!emitted) {
|
||||
callback();
|
||||
return;
|
||||
}
|
||||
void onLog("stdout", emitted).then(() => callback(), callback);
|
||||
},
|
||||
});
|
||||
// When the job completion signal fires, destroy the writable to abort the
|
||||
// in-flight follow stream. Without this, logApi.log can hang indefinitely
|
||||
// when the pod terminates without closing the HTTP connection cleanly.
|
||||
let stopPoller = null;
|
||||
if (stopSignal) {
|
||||
stopPoller = setInterval(() => {
|
||||
if (stopSignal.stopped && !writable.destroyed) {
|
||||
writable.destroy();
|
||||
}
|
||||
}, 200);
|
||||
}
|
||||
try {
|
||||
await logApi.log(namespace, podName, "claude", writable, {
|
||||
follow: true,
|
||||
@@ -122,8 +244,12 @@ async function streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinc
|
||||
});
|
||||
}
|
||||
catch {
|
||||
// follow may fail if the container already exited or the API
|
||||
// connection dropped — not fatal, caller decides whether to retry.
|
||||
// follow may fail if the container already exited, the API connection
|
||||
// dropped, or we aborted via writable.destroy() — not fatal.
|
||||
}
|
||||
finally {
|
||||
if (stopPoller)
|
||||
clearInterval(stopPoller);
|
||||
}
|
||||
return chunks.join("");
|
||||
}
|
||||
@@ -143,6 +269,9 @@ async function streamPodLogs(namespace, podName, onLog, kubeconfigPath, stopSign
|
||||
// reconnects use a tight window instead of an ever-growing one anchored
|
||||
// at stream start. This is the primary fix for FAR-105 duplicative logs.
|
||||
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
// Shared across reconnects so replayed lines inside the `sinceSeconds`
|
||||
// overlap window are dropped before they reach the streaming UI (FAR-123).
|
||||
const dedup = new LogLineDedupFilter();
|
||||
while (!stopSignal?.stopped) {
|
||||
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
|
||||
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
|
||||
@@ -158,7 +287,7 @@ async function streamPodLogs(namespace, podName, onLog, kubeconfigPath, stopSign
|
||||
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
|
||||
}
|
||||
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
|
||||
if (result) {
|
||||
allChunks.push(result);
|
||||
// Update last-received timestamp to now (the stream just ended,
|
||||
@@ -177,6 +306,11 @@ async function streamPodLogs(namespace, podName, onLog, kubeconfigPath, stopSign
|
||||
// Brief pause before reconnecting to avoid tight loops.
|
||||
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
|
||||
}
|
||||
// Flush any buffered partial line so the final assistant/result chunk
|
||||
// isn't dropped when the stream ends mid-line.
|
||||
const tail = dedup.flush();
|
||||
if (tail)
|
||||
await onLog("stdout", tail);
|
||||
return allChunks.join("");
|
||||
}
|
||||
/**
|
||||
@@ -199,13 +333,27 @@ async function readPodLogs(namespace, podName, kubeconfigPath) {
|
||||
}
|
||||
/**
|
||||
* Wait for the Job to reach a terminal state (Complete or Failed).
|
||||
* Returns the Job's final status.
|
||||
* Returns the Job's final status. A 404 (job deleted by TTL or externally)
|
||||
* is treated as a soft terminal: succeeded=false, timedOut=false, jobGone=true.
|
||||
* The caller should log this and fall through to stdout parsing.
|
||||
*/
|
||||
async function waitForJobCompletion(namespace, jobName, timeoutMs, kubeconfigPath) {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
const deadline = timeoutMs > 0 ? Date.now() + timeoutMs : 0;
|
||||
while (deadline === 0 || Date.now() < deadline) {
|
||||
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
|
||||
let job;
|
||||
try {
|
||||
job = await batchApi.readNamespacedJob({ name: jobName, namespace });
|
||||
}
|
||||
catch (err) {
|
||||
if (isK8s404(err)) {
|
||||
// Job was deleted (TTL garbage collection or external deletion) before
|
||||
// we detected its terminal condition. The container must have already
|
||||
// exited for TTL to fire, so log streaming will have captured the output.
|
||||
return { succeeded: false, timedOut: false, jobGone: true };
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
const conditions = job.status?.conditions ?? [];
|
||||
const complete = conditions.find((c) => c.type === "Complete" && c.status === "True");
|
||||
if (complete)
|
||||
@@ -261,10 +409,18 @@ export async function execute(ctx) {
|
||||
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
|
||||
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
|
||||
// still be running. We detect those by comparing the Job's run-id label against
|
||||
// the current runId and clean them up so this execution can proceed.
|
||||
// the current runId. When reattachOrphanedJobs is enabled and the orphan matches
|
||||
// the current agent+task+session, we attach to it instead of deleting it (FAR-124).
|
||||
const agentId = ctx.agent.id;
|
||||
const selfPod = await getSelfPodInfo(kubeconfigPath);
|
||||
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
|
||||
const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true);
|
||||
const runtimeSessionParams = parseObject(runtime.sessionParams);
|
||||
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
|
||||
const currentSessionLabel = currentSessionIdRaw ? sanitizeLabelValue(currentSessionIdRaw) : null;
|
||||
const currentTaskIdRaw = asString(ctx.context.taskId, "") || asString(ctx.context.issueId, "");
|
||||
const currentTaskLabel = currentTaskIdRaw ? sanitizeLabelValue(currentTaskIdRaw) : null;
|
||||
let reattachTarget = null;
|
||||
try {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
const existing = await batchApi.listNamespacedJob({
|
||||
@@ -277,10 +433,37 @@ export async function execute(ctx) {
|
||||
// concurrent jobs (same runId — shouldn't happen but guard defensively).
|
||||
const orphaned = running.filter((j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") !== runId);
|
||||
const samRun = running.filter((j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") === runId);
|
||||
if (orphaned.length > 0) {
|
||||
const orphanNames = orphaned.map((j) => j.metadata?.name).join(", ");
|
||||
await onLog("stdout", `[paperclip] Cleaning up ${orphaned.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
|
||||
for (const j of orphaned) {
|
||||
// Pick the most recent reattachable orphan — same agent + task + session,
|
||||
// not terminal. Only one target is chosen; any other orphans get
|
||||
// cleaned up as before.
|
||||
if (reattachOrphanedJobs && orphaned.length > 0) {
|
||||
const candidates = orphaned
|
||||
.filter((j) => isReattachableOrphan(j, {
|
||||
agentId,
|
||||
taskId: currentTaskLabel,
|
||||
sessionId: currentSessionLabel,
|
||||
}))
|
||||
.sort((a, b) => {
|
||||
const at = new Date(a.metadata?.creationTimestamp ?? 0).getTime();
|
||||
const bt = new Date(b.metadata?.creationTimestamp ?? 0).getTime();
|
||||
return bt - at;
|
||||
});
|
||||
const chosen = candidates[0];
|
||||
const chosenName = chosen?.metadata?.name;
|
||||
if (chosen && chosenName) {
|
||||
reattachTarget = {
|
||||
jobName: chosenName,
|
||||
namespace: chosen.metadata?.namespace ?? guardNamespace,
|
||||
priorRunId: chosen.metadata?.labels?.["paperclip.io/run-id"] ?? "",
|
||||
image: chosen.spec?.template?.spec?.containers?.[0]?.image ?? "unknown",
|
||||
};
|
||||
}
|
||||
}
|
||||
const toDelete = orphaned.filter((j) => !reattachTarget || j.metadata?.name !== reattachTarget.jobName);
|
||||
if (toDelete.length > 0) {
|
||||
const orphanNames = toDelete.map((j) => j.metadata?.name).join(", ");
|
||||
await onLog("stdout", `[paperclip] Cleaning up ${toDelete.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
|
||||
for (const j of toDelete) {
|
||||
const name = j.metadata?.name;
|
||||
if (name) {
|
||||
await cleanupJob(guardNamespace, name, onLog, kubeconfigPath);
|
||||
@@ -317,81 +500,114 @@ export async function execute(ctx) {
|
||||
errorCode: "k8s_concurrency_guard_unreachable",
|
||||
};
|
||||
}
|
||||
// Build Job manifest
|
||||
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret } = buildJobManifest({
|
||||
ctx,
|
||||
selfPod,
|
||||
});
|
||||
// Report invocation metadata
|
||||
if (onMeta) {
|
||||
await onMeta({
|
||||
adapterType: "claude_k8s",
|
||||
command: `kubectl job/${jobName}`,
|
||||
cwd: namespace,
|
||||
commandArgs: claudeArgs,
|
||||
commandNotes: [
|
||||
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
|
||||
`Namespace: ${namespace}`,
|
||||
`Timeout: ${timeoutSec}s`,
|
||||
],
|
||||
prompt,
|
||||
...(promptMetrics ? { promptMetrics } : {}),
|
||||
context: ctx.context,
|
||||
});
|
||||
}
|
||||
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
|
||||
// PodSpec limit). The Secret is cleaned up in the finally block.
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
if (promptSecret) {
|
||||
try {
|
||||
await coreApi.createNamespacedSecret({
|
||||
namespace: promptSecret.namespace,
|
||||
body: {
|
||||
apiVersion: "v1",
|
||||
kind: "Secret",
|
||||
metadata: {
|
||||
name: promptSecret.name,
|
||||
namespace: promptSecret.namespace,
|
||||
labels: {
|
||||
"app.kubernetes.io/managed-by": "paperclip",
|
||||
"paperclip.io/adapter-type": "claude_k8s",
|
||||
"paperclip.io/run-id": runId,
|
||||
},
|
||||
},
|
||||
stringData: promptSecret.data,
|
||||
},
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
let jobName;
|
||||
let namespace;
|
||||
let promptSecret = null;
|
||||
if (reattachTarget) {
|
||||
jobName = reattachTarget.jobName;
|
||||
namespace = reattachTarget.namespace;
|
||||
// Announce reattach metadata. Prompt and args aren't known here — they
|
||||
// belong to the prior run that created this pod and are already present
|
||||
// on the running container.
|
||||
if (onMeta) {
|
||||
await onMeta({
|
||||
adapterType: "claude_k8s",
|
||||
command: `kubectl job/${jobName}`,
|
||||
cwd: namespace,
|
||||
commandArgs: [],
|
||||
commandNotes: [
|
||||
`Image: ${reattachTarget.image}`,
|
||||
`Namespace: ${namespace}`,
|
||||
`Reattached from prior run: ${reattachTarget.priorRunId || "unknown"}`,
|
||||
`Timeout: ${timeoutSec}s`,
|
||||
],
|
||||
prompt: "",
|
||||
context: ctx.context,
|
||||
});
|
||||
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
|
||||
}
|
||||
await onLog("stdout", `[paperclip] Reattaching to in-flight K8s Job ${jobName} in namespace ${namespace} (prior run ${reattachTarget.priorRunId || "unknown"})\n`);
|
||||
}
|
||||
else {
|
||||
// Build Job manifest
|
||||
const built = buildJobManifest({ ctx, selfPod });
|
||||
const job = built.job;
|
||||
jobName = built.jobName;
|
||||
namespace = built.namespace;
|
||||
const prompt = built.prompt;
|
||||
const claudeArgs = built.claudeArgs;
|
||||
const promptMetrics = built.promptMetrics;
|
||||
promptSecret = built.promptSecret;
|
||||
// Report invocation metadata
|
||||
if (onMeta) {
|
||||
await onMeta({
|
||||
adapterType: "claude_k8s",
|
||||
command: `kubectl job/${jobName}`,
|
||||
cwd: namespace,
|
||||
commandArgs: claudeArgs,
|
||||
commandNotes: [
|
||||
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
|
||||
`Namespace: ${namespace}`,
|
||||
`Timeout: ${timeoutSec}s`,
|
||||
],
|
||||
prompt,
|
||||
...(promptMetrics ? { promptMetrics } : {}),
|
||||
context: ctx.context,
|
||||
});
|
||||
}
|
||||
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
|
||||
// PodSpec limit). The Secret is cleaned up in the finally block.
|
||||
if (promptSecret) {
|
||||
try {
|
||||
await coreApi.createNamespacedSecret({
|
||||
namespace: promptSecret.namespace,
|
||||
body: {
|
||||
apiVersion: "v1",
|
||||
kind: "Secret",
|
||||
metadata: {
|
||||
name: promptSecret.name,
|
||||
namespace: promptSecret.namespace,
|
||||
labels: {
|
||||
"app.kubernetes.io/managed-by": "paperclip",
|
||||
"paperclip.io/adapter-type": "claude_k8s",
|
||||
"paperclip.io/run-id": runId,
|
||||
},
|
||||
},
|
||||
stringData: promptSecret.data,
|
||||
},
|
||||
});
|
||||
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
|
||||
}
|
||||
catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Failed to create prompt Secret: ${msg}`,
|
||||
errorCode: "k8s_prompt_secret_create_failed",
|
||||
};
|
||||
}
|
||||
}
|
||||
// Create the Job
|
||||
try {
|
||||
await batchApi.createNamespacedJob({ namespace, body: job });
|
||||
}
|
||||
catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
|
||||
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Failed to create prompt Secret: ${msg}`,
|
||||
errorCode: "k8s_prompt_secret_create_failed",
|
||||
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
|
||||
errorCode: "k8s_job_create_failed",
|
||||
};
|
||||
}
|
||||
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
|
||||
}
|
||||
// Create the Job
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
try {
|
||||
await batchApi.createNamespacedJob({ namespace, body: job });
|
||||
}
|
||||
catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
|
||||
errorCode: "k8s_job_create_failed",
|
||||
};
|
||||
}
|
||||
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
|
||||
let stdout = "";
|
||||
let exitCode = null;
|
||||
let jobTimedOut = false;
|
||||
@@ -404,8 +620,24 @@ export async function execute(ctx) {
|
||||
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
|
||||
let podName;
|
||||
try {
|
||||
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
|
||||
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
|
||||
if (reattachTarget) {
|
||||
// Pod is already running from the prior run — look it up directly.
|
||||
const podList = await coreApi.listNamespacedPod({
|
||||
namespace,
|
||||
labelSelector: `job-name=${jobName}`,
|
||||
});
|
||||
const pod = podList.items[0];
|
||||
const name = pod?.metadata?.name;
|
||||
if (!name) {
|
||||
throw new Error(`Reattach target Job ${jobName} has no pod`);
|
||||
}
|
||||
podName = name;
|
||||
await onLog("stdout", `[paperclip] Reattached to pod ${podName}\n`);
|
||||
}
|
||||
else {
|
||||
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
|
||||
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
|
||||
}
|
||||
// Notify the server that execution has started. This sets
|
||||
// processStartedAt and refreshes updatedAt in the DB, which the
|
||||
// stale-run reaper (reapOrphanedRuns) uses to decide liveness.
|
||||
@@ -419,13 +651,14 @@ export async function execute(ctx) {
|
||||
}
|
||||
catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Pod scheduling failed: ${msg}\n`);
|
||||
const phase = reattachTarget ? "reattach" : "scheduling";
|
||||
await onLog("stderr", `[paperclip] Pod ${phase} failed: ${msg}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Pod scheduling failed: ${msg}`,
|
||||
errorCode: "k8s_pod_schedule_failed",
|
||||
errorMessage: `Pod ${phase} failed: ${msg}`,
|
||||
errorCode: reattachTarget ? "k8s_pod_reattach_failed" : "k8s_pod_schedule_failed",
|
||||
};
|
||||
}
|
||||
// Stream logs and wait for completion concurrently.
|
||||
@@ -457,18 +690,32 @@ export async function execute(ctx) {
|
||||
let lastLogAt = Date.now();
|
||||
let keepaliveTick = 0;
|
||||
let keepaliveJobTerminal = false;
|
||||
let keepaliveJobTerminalAt = null;
|
||||
keepaliveTimer = setInterval(() => {
|
||||
// Fire-and-forget the async work; setInterval callbacks must be
|
||||
// synchronous or the timer will drift.
|
||||
void (async () => {
|
||||
if (keepaliveJobTerminal)
|
||||
if (keepaliveJobTerminal) {
|
||||
// Post-terminal window: keep refreshing onSpawn during cleanup
|
||||
// (job deletion, log parsing, K8s API calls) so the reaper doesn't
|
||||
// fire a false process_lost while execute() is still running.
|
||||
if (ctx.onSpawn &&
|
||||
keepaliveJobTerminalAt !== null &&
|
||||
Date.now() - keepaliveJobTerminalAt <= POST_TERMINAL_KEEPALIVE_MS) {
|
||||
keepaliveTick++;
|
||||
if (keepaliveTick % 6 === 0) {
|
||||
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => { });
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
// Verify the Job is still alive before announcing or refreshing.
|
||||
try {
|
||||
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
|
||||
const terminal = job.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True");
|
||||
if (terminal) {
|
||||
keepaliveJobTerminal = true;
|
||||
keepaliveJobTerminalAt = Date.now();
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -477,10 +724,9 @@ export async function execute(ctx) {
|
||||
// connection resets should NOT permanently disable the keepalive —
|
||||
// the next tick will re-check and the reaper uses the staleness
|
||||
// window as a safety net.
|
||||
const statusCode = err?.response?.statusCode
|
||||
?? err?.statusCode;
|
||||
if (statusCode === 404) {
|
||||
if (isK8s404(err)) {
|
||||
keepaliveJobTerminal = true;
|
||||
keepaliveJobTerminalAt = Date.now();
|
||||
return;
|
||||
}
|
||||
// Log transient errors but leave keepaliveJobTerminal false so
|
||||
@@ -525,23 +771,44 @@ export async function execute(ctx) {
|
||||
if (logResult.status === "fulfilled") {
|
||||
stdout = logResult.value;
|
||||
}
|
||||
// If the follow stream missed output (container exited quickly), do a
|
||||
// one-shot log read as fallback before the pod is cleaned up.
|
||||
if (!stdout.trim()) {
|
||||
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
|
||||
stdout = await readPodLogs(namespace, podName, kubeconfigPath);
|
||||
if (stdout.trim()) {
|
||||
// One-shot log fallback: handles two failure modes with a single read.
|
||||
// Mode 1 — empty stream: the follow stream returned nothing (fast exit before connection).
|
||||
// Mode 2 — partial stream: we have some output but no result event (follow stream raced
|
||||
// with container exit and captured only the init line before the connection dropped).
|
||||
// A one-shot readPodLogs is more reliable for already-terminated containers and reads
|
||||
// from the beginning of the log, giving us the full output.
|
||||
// We use a cheap string scan for the result-event guard (avoids a full JSON parse here;
|
||||
// the authoritative parse happens once below after all fallbacks complete).
|
||||
const hasResultEvent = stdout.includes('"type":"result"');
|
||||
const needsOneShot = !stdout.trim() || (stdout.trim() && !hasResultEvent);
|
||||
if (needsOneShot) {
|
||||
if (!stdout.trim()) {
|
||||
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
|
||||
}
|
||||
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
|
||||
if (!stdout.trim() && oneShotLogs.trim()) {
|
||||
stdout = oneShotLogs;
|
||||
await onLog("stdout", stdout);
|
||||
}
|
||||
else if (oneShotLogs && oneShotLogs.length > stdout.length) {
|
||||
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
|
||||
stdout = oneShotLogs;
|
||||
}
|
||||
}
|
||||
if (completionResult.status === "fulfilled") {
|
||||
jobTimedOut = completionResult.value.timedOut;
|
||||
if (completionResult.value.jobGone) {
|
||||
// Job was deleted by TTL or externally before we observed the Complete/Failed
|
||||
// condition. The container must have exited first (TTL only fires after
|
||||
// completion), so log streaming has captured the full output — continue
|
||||
// to stdout parsing rather than returning an error.
|
||||
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
|
||||
}
|
||||
}
|
||||
else {
|
||||
// waitForJobCompletion threw — re-check job state to avoid returning
|
||||
// while the job is still running (which would cause UI staleness and
|
||||
// concurrency errors on retry). Use a bounded timeout (60s) so we
|
||||
// don't hang the heartbeat indefinitely if the K8s API is degraded.
|
||||
// waitForJobCompletion threw an unexpected error — re-check job state to
|
||||
// avoid returning while the job is still running. Use a bounded timeout
|
||||
// (60s) so we don't hang the heartbeat indefinitely if the K8s API is degraded.
|
||||
jobTimedOut = false;
|
||||
const RECHECK_TIMEOUT_MS = 60_000;
|
||||
const actualState = await waitForJobCompletion(namespace, jobName, RECHECK_TIMEOUT_MS, kubeconfigPath);
|
||||
@@ -550,6 +817,11 @@ export async function execute(ctx) {
|
||||
// Return an error so the UI knows the run is not done.
|
||||
jobTimedOut = true;
|
||||
}
|
||||
else if (actualState.jobGone) {
|
||||
// Job was deleted before we could confirm terminal state — same as the
|
||||
// fulfilled+jobGone case above: proceed with captured output.
|
||||
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
|
||||
}
|
||||
else if (!actualState.succeeded) {
|
||||
// Job still not terminal — the completion error was likely transient.
|
||||
// Return an error so the UI knows the run is not done, rather than
|
||||
@@ -615,16 +887,11 @@ export async function execute(ctx) {
|
||||
};
|
||||
}
|
||||
if (!parsed) {
|
||||
const stderrLine = stdout.split(/\r?\n/).map((l) => l.trim()).find(Boolean) ?? "";
|
||||
return {
|
||||
exitCode,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: exitCode === 0
|
||||
? "Failed to parse Claude JSON output"
|
||||
: stderrLine
|
||||
? `Claude exited with code ${exitCode ?? -1}: ${stderrLine}`
|
||||
: `Claude exited with code ${exitCode ?? -1}`,
|
||||
errorMessage: buildPartialRunError(exitCode, parsedStream.model, stdout),
|
||||
resultJson: { stdout },
|
||||
};
|
||||
}
|
||||
@@ -636,8 +903,7 @@ export async function execute(ctx) {
|
||||
outputTokens: asNumber(usageObj.output_tokens, 0),
|
||||
};
|
||||
})();
|
||||
const runtimeSessionParams = parseObject(runtime.sessionParams);
|
||||
const fallbackSessionId = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
|
||||
const fallbackSessionId = currentSessionIdRaw;
|
||||
const resolvedSessionId = parsedStream.sessionId
|
||||
?? (asString(parsed.session_id, fallbackSessionId) || fallbackSessionId);
|
||||
const model = asString(config.model, "");
|
||||
|
||||
Reference in New Issue
Block a user