feat: implement cancel support via keepalive poll and SIGTERM handler (FAR-26)

- Poll GET /api/heartbeat-runs/:runId on every keepalive tick (15s); when
  status != 'running', delete the K8s Job, set logStopSignal, and return
  errorCode='cancelled' — Job gone within ~15s of external cancellation.
- SIGTERM handler best-effort deletes all active Jobs/Secrets and re-emits
  the signal to let the process exit naturally.
- Export shouldAbortForCancellation() helper; add tests for helper, cancel
  poll path, and SIGTERM cleanup.
- Guard: PAPERCLIP_API_URL missing logs a warning and skips cancel polling;
  HTTP 5xx from poll treated as transient; reattach path skips cancel poll.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
2026-04-24 15:20:45 +00:00
parent c55d6c61fc
commit f097440f3c
2 changed files with 380 additions and 9 deletions
+120 -8
View File
@@ -39,6 +39,48 @@ const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
// minutes, causing stale "running" status in the UI (FAR-23).
const LOG_EXIT_COMPLETION_GRACE_MS = 30_000;
// Module-level tracking of active Jobs for SIGTERM best-effort cleanup.
interface ActiveJobRef {
namespace: string;
jobName: string;
promptSecretName?: string;
promptSecretNamespace?: string;
kubeconfigPath?: string;
}
const activeJobs = new Set<ActiveJobRef>();
let sigtermHandlerRegistered = false;
function ensureSigtermHandler(): void {
if (sigtermHandlerRegistered) return;
sigtermHandlerRegistered = true;
process.once("SIGTERM", () => {
const jobs = [...activeJobs];
void Promise.allSettled(
jobs.map(async (ref) => {
try {
const batchApi = getBatchApi(ref.kubeconfigPath);
await batchApi.deleteNamespacedJob({
name: ref.jobName,
namespace: ref.namespace,
body: { propagationPolicy: "Background" },
});
} catch { /* best-effort */ }
if (ref.promptSecretName && ref.promptSecretNamespace) {
try {
const coreApi = getCoreApi(ref.kubeconfigPath);
await coreApi.deleteNamespacedSecret({
name: ref.promptSecretName,
namespace: ref.promptSecretNamespace,
});
} catch { /* best-effort */ }
}
}),
).then(() => {
process.kill(process.pid, "SIGTERM");
});
});
}
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
@@ -53,6 +95,16 @@ export function isK8s404(err: unknown): boolean {
return /HTTP-Code:\s*404\b/.test(err.message);
}
/**
* Returns true when the heartbeat-run status indicates the run is no longer
* active and the K8s Job should be cancelled.
* Exported for unit tests.
*/
export function shouldAbortForCancellation(runStatus: string | undefined): boolean {
if (!runStatus) return false;
return runStatus !== "running";
}
/**
* Build the error message when Claude's stdout contains no result event.
* Skips system/init event lines so the UI doesn't display the raw init JSON.
@@ -546,6 +598,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const graceSec = asNumber(config.graceSec, 60);
const retainJobs = asBoolean(config.retainJobs, false);
const kubeconfigPath = asString(config.kubeconfig, "") || undefined;
const paperclipApiUrl = process.env.PAPERCLIP_API_URL ?? "";
if (!paperclipApiUrl) {
await onLog("stderr", "[paperclip] Warning: PAPERCLIP_API_URL not set — cancel polling disabled\n");
}
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
@@ -905,6 +961,15 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// delete a job that is still alive and the UI is waiting on.
let skipCleanup = false;
const activeJobRef: ActiveJobRef = {
namespace,
jobName,
...(promptSecret ? { promptSecretName: promptSecret.name, promptSecretNamespace: promptSecret.namespace } : {}),
kubeconfigPath,
};
activeJobs.add(activeJobRef);
ensureSigtermHandler();
try {
// Wait for pod to be ready for log streaming
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
@@ -953,11 +1018,21 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let lastLogAt = Date.now();
let keepaliveJobTerminal = false;
let consecutiveTerminalReadings = 0;
// Shared signal: when job completion resolves, tell the log streamer to
// stop reconnecting. Declared before keepaliveTimer so the cancel path
// inside the timer can set it without temporal dead zone issues.
const logStopSignal = { stopped: false };
// Shared dedup filter: created here so the one-shot fallback can
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
const logDedup = new LogLineDedupFilter();
// Set when the run is externally cancelled (cancel-poll path).
let cancelled = false;
keepaliveTimer = setInterval(() => {
// Fire-and-forget the async work; setInterval callbacks must be
// synchronous or the timer will drift.
void (async () => {
if (keepaliveJobTerminal) return;
if (keepaliveJobTerminal || cancelled) return;
// Verify the Job is still alive before announcing or refreshing.
// Require two consecutive terminal readings before latching to
@@ -992,6 +1067,37 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return;
}
// Cancel-polling: check if the Paperclip run was cancelled externally.
// Skipped on the reattach path to avoid tearing down an adopted Job.
// HTTP non-2xx is treated as transient — never interpret a 5xx as cancel.
if (!reattachTarget && paperclipApiUrl && ctx.authToken) {
try {
const resp = await fetch(`${paperclipApiUrl}/api/heartbeat-runs/${runId}`, {
headers: { Authorization: `Bearer ${ctx.authToken}` },
});
if (resp.ok) {
const data = await resp.json() as { status?: string };
if (shouldAbortForCancellation(data.status)) {
void onLog("stdout", `[paperclip] Run cancelled externally — deleting Job ${jobName}\n`).catch(() => {});
cancelled = true;
logStopSignal.stopped = true;
try {
await batchApi.deleteNamespacedJob({
name: jobName,
namespace,
body: { propagationPolicy: "Background" },
});
} catch { /* best-effort — completion watcher will see 404 and settle */ }
return;
}
} else if (resp.status >= 500) {
void onLog("stderr", `[paperclip] keepalive: cancel poll returned HTTP ${resp.status} — transient, ignoring\n`).catch(() => {});
}
} catch {
// network error — transient, skip this tick
}
}
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {});
})();
@@ -1001,13 +1107,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return onLog(stream, chunk);
};
// Shared signal: when job completion resolves, tell the log
// streamer to stop reconnecting.
const logStopSignal = { stopped: false };
// Shared dedup filter: created here so the one-shot fallback can
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
const logDedup = new LogLineDedupFilter();
// Track when the log stream first exits so the grace-period can fire
// if the K8s Job condition lags behind container exit (FAR-23).
// Set via onFirstStreamExit callback (called after attempt=0 returns)
@@ -1067,6 +1166,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
keepaliveTimer = null;
}
// If the run was externally cancelled, return a clean cancelled result
// without processing stdout (the finally block still runs for cleanup).
if (cancelled) {
return {
exitCode: null,
signal: null,
timedOut: false,
errorCode: "cancelled",
errorMessage: "Run cancelled",
};
}
if (logResult.status === "fulfilled") {
stdout = logResult.value;
}
@@ -1141,6 +1252,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
exitCode = await getPodExitCode(namespace, jobName, kubeconfigPath);
} finally {
if (keepaliveTimer) clearInterval(keepaliveTimer);
activeJobs.delete(activeJobRef);
if (skipCleanup) {
await onLog("stdout", `[paperclip] Retaining job ${jobName} (state mismatch — UI is waiting on it)\n`);
} else if (!retainJobs) {