diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts index d92e5c9..bca0a60 100644 --- a/src/server/execute.test.ts +++ b/src/server/execute.test.ts @@ -60,7 +60,7 @@ vi.mock("@paperclipai/adapter-utils/server-utils", async (importOriginal) => { }); }); -const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, streamPodLogsOnce, execute } = await import("./execute.js"); +const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, streamPodLogsOnce, shouldAbortForCancellation, execute } = await import("./execute.js"); function makeJob(opts: { runId?: string; @@ -1220,3 +1220,262 @@ describe("execute: concurrency guard — multiple orphans", () => { expect(result.errorMessage).toContain("different task"); }); }); + +// ─── shouldAbortForCancellation ────────────────────────────────────────────── + +describe("shouldAbortForCancellation", () => { + it("returns false for undefined", () => { + expect(shouldAbortForCancellation(undefined)).toBe(false); + }); + + it("returns false for empty string", () => { + expect(shouldAbortForCancellation("")).toBe(false); + }); + + it("returns false when status is 'running'", () => { + expect(shouldAbortForCancellation("running")).toBe(false); + }); + + it("returns true when status is 'cancelled'", () => { + expect(shouldAbortForCancellation("cancelled")).toBe(true); + }); + + it("returns true when status is 'failed'", () => { + expect(shouldAbortForCancellation("failed")).toBe(true); + }); + + it("returns true when status is 'completed'", () => { + expect(shouldAbortForCancellation("completed")).toBe(true); + }); + + it("returns true for any non-running non-empty string", () => { + expect(shouldAbortForCancellation("unknown")).toBe(true); + }); +}); + +// ─── execute: cancel-polling path ──────────────────────────────────────────── + +describe("execute: cancel-polling via keepalive tick", () => { + const mockFetch = vi.fn(); + + beforeEach(() => { + vi.resetAllMocks(); + vi.useFakeTimers(); + // Replace global fetch for this suite + vi.stubGlobal("fetch", mockFetch); + process.env.PAPERCLIP_API_URL = "http://paperclip-test.local"; + + mockReadSkillEntries.mockResolvedValue([]); + mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); + mockBatchListJobs.mockResolvedValue({ items: [] }); + mockPrepareBundle.mockResolvedValue(makeBundle()); + mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } }); + mockBatchPatchJob.mockResolvedValue({}); + mockBatchDeleteJob.mockResolvedValue({}); + mockCoreDeleteSecret.mockResolvedValue({}); + + mockCoreListPods + .mockResolvedValueOnce({ + items: [{ + metadata: { name: "pod-abc" }, + status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, + }], + }) + .mockResolvedValue({ + items: [{ + metadata: { name: "pod-abc" }, + status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] }, + }], + }); + + // Job never reaches terminal on its own (cancel kicks in first) + mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); + + // Log stream never ends (hung — simulates long-running Claude) + mockLogFn.mockImplementation(() => new Promise(() => { /* never resolves */ })); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.unstubAllGlobals(); + delete process.env.PAPERCLIP_API_URL; + }); + + it("returns errorCode=cancelled when poll detects non-running status within one keepalive tick", async () => { + // Use a flag so readJob throws 404 only AFTER deleteJob is called (simulating + // K8s state where the job disappears after deletion). + let jobDeleted = false; + mockBatchDeleteJob.mockImplementation(async () => { jobDeleted = true; return {}; }); + mockBatchReadJob.mockImplementation(async () => { + if (jobDeleted) { + throw Object.assign(new Error("Not Found"), { response: { statusCode: 404 } }); + } + return { status: { conditions: [] } }; + }); + + // Cancel poll returns "cancelled" status. + mockFetch.mockResolvedValue({ + ok: true, + json: async () => ({ status: "cancelled" }), + }); + + const executePromise = execute( + makeCtx({ authToken: "tok-abc" } as Partial), + ); + + // Timer sequence: + // t=15100: keepalive fires → pre-check non-terminal → fetch → cancelled → + // deleteJob (jobDeleted=true) → stop signal set + // t=15300: stop poller fires (200ms) → destroys writable → starts 3s bail timer + // t=17100: completion watcher polls → 404 (jobDeleted=true) → jobGone → settles + // t=18300: bail timer fires → streamPodLogsOnce returns → streamPodLogs exits → + // trackedLogStream settles → Promise.allSettled resolves + await vi.advanceTimersByTimeAsync(15_100); // keepalive fires → cancel detected + await vi.advanceTimersByTimeAsync(2_100); // completion watcher polls → 404 → settles + await vi.advanceTimersByTimeAsync(3_100); // bail timer fires → log stream settles + + const result = await executePromise; + + expect(result.errorCode).toBe("cancelled"); + expect(result.errorMessage).toBe("Run cancelled"); + expect(result.timedOut).toBe(false); + expect(mockBatchDeleteJob).toHaveBeenCalled(); + }); + + it("treats HTTP 500 on cancel poll as transient and does not cancel", async () => { + // Cancel poll returns 500 → transient, should not cancel. + // After a while the job completes normally. + mockFetch.mockResolvedValue({ ok: false, status: 500 }); + + // Override: job completes after keepalive tick fires + mockBatchReadJob + .mockResolvedValueOnce({ status: { conditions: [] } }) // first keepalive check: non-terminal + .mockResolvedValue({ status: { conditions: [{ type: "Complete", status: "True" }] } }); + mockLogFn.mockImplementation( + async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => { + writable.write(CLAUDE_HAPPY_OUTPUT); + }, + ); + + const executePromise = execute( + makeCtx({ authToken: "tok-abc" } as Partial), + ); + + await vi.advanceTimersByTimeAsync(15_100); // keepalive fires: 500 → transient, no cancel + await vi.advanceTimersByTimeAsync(3_100); // log reconnect sleep → stopSignal already true + + const result = await executePromise; + + expect(result.errorCode).toBeUndefined(); + expect(result.exitCode).toBe(0); + expect(result.sessionId).toBe("sess_test123"); + }); + + it("skips cancel poll when authToken is absent", async () => { + // No authToken → cancel poll must not be attempted → job completes normally + mockBatchReadJob.mockResolvedValue({ + status: { conditions: [{ type: "Complete", status: "True" }] }, + }); + mockLogFn.mockImplementation( + async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => { + writable.write(CLAUDE_HAPPY_OUTPUT); + }, + ); + + const executePromise = execute(makeCtx()); // no authToken + + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(mockFetch).not.toHaveBeenCalled(); + expect(result.exitCode).toBe(0); + }); + + it("skips cancel poll when PAPERCLIP_API_URL is not set", async () => { + delete process.env.PAPERCLIP_API_URL; + + mockBatchReadJob.mockResolvedValue({ + status: { conditions: [{ type: "Complete", status: "True" }] }, + }); + mockLogFn.mockImplementation( + async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => { + writable.write(CLAUDE_HAPPY_OUTPUT); + }, + ); + + const executePromise = execute( + makeCtx({ authToken: "tok-abc" } as Partial), + ); + + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(mockFetch).not.toHaveBeenCalled(); + expect(result.exitCode).toBe(0); + }); +}); + +// ─── execute: SIGTERM handler ───────────────────────────────────────────────── + +describe("execute: SIGTERM handler best-effort cleanup", () => { + beforeEach(() => { + vi.resetAllMocks(); + vi.useFakeTimers(); + mockReadSkillEntries.mockResolvedValue([]); + mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); + mockBatchListJobs.mockResolvedValue({ items: [] }); + mockPrepareBundle.mockResolvedValue(makeBundle()); + mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } }); + mockBatchPatchJob.mockResolvedValue({}); + mockBatchDeleteJob.mockResolvedValue({}); + mockCoreDeleteSecret.mockResolvedValue({}); + mockCoreListPods + .mockResolvedValueOnce({ + items: [{ + metadata: { name: "pod-abc" }, + status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, + }], + }) + .mockResolvedValue({ + items: [{ + metadata: { name: "pod-abc" }, + status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] }, + }], + }); + mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); + mockLogFn.mockImplementation(() => new Promise(() => { /* never resolves */ })); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("deletes the active Job when SIGTERM fires during execution", async () => { + // Mock process.kill to prevent the test process from actually being killed. + const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true); + + // Start execute() and suppress unhandled rejection (we won't await it). + const executePromise = execute(makeCtx()); + executePromise.catch(() => {}); + + // Flush microtasks through the async setup chain: getSelfPodInfo, listJobs, + // readSkillEntries, prepareBundle, createJob, onLog, activeJobs.add(), and + // ensureSigtermHandler() all complete before the try block enters streaming. + // 30 rounds is more than enough for the ~7 sequential await points. + for (let i = 0; i < 30; i++) await Promise.resolve(); + + // Emit SIGTERM — the process.once handler fires synchronously and kicks off + // async cleanup (deleteNamespacedJob). The mock resolves immediately. + process.emit("SIGTERM"); + + // Flush microtasks for deleteJob to resolve and the .then(process.kill) to run. + for (let i = 0; i < 10; i++) await Promise.resolve(); + + expect(mockBatchDeleteJob).toHaveBeenCalled(); + expect(killSpy).toHaveBeenCalledWith(process.pid, "SIGTERM"); + + killSpy.mockRestore(); + // afterEach calls vi.useRealTimers() which clears all pending fake timers, + // so we do not need to settle executePromise. + }); +}); diff --git a/src/server/execute.ts b/src/server/execute.ts index 868d256..ea2e6f1 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -39,6 +39,48 @@ const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000; // minutes, causing stale "running" status in the UI (FAR-23). const LOG_EXIT_COMPLETION_GRACE_MS = 30_000; +// Module-level tracking of active Jobs for SIGTERM best-effort cleanup. +interface ActiveJobRef { + namespace: string; + jobName: string; + promptSecretName?: string; + promptSecretNamespace?: string; + kubeconfigPath?: string; +} +const activeJobs = new Set(); +let sigtermHandlerRegistered = false; + +function ensureSigtermHandler(): void { + if (sigtermHandlerRegistered) return; + sigtermHandlerRegistered = true; + process.once("SIGTERM", () => { + const jobs = [...activeJobs]; + void Promise.allSettled( + jobs.map(async (ref) => { + try { + const batchApi = getBatchApi(ref.kubeconfigPath); + await batchApi.deleteNamespacedJob({ + name: ref.jobName, + namespace: ref.namespace, + body: { propagationPolicy: "Background" }, + }); + } catch { /* best-effort */ } + if (ref.promptSecretName && ref.promptSecretNamespace) { + try { + const coreApi = getCoreApi(ref.kubeconfigPath); + await coreApi.deleteNamespacedSecret({ + name: ref.promptSecretName, + namespace: ref.promptSecretNamespace, + }); + } catch { /* best-effort */ } + } + }), + ).then(() => { + process.kill(process.pid, "SIGTERM"); + }); + }); +} + /** * Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node. * Works for both v0.x (response.statusCode) and v1.0+ (response.status, message). @@ -53,6 +95,16 @@ export function isK8s404(err: unknown): boolean { return /HTTP-Code:\s*404\b/.test(err.message); } +/** + * Returns true when the heartbeat-run status indicates the run is no longer + * active and the K8s Job should be cancelled. + * Exported for unit tests. + */ +export function shouldAbortForCancellation(runStatus: string | undefined): boolean { + if (!runStatus) return false; + return runStatus !== "running"; +} + /** * Build the error message when Claude's stdout contains no result event. * Skips system/init event lines so the UI doesn't display the raw init JSON. @@ -546,6 +598,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise { // Fire-and-forget the async work; setInterval callbacks must be // synchronous or the timer will drift. void (async () => { - if (keepaliveJobTerminal) return; + if (keepaliveJobTerminal || cancelled) return; // Verify the Job is still alive before announcing or refreshing. // Require two consecutive terminal readings before latching to @@ -992,6 +1067,37 @@ export async function execute(ctx: AdapterExecutionContext): Promise {}); + cancelled = true; + logStopSignal.stopped = true; + try { + await batchApi.deleteNamespacedJob({ + name: jobName, + namespace, + body: { propagationPolicy: "Background" }, + }); + } catch { /* best-effort — completion watcher will see 404 and settle */ } + return; + } + } else if (resp.status >= 500) { + void onLog("stderr", `[paperclip] keepalive: cancel poll returned HTTP ${resp.status} — transient, ignoring\n`).catch(() => {}); + } + } catch { + // network error — transient, skip this tick + } + } + const silenceSec = Math.round((Date.now() - lastLogAt) / 1000); void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {}); })(); @@ -1001,13 +1107,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise