Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dae9e18659 | |||
| 6923597b31 | |||
| d184a1732b | |||
| be84428226 |
Generated
+2
-2
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.48",
|
||||
"version": "0.1.50",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.48",
|
||||
"version": "0.1.50",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@kubernetes/client-node": "^1.0.0",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.48",
|
||||
"version": "0.1.50",
|
||||
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
|
||||
"license": "MIT",
|
||||
"repository": {
|
||||
|
||||
@@ -1019,7 +1019,8 @@ describe("execute: happy path", () => {
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.errorCode).toBe("k8s_job_deleted_externally");
|
||||
expect(result.errorMessage).toBe("K8s Job was deleted externally before Claude could complete");
|
||||
expect(result.errorMessage).toMatch(/^K8s Job was deleted externally before Claude could complete \[/);
|
||||
expect(result.errorMessage).toContain("detected_via=");
|
||||
expect(result.exitCode).toBeNull();
|
||||
});
|
||||
|
||||
@@ -1770,7 +1771,7 @@ describe("execute: SIGTERM handler best-effort cleanup", () => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("deletes the active Job when SIGTERM fires during execution", async () => {
|
||||
it("does NOT delete active Jobs on SIGTERM — leaves them for orphan reattach (FAR-107)", async () => {
|
||||
// Mock process.kill to prevent the test process from actually being killed.
|
||||
const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true);
|
||||
|
||||
@@ -1781,17 +1782,19 @@ describe("execute: SIGTERM handler best-effort cleanup", () => {
|
||||
// Flush microtasks through the async setup chain: getSelfPodInfo, listJobs,
|
||||
// readSkillEntries, prepareBundle, createJob, onLog, activeJobs.add(), and
|
||||
// ensureSigtermHandler() all complete before the try block enters streaming.
|
||||
// 30 rounds is more than enough for the ~7 sequential await points.
|
||||
for (let i = 0; i < 30; i++) await Promise.resolve();
|
||||
|
||||
// Emit SIGTERM — the process.once handler fires synchronously and kicks off
|
||||
// async cleanup (deleteNamespacedJob). The mock resolves immediately.
|
||||
// Reset deleteJob spy after setup so we can detect any SIGTERM-driven calls.
|
||||
mockBatchDeleteJob.mockClear();
|
||||
|
||||
// Emit SIGTERM — the handler must re-raise to the default handler without
|
||||
// touching the K8s Job. Deleting the Job here would surface as
|
||||
// k8s_job_deleted_externally in the in-flight run (FAR-107).
|
||||
process.emit("SIGTERM");
|
||||
|
||||
// Flush microtasks for deleteJob to resolve and the .then(process.kill) to run.
|
||||
for (let i = 0; i < 10; i++) await Promise.resolve();
|
||||
|
||||
expect(mockBatchDeleteJob).toHaveBeenCalled();
|
||||
expect(mockBatchDeleteJob).not.toHaveBeenCalled();
|
||||
expect(killSpy).toHaveBeenCalledWith(process.pid, "SIGTERM");
|
||||
|
||||
killSpy.mockRestore();
|
||||
|
||||
+99
-30
@@ -58,30 +58,20 @@ function ensureSigtermHandler(): void {
|
||||
if (sigtermHandlerRegistered) return;
|
||||
sigtermHandlerRegistered = true;
|
||||
process.once("SIGTERM", () => {
|
||||
const jobs = [...activeJobs];
|
||||
void Promise.allSettled(
|
||||
jobs.map(async (ref) => {
|
||||
try {
|
||||
const batchApi = getBatchApi(ref.kubeconfigPath);
|
||||
await batchApi.deleteNamespacedJob({
|
||||
name: ref.jobName,
|
||||
namespace: ref.namespace,
|
||||
body: { propagationPolicy: "Background" },
|
||||
});
|
||||
} catch { /* best-effort */ }
|
||||
if (ref.promptSecretName && ref.promptSecretNamespace) {
|
||||
try {
|
||||
const coreApi = getCoreApi(ref.kubeconfigPath);
|
||||
await coreApi.deleteNamespacedSecret({
|
||||
name: ref.promptSecretName,
|
||||
namespace: ref.promptSecretNamespace,
|
||||
});
|
||||
} catch { /* best-effort */ }
|
||||
}
|
||||
}),
|
||||
).then(() => {
|
||||
process.kill(process.pid, "SIGTERM");
|
||||
});
|
||||
// Do NOT delete active K8s Jobs on SIGTERM (FAR-107). Paperclip itself
|
||||
// receives SIGTERM during rolling deploys, evictions, scale-down, etc.
|
||||
// Deleting the Jobs we own there causes the in-flight heartbeat to surface
|
||||
// a false-positive `k8s_job_deleted_externally` error and tears down work
|
||||
// the user expected to keep running.
|
||||
//
|
||||
// The correct behaviour with `reattachOrphanedJobs=true` (default) is to
|
||||
// leave the Jobs alive: the next paperclip process discovers them via the
|
||||
// orphan-classification path and reattaches their log streams. When
|
||||
// `reattachOrphanedJobs=false` the operator explicitly opted into manual
|
||||
// cleanup and should not have us auto-deleting either. The owning Job's
|
||||
// ownerReference (FAR-15) keeps the prompt Secret tied to the Job, so
|
||||
// both survive together and TTL cleans them up after natural completion.
|
||||
process.kill(process.pid, "SIGTERM");
|
||||
});
|
||||
}
|
||||
|
||||
@@ -582,11 +572,14 @@ async function readPodLogs(
|
||||
* is treated as a soft terminal: succeeded=false, timedOut=false, jobGone=true.
|
||||
* The caller should log this and fall through to stdout parsing.
|
||||
*/
|
||||
type JobConditionSnapshot = { type?: string; status?: string; reason?: string; message?: string };
|
||||
|
||||
async function waitForJobCompletion(
|
||||
namespace: string,
|
||||
jobName: string,
|
||||
timeoutMs: number,
|
||||
kubeconfigPath?: string,
|
||||
observer?: { lastConditions: JobConditionSnapshot[] | null; pollCount: number },
|
||||
): Promise<{ succeeded: boolean; timedOut: boolean; jobGone?: boolean }> {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
const deadline = timeoutMs > 0 ? Date.now() + timeoutMs : 0;
|
||||
@@ -605,6 +598,12 @@ async function waitForJobCompletion(
|
||||
throw err;
|
||||
}
|
||||
const conditions = job.status?.conditions ?? [];
|
||||
if (observer) {
|
||||
observer.pollCount += 1;
|
||||
observer.lastConditions = conditions.map((c) => ({
|
||||
type: c.type, status: c.status, reason: c.reason, message: c.message,
|
||||
}));
|
||||
}
|
||||
|
||||
const complete = conditions.find((c) => c.type === "Complete" && c.status === "True");
|
||||
if (complete) return { succeeded: true, timedOut: false };
|
||||
@@ -1112,6 +1111,17 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// Set when the job disappeared (404) or grace-timer fired before we saw a
|
||||
// terminal condition — used to emit a clearer error when stdout parsing fails.
|
||||
let jobDeletedExternally = false;
|
||||
// Forensics for k8s_job_deleted_externally — captures which of the three
|
||||
// detection paths observed the 404, the last successful Job-condition read
|
||||
// before deletion, and timing. Surfaced in the error message so the next
|
||||
// occurrence is self-diagnosing instead of opaque (FAR-107).
|
||||
let jobGoneDetectionPath: string | null = null;
|
||||
let jobGoneAt: number | null = null;
|
||||
const jobObserver: { lastConditions: JobConditionSnapshot[] | null; pollCount: number } = {
|
||||
lastConditions: null,
|
||||
pollCount: 0,
|
||||
};
|
||||
let podRunningAt: number | null = null;
|
||||
|
||||
const activeJobRef: ActiveJobRef = {
|
||||
namespace,
|
||||
@@ -1144,6 +1154,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
|
||||
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
|
||||
}
|
||||
podRunningAt = Date.now();
|
||||
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
@@ -1279,7 +1290,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// while streamPodLogs reconnects, holding execute() open for minutes.
|
||||
// logStopSignal.stopped is set on every settled path (fulfilled, rejected,
|
||||
// or grace) so streamPodLogs stops reconnecting promptly.
|
||||
type CompletionResult = { succeeded: boolean; timedOut: boolean; jobGone?: boolean };
|
||||
type CompletionResult = { succeeded: boolean; timedOut: boolean; jobGone?: boolean; gracePeriodFired?: boolean };
|
||||
let gracePoller: ReturnType<typeof setInterval> | null = null;
|
||||
const completionWithGrace = new Promise<CompletionResult>((resolve, reject) => {
|
||||
let settled = false;
|
||||
@@ -1297,11 +1308,37 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
logStopSignal.stopped = true;
|
||||
reject(err);
|
||||
};
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr);
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(settleOk).catch(settleErr);
|
||||
gracePoller = setInterval(() => {
|
||||
if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) {
|
||||
void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, jobGone: true });
|
||||
// Stop the grace poller immediately so we don't double-fire while the
|
||||
// verification read below is in flight.
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
// The log stream exiting only means the container stopped producing
|
||||
// output — it does NOT prove the Job was deleted. Verify Job
|
||||
// presence with a one-shot read so we can distinguish:
|
||||
// (a) Job 404 → truly gone (TTL or external deletion)
|
||||
// (b) Job still present → K8s condition propagation lag (FAR-23)
|
||||
// Without this check we mis-classify (b) as "deleted externally" and
|
||||
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
|
||||
void (async () => {
|
||||
try {
|
||||
await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace });
|
||||
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update; Job ${jobName} still present — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
|
||||
} catch (err: unknown) {
|
||||
if (isK8s404(err)) {
|
||||
jobGoneDetectionPath = "grace-period-verify-404";
|
||||
jobGoneAt = Date.now();
|
||||
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago and Job ${jobName} is gone (TTL or external deletion) — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, jobGone: true });
|
||||
} else {
|
||||
// K8s API hiccup — bail out without claiming external deletion.
|
||||
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago; Job state unverifiable (${err instanceof Error ? err.message : String(err)}) — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
|
||||
}
|
||||
}
|
||||
})();
|
||||
}
|
||||
}, 1_000);
|
||||
});
|
||||
@@ -1369,6 +1406,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// completion), so log streaming has captured the full output — continue
|
||||
// to stdout parsing rather than returning an error.
|
||||
jobDeletedExternally = true;
|
||||
if (!jobGoneDetectionPath) {
|
||||
jobGoneDetectionPath = "completion-poll-404";
|
||||
jobGoneAt = Date.now();
|
||||
}
|
||||
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
|
||||
}
|
||||
} else {
|
||||
@@ -1377,7 +1418,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// (60s) so we don't hang the heartbeat indefinitely if the K8s API is degraded.
|
||||
jobTimedOut = false;
|
||||
const RECHECK_TIMEOUT_MS = 60_000;
|
||||
const actualState = await waitForJobCompletion(namespace, jobName, RECHECK_TIMEOUT_MS, kubeconfigPath);
|
||||
const actualState = await waitForJobCompletion(namespace, jobName, RECHECK_TIMEOUT_MS, kubeconfigPath, jobObserver);
|
||||
if (actualState.timedOut) {
|
||||
// Re-check itself timed out — the job may still be running.
|
||||
// Return an error so the UI knows the run is not done.
|
||||
@@ -1386,6 +1427,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// Job was deleted before we could confirm terminal state — same as the
|
||||
// fulfilled+jobGone case above: proceed with captured output.
|
||||
jobDeletedExternally = true;
|
||||
if (!jobGoneDetectionPath) {
|
||||
jobGoneDetectionPath = "recheck-poll-404";
|
||||
jobGoneAt = Date.now();
|
||||
}
|
||||
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
|
||||
} else if (!actualState.succeeded) {
|
||||
// Job still not terminal — the completion error was likely transient.
|
||||
@@ -1455,11 +1500,35 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
|
||||
if (!parsed) {
|
||||
if (jobDeletedExternally && exitCode === null) {
|
||||
// Forensic context (FAR-107): users sometimes see this error when nothing
|
||||
// actually deleted the Job manually. Surface enough state in the message
|
||||
// to distinguish self-delete (SIGTERM/cancel), TTL-after-completion, and
|
||||
// genuine external deletion without needing cluster shell access.
|
||||
const detailParts: string[] = [];
|
||||
if (jobGoneDetectionPath) detailParts.push(`detected_via=${jobGoneDetectionPath}`);
|
||||
detailParts.push(`job=${jobName}`);
|
||||
detailParts.push(`ns=${namespace}`);
|
||||
if (podRunningAt !== null && jobGoneAt !== null) {
|
||||
detailParts.push(`elapsed_since_pod_running=${Math.round((jobGoneAt - podRunningAt) / 1000)}s`);
|
||||
}
|
||||
detailParts.push(`completion_polls=${jobObserver.pollCount}`);
|
||||
const lastConds = jobObserver.lastConditions;
|
||||
if (lastConds && lastConds.length > 0) {
|
||||
const summary = lastConds
|
||||
.map((c) => `${c.type}=${c.status}${c.reason ? `(${c.reason})` : ""}`)
|
||||
.join(",");
|
||||
detailParts.push(`last_job_conditions=[${summary}]`);
|
||||
} else {
|
||||
detailParts.push("last_job_conditions=none_observed");
|
||||
}
|
||||
detailParts.push(`stdout_bytes=${stdout.length}`);
|
||||
const stdoutLines = stdout.split("\n").filter((l) => l.trim()).length;
|
||||
detailParts.push(`stdout_nonempty_lines=${stdoutLines}`);
|
||||
return {
|
||||
exitCode,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: "K8s Job was deleted externally before Claude could complete",
|
||||
errorMessage: `K8s Job was deleted externally before Claude could complete [${detailParts.join(", ")}]`,
|
||||
errorCode: "k8s_job_deleted_externally",
|
||||
resultJson: { stdout },
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user