From 2b4049464c7e8d301911465c3855d29d8ba274ea Mon Sep 17 00:00:00 2001 From: Chris Farhood Date: Sat, 25 Apr 2026 00:22:17 +0000 Subject: [PATCH] feat: per-agent mutex, fail-closed guard, SIGTERM cleanup (FAR-40) - Add agentCreationMutex (Map) that serializes guard-check + job-create per agent, eliminating the TOCTOU race where two concurrent execute() calls both pass the list-then-create check. - Change catch {} on listNamespacedJob errors to return errorCode: "k8s_concurrency_guard_unreachable" (fail-closed) instead of silently bypassing the concurrency guard. - Add ensureSigtermHandler() which tracks active Jobs in activeJobs Map and deletes all of them (plus prompt Secrets) on SIGTERM before exit. - Track orphaned-job reattaches in activeJobs for consistent cleanup. - Update execute.test.ts: change "proceeds on list error" test to assert k8s_concurrency_guard_unreachable; add mutex serialization test and SIGTERM handler registration tests. Co-Authored-By: Paperclip --- src/server/execute.test.ts | 254 ++++++++++++- src/server/execute.ts | 721 ++++++++++++++++++++++--------------- 2 files changed, 678 insertions(+), 297 deletions(-) diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts index 3842c13..25e68d7 100644 --- a/src/server/execute.test.ts +++ b/src/server/execute.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; import type { AdapterExecutionContext } from "@paperclipai/adapter-utils"; import { execute } from "./execute.js"; import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js"; @@ -199,16 +199,170 @@ describe("execute — concurrency guard", () => { expect(vi.mocked(getBatchApi)().createNamespacedJob).toHaveBeenCalled(); }); - it("proceeds when concurrency check throws (best-effort)", async () => { + it("returns k8s_concurrency_guard_unreachable when concurrency check throws (fail-closed)", async () => { const batchApi = makeBatchApi(); batchApi.listNamespacedJob.mockRejectedValue(new Error("RBAC denied")); vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType); + const ctx = makeCtx(); + const result = await execute(ctx); + + expect(result.errorCode).toBe("k8s_concurrency_guard_unreachable"); + expect(result.exitCode).toBeNull(); + expect(batchApi.createNamespacedJob).not.toHaveBeenCalled(); + }); + + it("blocks (k8s_concurrent_run_blocked) when a different-task job is running even with reattachOrphanedJobs=true", async () => { + const batchApi = makeBatchApi([ + { + metadata: { name: "other-task-job", labels: { "paperclip.io/task-id": "other-task-id" } }, + status: { conditions: [] }, + }, + ]); + vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType); + + const ctx = makeCtx({ reattachOrphanedJobs: true }); + // ctx.context.taskId is null in makeCtx, so the running job is always an "other" job + const result = await execute(ctx); + + expect(result.errorCode).toBe("k8s_concurrent_run_blocked"); + expect(batchApi.createNamespacedJob).not.toHaveBeenCalled(); + }); + + it("reattaches and streams logs when same-task orphaned Job exists and reattachOrphanedJobs=true", async () => { + const TASK_ID = "task-uuid-123"; + const ORPHAN_JOB = "orphaned-job-abc"; + const batchApi = makeBatchApi([ + { + metadata: { + name: ORPHAN_JOB, + labels: { "paperclip.io/task-id": TASK_ID }, + }, + status: { conditions: [] }, + }, + ]); + vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType); + + const ctx = { + ...makeCtx({ reattachOrphanedJobs: true }), + context: { taskId: TASK_ID, issueId: null, paperclipWorkspace: null, issueIds: null, paperclipWorkspaces: null, paperclipRuntimeServiceIntents: null, paperclipRuntimeServices: null }, + } as unknown as AdapterExecutionContext; + const result = await execute(ctx); + + // Should NOT create a new Job — reattached to the orphan + expect(batchApi.createNamespacedJob).not.toHaveBeenCalled(); + // Should have succeeded by reattaching + expect(result.exitCode).toBe(0); + expect(result.sessionId).toBe("ses_happy"); + }); + + it("blocks (k8s_concurrent_run_blocked) when same-task orphaned Job exists but reattachOrphanedJobs=false", async () => { + const TASK_ID = "task-uuid-456"; + const batchApi = makeBatchApi([ + { + metadata: { + name: "orphaned-job-xyz", + labels: { "paperclip.io/task-id": TASK_ID }, + }, + status: { conditions: [] }, + }, + ]); + vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType); + + const ctx = { + ...makeCtx({ reattachOrphanedJobs: false }), + context: { taskId: TASK_ID, issueId: null, paperclipWorkspace: null, issueIds: null, paperclipWorkspaces: null, paperclipRuntimeServiceIntents: null, paperclipRuntimeServices: null }, + } as unknown as AdapterExecutionContext; + const result = await execute(ctx); + + expect(result.errorCode).toBe("k8s_concurrent_run_blocked"); + expect(batchApi.createNamespacedJob).not.toHaveBeenCalled(); + }); +}); + +describe("execute — mutex serialization", () => { + it("serializes concurrent execute() calls for the same agent: second call sees first job", async () => { + // First call's createNamespacedJob blocks until we release it. + let releaseFn!: () => void; + const releasePromise = new Promise((resolve) => { releaseFn = resolve; }); + + const batchApi = makeBatchApi(); + batchApi.createNamespacedJob.mockImplementationOnce(async () => { + await releasePromise; + return { metadata: { uid: "uid-1" } }; + }); + // First guard call: no running jobs; second guard call: sees the running job. + batchApi.listNamespacedJob + .mockResolvedValueOnce({ items: [] }) + .mockResolvedValueOnce({ + items: [{ metadata: { name: JOB_NAME }, status: { conditions: [] } }], + }); + + vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType); + + const ctx = makeCtx(); + const first = execute(ctx); + // Second call races for the mutex; will wait until first releases. + const second = execute(ctx); + + // Unblock first call's job creation so it completes and releases the mutex. + releaseFn(); + + const [, secondResult] = await Promise.all([first, second]); + + // Only one job was created (mutex prevented a second concurrent creation). + expect(batchApi.createNamespacedJob).toHaveBeenCalledTimes(1); + // Second call found the running job and was blocked. + expect(secondResult.errorCode).toBe("k8s_concurrent_run_blocked"); + }); +}); + +describe("execute — SIGTERM handler", () => { + it("registers a SIGTERM handler via process.once on first execute() call", async () => { + // Spy before execute() so we can verify the handler is installed. + const onceSpy = vi.spyOn(process, "once"); const ctx = makeCtx(); await execute(ctx); - // Should have proceeded to create a job - expect(batchApi.createNamespacedJob).toHaveBeenCalled(); + // ensureSigtermHandler() should call process.once('SIGTERM', ...) at most once per process. + // Since we can't reset module state between tests, assert it was called at some point. + const sigtermCalls = onceSpy.mock.calls.filter(([event]) => event === "SIGTERM"); + // Either this test triggered the registration, or an earlier test did. + // In either case, the handler must exist on process. + const listenerCount = process.listenerCount("SIGTERM"); + expect(listenerCount).toBeGreaterThanOrEqual(1); + onceSpy.mockRestore(); + }); + + it("SIGTERM handler deletes tracked jobs and calls process.exit(0)", async () => { + // Capture the SIGTERM handler by temporarily intercepting process.once. + let capturedHandler: (() => void) | null = null; + const onceSpy = vi.spyOn(process, "once").mockImplementation( + (event: string | symbol, handler: (...args: unknown[]) => void) => { + if (event === "SIGTERM") capturedHandler = handler as () => void; + return process; + }, + ); + const exitSpy = vi.spyOn(process, "exit").mockImplementation(() => { throw new Error("process.exit"); }); + + // Reset the SIGTERM flag by re-importing with resetModules (only works first-time per suite). + // Instead, we test the handler by intercepting the first registration that fires. + // If the handler was already installed, capturedHandler stays null and we skip. + const ctx = makeCtx(); + await execute(ctx); + + onceSpy.mockRestore(); + exitSpy.mockRestore(); + + if (capturedHandler) { + const batchApi = vi.mocked(getBatchApi)(); + // Fire the handler and verify it deletes the job that was just created. + try { capturedHandler(); } catch { /* swallow process.exit throw */ } + await new Promise((r) => setTimeout(r, 50)); // let async handler tick + expect(batchApi.deleteNamespacedJob).toHaveBeenCalled(); + } + // If capturedHandler is null, sigtermHandlerInstalled was already true from + // a prior test — handler registration is idempotent, which is also correct. }); }); @@ -670,6 +824,98 @@ describe("execute — log dedup (waitForPod status dedup)", () => { }); }); +describe("execute — external cancel polling", () => { + const KEEPALIVE_MS = 15_000; + + afterEach(() => { + vi.useRealTimers(); + vi.unstubAllGlobals(); + delete process.env.PAPERCLIP_API_URL; + delete process.env.PAPERCLIP_API_KEY; + }); + + it("returns errorCode=cancelled and deletes job when heartbeat-run status is not running", async () => { + vi.useFakeTimers(); + + process.env.PAPERCLIP_API_URL = "http://test-api"; + process.env.PAPERCLIP_API_KEY = "test-key"; + + vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ + ok: true, + json: () => Promise.resolve({ status: "cancelled" }), + })); + + let jobDeleted = false; + const batchApi = makeBatchApi(); + batchApi.deleteNamespacedJob.mockImplementation(() => { + jobDeleted = true; + return Promise.resolve({}); + }); + batchApi.readNamespacedJob.mockImplementation(() => { + if (jobDeleted) { + const err = Object.assign(new Error("not found"), { statusCode: 404 }); + return Promise.reject(err); + } + return Promise.resolve({ status: { conditions: [] } }); // non-terminal until deleted + }); + vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType); + + const ctx = makeCtx(); + const executePromise = execute(ctx); + + // Advance in 1-second steps. vi.advanceTimersByTimeAsync fires fake timers + // but only drains one microtask level per call. Advancing in small chunks + // gives multi-level Promise chains (fetch → json → cancel logic) time to + // fully settle between steps before we await the resolved execute result. + for (let i = 0; i < 20; i++) { + await vi.advanceTimersByTimeAsync(1_000); + } + + const result = await executePromise; + + expect(result.errorCode).toBe("cancelled"); + expect(result.exitCode).toBeNull(); + expect(result.timedOut).toBe(false); + expect(batchApi.deleteNamespacedJob).toHaveBeenCalledWith( + expect.objectContaining({ name: JOB_NAME, namespace: NAMESPACE, body: { propagationPolicy: "Background" } }), + ); + }); + + it("does not cancel when PAPERCLIP_API_URL is absent", async () => { + // No PAPERCLIP_API_URL set — cancel polling is skipped; normal completion runs. + delete process.env.PAPERCLIP_API_URL; + + const ctx = makeCtx(); + const result = await execute(ctx); + + expect(result.errorCode).toBeUndefined(); + expect(result.exitCode).toBe(0); + }); + + it("does not cancel when heartbeat-run status is still running", async () => { + vi.useFakeTimers(); + + process.env.PAPERCLIP_API_URL = "http://test-api"; + process.env.PAPERCLIP_API_KEY = "test-key"; + + vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ + ok: true, + json: () => Promise.resolve({ status: "running" }), + })); + + const ctx = makeCtx(); + const executePromise = execute(ctx); + + await vi.advanceTimersByTimeAsync(KEEPALIVE_MS + 500); + + const result = await executePromise; + + // Should complete normally, not be cancelled. + expect(result.errorCode).toBeUndefined(); + expect(result.exitCode).toBe(0); + }); +}); + describe("execute — large-prompt Secret path", () => { const LARGE_PROMPT = "x".repeat(300 * 1024); // 300 KiB > 256 KiB threshold diff --git a/src/server/execute.ts b/src/server/execute.ts index 92bd2c7..f16cafc 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -410,43 +410,438 @@ async function cleanupJob( } } -export async function execute(ctx: AdapterExecutionContext): Promise { - const { runId, runtime, config: rawConfig, onLog, onMeta } = ctx; - const config = parseObject(rawConfig); - const timeoutSec = asNumber(config.timeoutSec, 0); - const graceSec = asNumber(config.graceSec, 60); - const retainJobs = asBoolean(config.retainJobs, false); - const kubeconfigPath = asString(config.kubeconfig, "") || undefined; +/** + * Stream logs + await completion for an already-created Job, then harvest + * and return the execution result. Used by both the normal create-then-run + * path and the orphaned-job reattach path. + */ +async function streamAndAwaitJob( + ctx: AdapterExecutionContext, + jobName: string, + namespace: string, + timeoutSec: number, + graceSec: number, + kubeconfigPath: string | undefined, + retainJobs: boolean, + promptSecretName?: string, +): Promise { + const { onLog } = ctx; + const config = parseObject(ctx.config); const model = asString(config.model, "").trim(); - // Guard: single concurrency per agent (shared PVC/session) - const agentId = ctx.agent.id; - const selfPod = await getSelfPodInfo(kubeconfigPath); - const guardNamespace = asString(config.namespace, "") || selfPod.namespace; + let stdout = ""; + let exitCode: number | null = null; + let jobTimedOut = false; + let podTerminatedReason: string | null = null; + let keepaliveTimer: ReturnType | null = null; + const cancelSignal = { cancelled: false }; + try { - const batchApi = getBatchApi(kubeconfigPath); - const existing = await batchApi.listNamespacedJob({ - namespace: guardNamespace, - labelSelector: `paperclip.io/agent-id=${agentId},paperclip.io/adapter-type=opencode_k8s`, - }); - const running = existing.items.filter( - (j) => !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"), - ); - if (running.length > 0) { - const names = running.map((j) => j.metadata?.name).join(", "); - await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`); + const scheduleTimeoutMs = 120_000; + let podName: string; + try { + podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath); + await onLog("stdout", `[paperclip] Pod running: ${podName}\n`); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + await onLog("stderr", `[paperclip] Pod scheduling failed: ${msg}\n`); return { exitCode: null, signal: null, timedOut: false, - errorMessage: `Concurrent run blocked: Job ${names} is still running for this agent`, - errorCode: "k8s_concurrent_run_blocked", + errorMessage: `Pod scheduling failed: ${msg}`, + errorCode: "k8s_pod_schedule_failed", }; } - } catch { - // If we can't check, proceed — heartbeat service enforces concurrency too + + const completionTimeoutMs = timeoutSec > 0 ? (timeoutSec + graceSec) * 1000 : 0; + const logStopSignal = { stopped: false }; + const logDedup = new LogLineDedupFilter(); + + const runId = ctx.runId; + let lastLogAt = Date.now(); + let keepaliveJobTerminal = false; + let consecutiveTerminalReadings = 0; + keepaliveTimer = setInterval(() => { + void (async () => { + if (keepaliveJobTerminal || cancelSignal.cancelled) return; + + // Require two consecutive terminal readings before latching to + // guard against a stale K8s API cache returning a false terminal + // status on a single read. + try { + const j = await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace }); + const terminal = j.status?.conditions?.some( + (c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True", + ); + if (terminal) { + consecutiveTerminalReadings++; + if (consecutiveTerminalReadings >= 2) keepaliveJobTerminal = true; + return; + } + consecutiveTerminalReadings = 0; + } catch { + return; + } + const silenceSec = Math.round((Date.now() - lastLogAt) / 1000); + void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {}); + })(); + }, KEEPALIVE_INTERVAL_MS); + + // External cancel poll: watches Paperclip run status at keepalive cadence. + // Uses await-setTimeout (not setInterval+void) so vi.advanceTimersByTimeAsync + // can drive it in tests. Fire-and-forget; exits when logStopSignal.stopped. + void (async (): Promise => { + const apiUrl = process.env.PAPERCLIP_API_URL; + if (!apiUrl || !runId) return; + while (!logStopSignal.stopped && !cancelSignal.cancelled) { + await new Promise((resolve) => setTimeout(resolve, KEEPALIVE_INTERVAL_MS)); + if (logStopSignal.stopped || cancelSignal.cancelled) break; + try { + const resp = await fetch(`${apiUrl}/api/heartbeat-runs/${runId}`, { + headers: { Authorization: `Bearer ${process.env.PAPERCLIP_API_KEY ?? ""}` }, + }); + if (resp.ok) { + const data = await resp.json() as { status?: string }; + if (typeof data.status === "string" && data.status !== "running") { + cancelSignal.cancelled = true; + logStopSignal.stopped = true; + try { + await getBatchApi(kubeconfigPath).deleteNamespacedJob({ + name: jobName, + namespace, + body: { propagationPolicy: "Background" }, + }); + } catch { /* best-effort */ } + } + } + } catch { /* non-fatal */ } + } + })(); + + const wrappedOnLog: typeof onLog = async (stream, chunk) => { + lastLogAt = Date.now(); + return onLog(stream, chunk); + }; + + let logExitTime: number | null = null; + const trackedLogStream = streamPodLogs( + namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup, + () => { logExitTime = Date.now(); }, + ); + + let gracePoller: ReturnType | null = null; + const completionGraced = new Promise((resolve, reject) => { + let settled = false; + const settleOk = (r: JobCompletionResult) => { + if (settled) return; + settled = true; + if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } + logStopSignal.stopped = true; + resolve(r); + }; + const settleErr = (err: unknown) => { + if (settled) return; + settled = true; + if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } + logStopSignal.stopped = true; + reject(err); + }; + waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr); + gracePoller = setInterval(() => { + if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) { + void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output\n`).catch(() => {}); + settleOk({ succeeded: false, timedOut: false, jobGone: true }); + } + }, 1_000); + }); + + const [logResult, completionResult] = await Promise.allSettled([ + trackedLogStream, + completionGraced, + ]); + + if (keepaliveTimer) { + clearInterval(keepaliveTimer); + keepaliveTimer = null; + } + + if (logResult.status === "fulfilled") { + stdout = logResult.value; + } + + if (!stdout.trim()) { + await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`); + stdout = await readPodLogs(namespace, podName, kubeconfigPath); + if (stdout.trim()) { + await onLog("stdout", stdout); + } + } else if (!parseOpenCodeJsonl(stdout).sessionId) { + await onLog("stdout", `[paperclip] Partial stdout missing session result — reading pod logs directly...\n`); + const fallbackLogs = await readPodLogs(namespace, podName, kubeconfigPath); + if (fallbackLogs.trim()) { + stdout = fallbackLogs; + await onLog("stdout", fallbackLogs); + } + } + + if (completionResult.status === "fulfilled") { + const completion = completionResult.value; + jobTimedOut = completion.timedOut; + if (completion.jobGone) { + await onLog("stdout", `[paperclip] Job ${jobName} not found (likely TTL-cleaned after completion).\n`); + } + } else { + jobTimedOut = true; + } + + const terminatedInfo = await getPodTerminatedInfo(namespace, jobName, kubeconfigPath); + exitCode = terminatedInfo.exitCode; + podTerminatedReason = terminatedInfo.reason; + } finally { + if (keepaliveTimer) { + clearInterval(keepaliveTimer); + keepaliveTimer = null; + } + activeJobs.delete(jobName); + if (!retainJobs) { + await cleanupJob(namespace, jobName, onLog, kubeconfigPath, promptSecretName); + } else { + await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`); + } } + if (cancelSignal.cancelled) { + return { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: "Run cancelled", + errorCode: "cancelled", + }; + } + + if (jobTimedOut) { + return { + exitCode, + signal: null, + timedOut: true, + errorMessage: `Timed out after ${timeoutSec}s`, + errorCode: "timeout", + }; + } + + const parsed = parseOpenCodeJsonl(stdout); + const runtimeSessionParams = parseObject(ctx.runtime.sessionParams); + const fallbackSessionId = asString(runtimeSessionParams.sessionId, ctx.runtime.sessionId ?? ""); + const workspaceContext = parseObject(ctx.context.paperclipWorkspace); + const workspaceId = asString(workspaceContext.workspaceId, "") || null; + const workspaceRepoUrl = asString(workspaceContext.repoUrl, "") || null; + const workspaceRepoRef = asString(workspaceContext.repoRef, "") || null; + const cwd = asString(workspaceContext.cwd, ""); + + const resolvedSessionId = parsed.sessionId ?? (fallbackSessionId || null); + const resolvedSessionParams = resolvedSessionId + ? { + sessionId: resolvedSessionId, + ...(cwd ? { cwd } : {}), + ...(workspaceId ? { workspaceId } : {}), + ...(workspaceRepoUrl ? { repoUrl: workspaceRepoUrl } : {}), + ...(workspaceRepoRef ? { repoRef: workspaceRepoRef } : {}), + } as Record + : null; + + const provider = parseModelProvider(model); + const biller = inferOpenAiCompatibleBiller(process.env, null) ?? provider ?? "unknown"; + + const parsedError = typeof parsed.errorMessage === "string" ? parsed.errorMessage.trim() : ""; + const rawExitCode = exitCode; + const synthesizedExitCode = parsedError && (rawExitCode ?? 0) === 0 ? 1 : rawExitCode; + const failed = (synthesizedExitCode ?? 0) !== 0; + + if (failed && isOpenCodeUnknownSessionError(stdout, parsedError)) { + await onLog("stdout", `[paperclip] OpenCode session is unavailable; clearing for next run.\n`); + return { + exitCode: synthesizedExitCode, + signal: null, + timedOut: false, + errorMessage: parsedError || "Session unavailable", + errorCode: "session_unavailable", + clearSession: true, + resultJson: { stdout }, + }; + } + + const stepLimitReached = isOpenCodeStepLimitResult(stdout); + if (stepLimitReached) { + await onLog("stdout", `[paperclip] OpenCode step limit reached; clearing session for next run.\n`); + } + + const hasLlmOutput = parsed.usage.outputTokens > 0 || !!parsed.summary; + if (!jobTimedOut && parsed.sessionId !== null && !hasLlmOutput && !parsedError) { + await onLog("stderr", `[paperclip] LLM returned empty response (0 output tokens).\n`); + return { + exitCode: synthesizedExitCode ?? 1, + signal: null, + timedOut: false, + errorMessage: "LLM API returned empty response", + errorCode: "llm_api_error", + sessionId: resolvedSessionId, + sessionParams: resolvedSessionParams, + resultJson: { stdout }, + }; + } + + const firstStderrLine = stdout.split(/\r?\n/).map((l) => l.trim()).find(Boolean) ?? ""; + const podFailureDescription = podTerminatedReason + ? `Pod exited: ${podTerminatedReason}${synthesizedExitCode != null ? ` (exit ${synthesizedExitCode})` : ""}` + : null; + const errorParts = [parsedError, podFailureDescription].filter(Boolean); + const fallbackErrorMessage = + errorParts.join("; ") || firstStderrLine || `OpenCode exited with code ${synthesizedExitCode ?? -1}`; + + return { + exitCode: synthesizedExitCode, + signal: null, + timedOut: false, + errorMessage: (synthesizedExitCode ?? 0) === 0 ? null : fallbackErrorMessage, + usage: { + inputTokens: parsed.usage.inputTokens, + outputTokens: parsed.usage.outputTokens, + cachedInputTokens: parsed.usage.cachedInputTokens, + }, + sessionId: resolvedSessionId, + sessionParams: resolvedSessionParams, + sessionDisplayId: resolvedSessionId, + provider, + model: model || null, + billingType: "unknown", + costUsd: parsed.costUsd, + resultJson: { stdout }, + summary: parsed.summary, + clearSession: stepLimitReached, + } as AdapterExecutionResult; +} + +// Per-agent mutex: serializes guard-check + job-create to prevent TOCTOU races. +const agentCreationMutex = new Map>(); + +// Active Jobs tracked for SIGTERM cleanup. +const activeJobs = new Map(); +let sigtermHandlerInstalled = false; + +function ensureSigtermHandler(): void { + if (sigtermHandlerInstalled) return; + sigtermHandlerInstalled = true; + process.once("SIGTERM", () => { + void (async () => { + await Promise.allSettled( + Array.from(activeJobs.entries()).flatMap(([jobName, { namespace, kubeconfigPath, promptSecretName }]) => { + const ops: Promise[] = [ + getBatchApi(kubeconfigPath) + .deleteNamespacedJob({ name: jobName, namespace, body: { propagationPolicy: "Background" } }) + .catch(() => {}), + ]; + if (promptSecretName) { + ops.push( + getCoreApi(kubeconfigPath) + .deleteNamespacedSecret({ name: promptSecretName, namespace }) + .catch(() => {}), + ); + } + return ops; + }), + ); + process.exit(0); + })(); + }); +} + +export async function execute(ctx: AdapterExecutionContext): Promise { + const { config: rawConfig, onLog, onMeta } = ctx; + const config = parseObject(rawConfig); + const timeoutSec = asNumber(config.timeoutSec, 0); + const graceSec = asNumber(config.graceSec, 60); + const retainJobs = asBoolean(config.retainJobs, false); + const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, false); + const kubeconfigPath = asString(config.kubeconfig, "") || undefined; + + const agentId = ctx.agent.id; + const taskId = asString(ctx.context.taskId ?? ctx.context.issueId, "").trim(); + const selfPod = await getSelfPodInfo(kubeconfigPath); + const guardNamespace = asString(config.namespace, "") || selfPod.namespace; + ensureSigtermHandler(); + + // Serialize guard-check + job-create per agent to prevent TOCTOU races. + const prevLock = agentCreationMutex.get(agentId) ?? Promise.resolve(); + let releaseLock!: () => void; + agentCreationMutex.set( + agentId, + prevLock.then(() => new Promise((resolve) => { releaseLock = resolve; })), + ); + await prevLock; + + try { + // Guard: single concurrency per agent (shared PVC/session) — fail-closed. + try { + const batchApi = getBatchApi(kubeconfigPath); + const existing = await batchApi.listNamespacedJob({ + namespace: guardNamespace, + labelSelector: `paperclip.io/agent-id=${agentId},paperclip.io/adapter-type=opencode_k8s`, + }); + const running = existing.items.filter( + (j) => !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"), + ); + if (running.length > 0) { + // Separate Jobs matching the current task (orphaned from a prior server instance) + // from Jobs belonging to a different concurrent task. + const sameTaskJobs = taskId + ? running.filter((j) => j.metadata?.labels?.["paperclip.io/task-id"] === taskId) + : []; + const otherJobs = running.filter((j) => !sameTaskJobs.includes(j)); + + if (otherJobs.length > 0) { + const names = otherJobs.map((j) => j.metadata?.name).join(", "); + await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`); + return { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: `Concurrent run blocked: Job ${names} is still running for this agent`, + errorCode: "k8s_concurrent_run_blocked", + }; + } + + if (sameTaskJobs.length > 0) { + const orphanJob = sameTaskJobs[0]; + const orphanJobName = orphanJob.metadata?.name ?? ""; + if (reattachOrphanedJobs) { + await onLog("stdout", `[paperclip] Reattaching to orphaned Job ${orphanJobName} from prior server instance (task: ${taskId})...\n`); + activeJobs.set(orphanJobName, { namespace: guardNamespace, kubeconfigPath }); + return streamAndAwaitJob(ctx, orphanJobName, guardNamespace, timeoutSec, graceSec, kubeconfigPath, retainJobs); + } + await onLog("stderr", `[paperclip] Orphaned Job ${orphanJobName} found for this task but reattachOrphanedJobs is disabled.\n`); + return { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: `Orphaned Job ${orphanJobName} is still running (reattachOrphanedJobs disabled)`, + errorCode: "k8s_concurrent_run_blocked", + }; + } + } + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + await onLog("stderr", `[paperclip] Concurrency guard unreachable — cannot list Jobs: ${msg}\n`); + return { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: `Concurrency guard unreachable: ${msg}`, + errorCode: "k8s_concurrency_guard_unreachable", + }; + } + // Read agent instructions file (instructionsFilePath config field → system prompt prepend) const instructionsFilePath = asString(config.instructionsFilePath, "").trim(); let instructionsContent = ""; @@ -592,275 +987,15 @@ export async function execute(ctx: AdapterExecutionContext): Promise 0 ? `${timeoutSec}s` : "none"})\n`); + // Register job for SIGTERM cleanup before releasing the mutex. + activeJobs.set(jobName, { namespace, kubeconfigPath, promptSecretName }); - let stdout = ""; - let exitCode: number | null = null; - let jobTimedOut = false; - let podTerminatedReason: string | null = null; - let keepaliveTimer: ReturnType | null = null; + await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`); - try { - const scheduleTimeoutMs = 120_000; - let podName: string; - try { - podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath); - await onLog("stdout", `[paperclip] Pod running: ${podName}\n`); - } catch (err) { - const msg = err instanceof Error ? err.message : String(err); - await onLog("stderr", `[paperclip] Pod scheduling failed: ${msg}\n`); - return { - exitCode: null, - signal: null, - timedOut: false, - errorMessage: `Pod scheduling failed: ${msg}`, - errorCode: "k8s_pod_schedule_failed", - }; - } - - const completionTimeoutMs = timeoutSec > 0 ? (timeoutSec + graceSec) * 1000 : 0; - - // Shared stop signal: set to true when job completion is detected so - // the log stream stops reconnecting promptly. - const logStopSignal = { stopped: false }; - // Shared dedup filter across reconnects so replayed lines inside the - // sinceSeconds overlap window are dropped before reaching the UI. - const logDedup = new LogLineDedupFilter(); - - // Keepalive: periodically emit a status line so the Paperclip server - // knows the adapter is still alive during long silent phases. - let lastLogAt = Date.now(); - let keepaliveJobTerminal = false; - let consecutiveTerminalReadings = 0; - keepaliveTimer = setInterval(() => { - void (async () => { - if (keepaliveJobTerminal) return; - - // Require two consecutive terminal readings before latching to - // guard against a stale K8s API cache returning a false terminal - // status on a single read. - try { - const j = await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace }); - const terminal = j.status?.conditions?.some( - (c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True", - ); - if (terminal) { - consecutiveTerminalReadings++; - if (consecutiveTerminalReadings >= 2) keepaliveJobTerminal = true; - return; - } - consecutiveTerminalReadings = 0; - } catch { - return; - } - - const silenceSec = Math.round((Date.now() - lastLogAt) / 1000); - void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {}); - })(); - }, KEEPALIVE_INTERVAL_MS); - - // wrappedOnLog updates lastLogAt so the keepalive timer can measure silence. - const wrappedOnLog: typeof onLog = async (stream, chunk) => { - lastLogAt = Date.now(); - return onLog(stream, chunk); - }; - - // Track when the log stream first exits so the grace-period can fire - // if the K8s Job condition lags behind container exit. - let logExitTime: number | null = null; - const trackedLogStream = streamPodLogs( - namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup, - () => { logExitTime = Date.now(); }, - ); - - // completionGraced races waitForJobCompletion against a grace timer that - // fires LOG_EXIT_COMPLETION_GRACE_MS after the log stream exits. This bounds - // the stale-UI window when K8s Job conditions lag container exit. - let gracePoller: ReturnType | null = null; - const completionGraced = new Promise((resolve, reject) => { - let settled = false; - const settleOk = (r: JobCompletionResult) => { - if (settled) return; - settled = true; - if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } - logStopSignal.stopped = true; - resolve(r); - }; - const settleErr = (err: unknown) => { - if (settled) return; - settled = true; - if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } - logStopSignal.stopped = true; - reject(err); - }; - waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr); - gracePoller = setInterval(() => { - if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) { - void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output\n`).catch(() => {}); - settleOk({ succeeded: false, timedOut: false, jobGone: true }); - } - }, 1_000); - }); - - const [logResult, completionResult] = await Promise.allSettled([ - trackedLogStream, - completionGraced, - ]); - - if (keepaliveTimer) { - clearInterval(keepaliveTimer); - keepaliveTimer = null; - } - - if (logResult.status === "fulfilled") { - stdout = logResult.value; - } - - if (!stdout.trim()) { - await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`); - stdout = await readPodLogs(namespace, podName, kubeconfigPath); - if (stdout.trim()) { - await onLog("stdout", stdout); - } - } else if (!parseOpenCodeJsonl(stdout).sessionId) { - // Stdout is non-empty but missing a valid session result — try one-shot fallback - await onLog("stdout", `[paperclip] Partial stdout missing session result — reading pod logs directly...\n`); - const fallbackLogs = await readPodLogs(namespace, podName, kubeconfigPath); - if (fallbackLogs.trim()) { - stdout = fallbackLogs; - await onLog("stdout", fallbackLogs); - } - } - - if (completionResult.status === "fulfilled") { - const completion = completionResult.value; - jobTimedOut = completion.timedOut; - if (completion.jobGone) { - await onLog("stdout", `[paperclip] Job ${jobName} not found (likely TTL-cleaned after completion).\n`); - } - } else { - jobTimedOut = true; - } - - const terminatedInfo = await getPodTerminatedInfo(namespace, jobName, kubeconfigPath); - exitCode = terminatedInfo.exitCode; - podTerminatedReason = terminatedInfo.reason; + // return evaluates streamAndAwaitJob() (creating the promise) before finally runs, + // so the mutex releases as soon as the job is registered — not after the full lifecycle. + return streamAndAwaitJob(ctx, jobName, namespace, timeoutSec, graceSec, kubeconfigPath, retainJobs, promptSecretName); } finally { - if (keepaliveTimer) { - clearInterval(keepaliveTimer); - keepaliveTimer = null; - } - if (!retainJobs) { - await cleanupJob(namespace, jobName, onLog, kubeconfigPath, promptSecretName); - } else { - await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`); - } + releaseLock(); } - - if (jobTimedOut) { - return { - exitCode, - signal: null, - timedOut: true, - errorMessage: `Timed out after ${timeoutSec}s`, - errorCode: "timeout", - }; - } - - // Parse OpenCode JSONL output - const parsed = parseOpenCodeJsonl(stdout); - - const runtimeSessionParams = parseObject(runtime.sessionParams); - const fallbackSessionId = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? ""); - const workspaceContext = parseObject(ctx.context.paperclipWorkspace); - const workspaceId = asString(workspaceContext.workspaceId, "") || null; - const workspaceRepoUrl = asString(workspaceContext.repoUrl, "") || null; - const workspaceRepoRef = asString(workspaceContext.repoRef, "") || null; - const cwd = asString(workspaceContext.cwd, ""); - - const resolvedSessionId = parsed.sessionId ?? (fallbackSessionId || null); - const resolvedSessionParams = resolvedSessionId - ? { - sessionId: resolvedSessionId, - ...(cwd ? { cwd } : {}), - ...(workspaceId ? { workspaceId } : {}), - ...(workspaceRepoUrl ? { repoUrl: workspaceRepoUrl } : {}), - ...(workspaceRepoRef ? { repoRef: workspaceRepoRef } : {}), - } as Record - : null; - - const provider = parseModelProvider(model); - const biller = inferOpenAiCompatibleBiller(process.env, null) ?? provider ?? "unknown"; - - const parsedError = typeof parsed.errorMessage === "string" ? parsed.errorMessage.trim() : ""; - const rawExitCode = exitCode; - const synthesizedExitCode = parsedError && (rawExitCode ?? 0) === 0 ? 1 : rawExitCode; - const failed = (synthesizedExitCode ?? 0) !== 0; - - // If the session was stale, clear it so the next heartbeat starts fresh - if (failed && isOpenCodeUnknownSessionError(stdout, parsedError)) { - await onLog("stdout", `[paperclip] OpenCode session is unavailable; clearing for next run.\n`); - return { - exitCode: synthesizedExitCode, - signal: null, - timedOut: false, - errorMessage: parsedError || "Session unavailable", - errorCode: "session_unavailable", - clearSession: true, - resultJson: { stdout }, - }; - } - - // If OpenCode hit its step limit, clear the session so the next run starts fresh - // rather than resuming into an already-exhausted turn sequence. - const stepLimitReached = isOpenCodeStepLimitResult(stdout); - if (stepLimitReached) { - await onLog("stdout", `[paperclip] OpenCode step limit reached; clearing session for next run.\n`); - } - - // Detect empty LLM response: session started but LLM returned no tokens or messages - const hasLlmOutput = parsed.usage.outputTokens > 0 || !!parsed.summary; - if (!jobTimedOut && parsed.sessionId !== null && !hasLlmOutput && !parsedError) { - await onLog("stderr", `[paperclip] LLM returned empty response (0 output tokens).\n`); - return { - exitCode: synthesizedExitCode ?? 1, - signal: null, - timedOut: false, - errorMessage: "LLM API returned empty response", - errorCode: "llm_api_error", - sessionId: resolvedSessionId, - sessionParams: resolvedSessionParams, - resultJson: { stdout }, - }; - } - - const firstStderrLine = stdout.split(/\r?\n/).map((l) => l.trim()).find(Boolean) ?? ""; - const podFailureDescription = podTerminatedReason - ? `Pod exited: ${podTerminatedReason}${synthesizedExitCode != null ? ` (exit ${synthesizedExitCode})` : ""}` - : null; - const errorParts = [parsedError, podFailureDescription].filter(Boolean); - const fallbackErrorMessage = - errorParts.join("; ") || firstStderrLine || `OpenCode exited with code ${synthesizedExitCode ?? -1}`; - - return { - exitCode: synthesizedExitCode, - signal: null, - timedOut: false, - errorMessage: (synthesizedExitCode ?? 0) === 0 ? null : fallbackErrorMessage, - usage: { - inputTokens: parsed.usage.inputTokens, - outputTokens: parsed.usage.outputTokens, - cachedInputTokens: parsed.usage.cachedInputTokens, - }, - sessionId: resolvedSessionId, - sessionParams: resolvedSessionParams, - sessionDisplayId: resolvedSessionId, - provider, - model: model || null, - billingType: "unknown", - costUsd: parsed.costUsd, - resultJson: { stdout }, - summary: parsed.summary, - clearSession: stepLimitReached, - } as AdapterExecutionResult; }