Compare commits

...

2 Commits

Author SHA1 Message Date
Chris Farhood e611f26d32 0.1.39 2026-04-24 15:20:59 +00:00
Chris Farhood f097440f3c feat: implement cancel support via keepalive poll and SIGTERM handler (FAR-26)
- Poll GET /api/heartbeat-runs/:runId on every keepalive tick (15s); when
  status != 'running', delete the K8s Job, set logStopSignal, and return
  errorCode='cancelled' — Job gone within ~15s of external cancellation.
- SIGTERM handler best-effort deletes all active Jobs/Secrets and re-emits
  the signal to let the process exit naturally.
- Export shouldAbortForCancellation() helper; add tests for helper, cancel
  poll path, and SIGTERM cleanup.
- Guards: a missing PAPERCLIP_API_URL logs a warning and disables cancel
  polling; HTTP 5xx responses from the poll are treated as transient; the
  reattach path skips the cancel poll.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 15:20:45 +00:00
4 changed files with 383 additions and 12 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.38",
"version": "0.1.39",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.38",
"version": "0.1.39",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.38",
"version": "0.1.39",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+260 -1
View File
@@ -60,7 +60,7 @@ vi.mock("@paperclipai/adapter-utils/server-utils", async (importOriginal) => {
});
});
const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, streamPodLogsOnce, execute } = await import("./execute.js");
const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, streamPodLogsOnce, shouldAbortForCancellation, execute } = await import("./execute.js");
function makeJob(opts: {
runId?: string;
@@ -1220,3 +1220,262 @@ describe("execute: concurrency guard — multiple orphans", () => {
expect(result.errorMessage).toContain("different task");
});
});
// ─── shouldAbortForCancellation ──────────────────────────────────────────────
describe("shouldAbortForCancellation", () => {
  // Table of [test title, input status, expected verdict]. Each row becomes
  // its own `it` so failures are still reported per case.
  const cases: ReadonlyArray<readonly [string, string | undefined, boolean]> = [
    ["returns false for undefined", undefined, false],
    ["returns false for empty string", "", false],
    ["returns false when status is 'running'", "running", false],
    ["returns true when status is 'cancelled'", "cancelled", true],
    ["returns true when status is 'failed'", "failed", true],
    ["returns true when status is 'completed'", "completed", true],
    ["returns true for any non-running non-empty string", "unknown", true],
  ];

  for (const [title, status, expected] of cases) {
    it(title, () => {
      expect(shouldAbortForCancellation(status)).toBe(expected);
    });
  }
});
// ─── execute: cancel-polling path ────────────────────────────────────────────
// Suite: drives execute() under fake timers with a stubbed global fetch to
// exercise the cancel poll that runs on the keepalive tick.
describe("execute: cancel-polling via keepalive tick", () => {
// Stand-in for the global fetch used by the cancel poll; installed in beforeEach.
const mockFetch = vi.fn();
beforeEach(() => {
vi.resetAllMocks();
vi.useFakeTimers();
// Replace global fetch for this suite
vi.stubGlobal("fetch", mockFetch);
// Cancel polling is only attempted when this env var is set.
process.env.PAPERCLIP_API_URL = "http://paperclip-test.local";
mockReadSkillEntries.mockResolvedValue([]);
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
mockBatchListJobs.mockResolvedValue({ items: [] });
mockPrepareBundle.mockResolvedValue(makeBundle());
mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } });
mockBatchPatchJob.mockResolvedValue({});
mockBatchDeleteJob.mockResolvedValue({});
mockCoreDeleteSecret.mockResolvedValue({});
// First pod listing reports a Running pod; every later listing reports the
// "claude" container as terminated with exit code 0.
mockCoreListPods
.mockResolvedValueOnce({
items: [{
metadata: { name: "pod-abc" },
status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
}],
})
.mockResolvedValue({
items: [{
metadata: { name: "pod-abc" },
status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] },
}],
});
// Job never reaches terminal on its own (cancel kicks in first)
mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } });
// Log stream never ends (hung — simulates long-running Claude)
mockLogFn.mockImplementation(() => new Promise(() => { /* never resolves */ }));
});
afterEach(() => {
vi.useRealTimers();
vi.unstubAllGlobals();
// NOTE(review): this deletes the variable rather than restoring a value that
// may have been set before the suite ran — confirm that is acceptable.
delete process.env.PAPERCLIP_API_URL;
});
it("returns errorCode=cancelled when poll detects non-running status within one keepalive tick", async () => {
// Use a flag so readJob throws 404 only AFTER deleteJob is called (simulating
// K8s state where the job disappears after deletion).
let jobDeleted = false;
mockBatchDeleteJob.mockImplementation(async () => { jobDeleted = true; return {}; });
mockBatchReadJob.mockImplementation(async () => {
if (jobDeleted) {
throw Object.assign(new Error("Not Found"), { response: { statusCode: 404 } });
}
return { status: { conditions: [] } };
});
// Cancel poll returns "cancelled" status.
mockFetch.mockResolvedValue({
ok: true,
json: async () => ({ status: "cancelled" }),
});
const executePromise = execute(
makeCtx({ authToken: "tok-abc" } as Partial<AdapterExecutionContext>),
);
// Timer sequence:
// t=15100: keepalive fires → pre-check non-terminal → fetch → cancelled →
// deleteJob (jobDeleted=true) → stop signal set
// t=15300: stop poller fires (200ms) → destroys writable → starts 3s bail timer
// t=17100: completion watcher polls → 404 (jobDeleted=true) → jobGone → settles
// t=18300: bail timer fires → streamPodLogsOnce returns → streamPodLogs exits →
// trackedLogStream settles → Promise.allSettled resolves
await vi.advanceTimersByTimeAsync(15_100); // keepalive fires → cancel detected
await vi.advanceTimersByTimeAsync(2_100); // completion watcher polls → 404 → settles
await vi.advanceTimersByTimeAsync(3_100); // bail timer fires → log stream settles
const result = await executePromise;
expect(result.errorCode).toBe("cancelled");
expect(result.errorMessage).toBe("Run cancelled");
expect(result.timedOut).toBe(false);
expect(mockBatchDeleteJob).toHaveBeenCalled();
});
it("treats HTTP 500 on cancel poll as transient and does not cancel", async () => {
// Cancel poll returns 500 → transient, should not cancel.
// After a while the job completes normally.
mockFetch.mockResolvedValue({ ok: false, status: 500 });
// Override: job completes after keepalive tick fires
// NOTE(review): this test never asserts that mockFetch was called, so it does
// not by itself prove the HTTP 500 branch ran — consider asserting it once the
// order of readJob consumers (keepalive vs. completion watcher) is confirmed.
mockBatchReadJob
.mockResolvedValueOnce({ status: { conditions: [] } }) // first keepalive check: non-terminal
.mockResolvedValue({ status: { conditions: [{ type: "Complete", status: "True" }] } });
// Log stream writes the canned happy-path output and then ends.
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
writable.write(CLAUDE_HAPPY_OUTPUT);
},
);
const executePromise = execute(
makeCtx({ authToken: "tok-abc" } as Partial<AdapterExecutionContext>),
);
await vi.advanceTimersByTimeAsync(15_100); // keepalive fires: 500 → transient, no cancel
await vi.advanceTimersByTimeAsync(3_100); // log reconnect sleep → stopSignal already true
const result = await executePromise;
expect(result.errorCode).toBeUndefined();
expect(result.exitCode).toBe(0);
expect(result.sessionId).toBe("sess_test123");
});
it("skips cancel poll when authToken is absent", async () => {
// No authToken → cancel poll must not be attempted → job completes normally
// NOTE(review): only 3.1s of fake time elapses here, while the timer comments
// in the first test place the keepalive tick at ~15s. If no tick fires, the
// not.toHaveBeenCalled() assertion passes regardless of the guard — confirm
// and, if so, keep the job non-terminal past one tick before completing it.
mockBatchReadJob.mockResolvedValue({
status: { conditions: [{ type: "Complete", status: "True" }] },
});
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
writable.write(CLAUDE_HAPPY_OUTPUT);
},
);
const executePromise = execute(makeCtx()); // no authToken
await vi.advanceTimersByTimeAsync(3_100);
const result = await executePromise;
expect(mockFetch).not.toHaveBeenCalled();
expect(result.exitCode).toBe(0);
});
it("skips cancel poll when PAPERCLIP_API_URL is not set", async () => {
// Undo the beforeEach default so execute() sees no API URL.
delete process.env.PAPERCLIP_API_URL;
// NOTE(review): same concern as the authToken test — with only 3.1s advanced,
// the fetch assertion may be vacuous if the keepalive tick never fires.
mockBatchReadJob.mockResolvedValue({
status: { conditions: [{ type: "Complete", status: "True" }] },
});
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
writable.write(CLAUDE_HAPPY_OUTPUT);
},
);
const executePromise = execute(
makeCtx({ authToken: "tok-abc" } as Partial<AdapterExecutionContext>),
);
await vi.advanceTimersByTimeAsync(3_100);
const result = await executePromise;
expect(mockFetch).not.toHaveBeenCalled();
expect(result.exitCode).toBe(0);
});
});
// ─── execute: SIGTERM handler ─────────────────────────────────────────────────
// Suite: verifies that a SIGTERM delivered while execute() is streaming
// triggers best-effort deletion of the active Job and a re-emitted SIGTERM.
describe("execute: SIGTERM handler best-effort cleanup", () => {
  beforeEach(() => {
    vi.resetAllMocks();
    vi.useFakeTimers();
    mockReadSkillEntries.mockResolvedValue([]);
    mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
    mockBatchListJobs.mockResolvedValue({ items: [] });
    mockPrepareBundle.mockResolvedValue(makeBundle());
    mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } });
    mockBatchPatchJob.mockResolvedValue({});
    mockBatchDeleteJob.mockResolvedValue({});
    mockCoreDeleteSecret.mockResolvedValue({});
    // First pod listing reports a Running pod; later listings report the
    // "claude" container as terminated with exit code 0.
    mockCoreListPods
      .mockResolvedValueOnce({
        items: [{
          metadata: { name: "pod-abc" },
          status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
        }],
      })
      .mockResolvedValue({
        items: [{
          metadata: { name: "pod-abc" },
          status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] },
        }],
      });
    // Job stays non-terminal and the log stream hangs, so execute() is still
    // in flight when SIGTERM is emitted.
    mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } });
    mockLogFn.mockImplementation(() => new Promise(() => { /* never resolves */ }));
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  it("deletes the active Job when SIGTERM fires during execution", async () => {
    // Mock process.kill to prevent the test process from actually being killed.
    const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true);
    try {
      // Start execute() and suppress unhandled rejection (we won't await it).
      const executePromise = execute(makeCtx());
      executePromise.catch(() => {});

      // Flush microtasks through the async setup chain: getSelfPodInfo, listJobs,
      // readSkillEntries, prepareBundle, createJob, onLog, activeJobs.add(), and
      // ensureSigtermHandler() all complete before the try block enters streaming.
      // 30 rounds is more than enough for the ~7 sequential await points.
      for (let i = 0; i < 30; i++) await Promise.resolve();

      // Emit SIGTERM — the process.once handler fires synchronously and kicks off
      // async cleanup (deleteNamespacedJob). The mock resolves immediately.
      process.emit("SIGTERM");

      // Flush microtasks for deleteJob to resolve and the .then(process.kill) to run.
      for (let i = 0; i < 10; i++) await Promise.resolve();

      expect(mockBatchDeleteJob).toHaveBeenCalled();
      expect(killSpy).toHaveBeenCalledWith(process.pid, "SIGTERM");
    } finally {
      // Always restore the real process.kill, even when an assertion above
      // throws; otherwise the mock would leak into whatever runs next.
      killSpy.mockRestore();
    }
    // afterEach calls vi.useRealTimers() which clears all pending fake timers,
    // so we do not need to settle executePromise.
  });
});
+120 -8
View File
@@ -39,6 +39,48 @@ const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
// minutes, causing stale "running" status in the UI (FAR-23).
const LOG_EXIT_COMPLETION_GRACE_MS = 30_000;
// Module-level tracking of active Jobs for SIGTERM best-effort cleanup.
/**
 * Everything the SIGTERM handler needs to delete one in-flight Job and,
 * when present, the Secret holding its prompt.
 */
interface ActiveJobRef {
/** Namespace passed to deleteNamespacedJob. */
namespace: string;
/** Name of the Job to delete. */
jobName: string;
/** Name of the prompt Secret; the Secret is deleted only when this and promptSecretNamespace are both set. */
promptSecretName?: string;
/** Namespace of the prompt Secret. */
promptSecretNamespace?: string;
/** Forwarded to getBatchApi/getCoreApi when building the API clients; may be undefined. */
kubeconfigPath?: string;
}
// Refs are added by execute() just before it starts waiting on the Job and
// removed again in its finally block.
const activeJobs = new Set<ActiveJobRef>();
// Latch so the process-wide SIGTERM listener is installed at most once.
let sigtermHandlerRegistered = false;
// Upper bound on how long SIGTERM cleanup may delay process exit. Installing a
// SIGTERM listener disables Node's default exit-on-SIGTERM, so without a
// deadline a hung Kubernetes API call would keep the process alive until an
// external SIGKILL.
const SIGTERM_CLEANUP_TIMEOUT_MS = 10_000;

/**
 * Install (at most once per process) a one-shot SIGTERM listener that
 * best-effort deletes every Job in `activeJobs`, plus its prompt Secret when
 * one is recorded, and then re-sends SIGTERM to this process. Because the
 * listener is registered with `process.once`, it is already removed by the
 * time the signal is re-sent.
 *
 * Individual delete failures are swallowed: cleanup is opportunistic and must
 * never prevent shutdown. The re-emit happens as soon as all deletions settle,
 * or after SIGTERM_CLEANUP_TIMEOUT_MS if they have not settled by then.
 */
function ensureSigtermHandler(): void {
  if (sigtermHandlerRegistered) return;
  sigtermHandlerRegistered = true;

  process.once("SIGTERM", () => {
    let deadline: ReturnType<typeof setTimeout> | undefined;
    let reEmitted = false;

    // Idempotent: invoked by whichever comes first, cleanup completion or the
    // deadline timer. Runs synchronously in the caller's tick, so the normal
    // path adds no extra microtask hops before process.kill.
    const reEmit = (): void => {
      if (reEmitted) return;
      reEmitted = true;
      if (deadline !== undefined) clearTimeout(deadline);
      process.kill(process.pid, "SIGTERM");
    };

    deadline = setTimeout(reEmit, SIGTERM_CLEANUP_TIMEOUT_MS);

    // Snapshot the set so concurrent add/delete during cleanup cannot affect
    // the iteration.
    const jobs = [...activeJobs];
    void Promise.allSettled(
      jobs.map(async (ref) => {
        try {
          const batchApi = getBatchApi(ref.kubeconfigPath);
          await batchApi.deleteNamespacedJob({
            name: ref.jobName,
            namespace: ref.namespace,
            body: { propagationPolicy: "Background" },
          });
        } catch { /* best-effort */ }
        if (ref.promptSecretName && ref.promptSecretNamespace) {
          try {
            const coreApi = getCoreApi(ref.kubeconfigPath);
            await coreApi.deleteNamespacedSecret({
              name: ref.promptSecretName,
              namespace: ref.promptSecretNamespace,
            });
          } catch { /* best-effort */ }
        }
      }),
    ).then(reEmit);
  });
}
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
@@ -53,6 +95,16 @@ export function isK8s404(err: unknown): boolean {
return /HTTP-Code:\s*404\b/.test(err.message);
}
/**
 * Decide whether a heartbeat-run status means the K8s Job should be torn down.
 *
 * A missing or empty status carries no information and never aborts; any
 * other value except "running" does.
 * Exported for unit tests.
 *
 * @param runStatus - status reported for the heartbeat run, if any
 * @returns true when the status is non-empty and not "running"
 */
export function shouldAbortForCancellation(runStatus: string | undefined): boolean {
  const hasStatus = Boolean(runStatus);
  return hasStatus && runStatus !== "running";
}
/**
* Build the error message when Claude's stdout contains no result event.
* Skips system/init event lines so the UI doesn't display the raw init JSON.
@@ -546,6 +598,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const graceSec = asNumber(config.graceSec, 60);
const retainJobs = asBoolean(config.retainJobs, false);
const kubeconfigPath = asString(config.kubeconfig, "") || undefined;
const paperclipApiUrl = process.env.PAPERCLIP_API_URL ?? "";
if (!paperclipApiUrl) {
await onLog("stderr", "[paperclip] Warning: PAPERCLIP_API_URL not set — cancel polling disabled\n");
}
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
@@ -905,6 +961,15 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// delete a job that is still alive and the UI is waiting on.
let skipCleanup = false;
const activeJobRef: ActiveJobRef = {
namespace,
jobName,
...(promptSecret ? { promptSecretName: promptSecret.name, promptSecretNamespace: promptSecret.namespace } : {}),
kubeconfigPath,
};
activeJobs.add(activeJobRef);
ensureSigtermHandler();
try {
// Wait for pod to be ready for log streaming
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
@@ -953,11 +1018,21 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let lastLogAt = Date.now();
let keepaliveJobTerminal = false;
let consecutiveTerminalReadings = 0;
// Shared signal: when job completion resolves, tell the log streamer to
// stop reconnecting. Declared before keepaliveTimer so the cancel path
// inside the timer can set it without temporal dead zone issues.
const logStopSignal = { stopped: false };
// Shared dedup filter: created here so the one-shot fallback can
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
const logDedup = new LogLineDedupFilter();
// Set when the run is externally cancelled (cancel-poll path).
let cancelled = false;
keepaliveTimer = setInterval(() => {
// Fire-and-forget the async work; setInterval callbacks must be
// synchronous or the timer will drift.
void (async () => {
if (keepaliveJobTerminal) return;
if (keepaliveJobTerminal || cancelled) return;
// Verify the Job is still alive before announcing or refreshing.
// Require two consecutive terminal readings before latching to
@@ -992,6 +1067,37 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return;
}
// Cancel-polling: check if the Paperclip run was cancelled externally.
// Skipped on the reattach path to avoid tearing down an adopted Job.
// HTTP non-2xx is treated as transient — never interpret a 5xx as cancel.
if (!reattachTarget && paperclipApiUrl && ctx.authToken) {
try {
const resp = await fetch(`${paperclipApiUrl}/api/heartbeat-runs/${runId}`, {
headers: { Authorization: `Bearer ${ctx.authToken}` },
});
if (resp.ok) {
const data = await resp.json() as { status?: string };
if (shouldAbortForCancellation(data.status)) {
void onLog("stdout", `[paperclip] Run cancelled externally — deleting Job ${jobName}\n`).catch(() => {});
cancelled = true;
logStopSignal.stopped = true;
try {
await batchApi.deleteNamespacedJob({
name: jobName,
namespace,
body: { propagationPolicy: "Background" },
});
} catch { /* best-effort — completion watcher will see 404 and settle */ }
return;
}
} else if (resp.status >= 500) {
void onLog("stderr", `[paperclip] keepalive: cancel poll returned HTTP ${resp.status} — transient, ignoring\n`).catch(() => {});
}
} catch {
// network error — transient, skip this tick
}
}
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {});
})();
@@ -1001,13 +1107,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return onLog(stream, chunk);
};
// Shared signal: when job completion resolves, tell the log
// streamer to stop reconnecting.
const logStopSignal = { stopped: false };
// Shared dedup filter: created here so the one-shot fallback can
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
const logDedup = new LogLineDedupFilter();
// Track when the log stream first exits so the grace-period can fire
// if the K8s Job condition lags behind container exit (FAR-23).
// Set via onFirstStreamExit callback (called after attempt=0 returns)
@@ -1067,6 +1166,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
keepaliveTimer = null;
}
// If the run was externally cancelled, return a clean cancelled result
// without processing stdout (the finally block still runs for cleanup).
if (cancelled) {
return {
exitCode: null,
signal: null,
timedOut: false,
errorCode: "cancelled",
errorMessage: "Run cancelled",
};
}
if (logResult.status === "fulfilled") {
stdout = logResult.value;
}
@@ -1141,6 +1252,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
exitCode = await getPodExitCode(namespace, jobName, kubeconfigPath);
} finally {
if (keepaliveTimer) clearInterval(keepaliveTimer);
activeJobs.delete(activeJobRef);
if (skipCleanup) {
await onLog("stdout", `[paperclip] Retaining job ${jobName} (state mismatch — UI is waiting on it)\n`);
} else if (!retainJobs) {