From 55fd3021fbe1e6717fea8eeef5809e3618b692d9 Mon Sep 17 00:00:00 2001 From: Chris Farhood Date: Fri, 24 Apr 2026 20:10:01 +0000 Subject: [PATCH] fix: add per-agent mutex to eliminate TOCTOU race in K8s concurrency guard (FAR-29) Two concurrent execute() calls for the same agent can both pass the list-then-create guard before either job appears in the other's query. The new module-level agentCreationMutex serializes the guard+create phase within the process so only one call enters listNamespacedJob at a time. The mutex is acquired after sanitizing the agent ID and released in a finally block that wraps the entire guard+create section, so all early return paths (guard blocks, create failures) cleanly release it. Variables used in both the guard+create and log-streaming phases are hoisted to before the try block. Cross-agent calls use separate mutex slots and are unaffected. Added two vitest cases verifying same-agent serialization and that different-agent calls are not serialized. Co-Authored-By: Paperclip --- src/server/execute.test.ts | 88 ++++++++++++++++++++++++++++++++++++++ src/server/execute.ts | 47 +++++++++++++++----- 2 files changed, 124 insertions(+), 11 deletions(-) diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts index 5ad5406..4970481 100644 --- a/src/server/execute.test.ts +++ b/src/server/execute.test.ts @@ -1637,3 +1637,91 @@ describe("execute: SIGTERM handler best-effort cleanup", () => { // so we do not need to settle executePromise. }); }); + +// ─── execute: per-agent creation mutex (FAR-29 TOCTOU fix) ─────────────────── +// +// Verifies that two concurrent execute() calls for the same agent cannot both +// enter the listNamespacedJob → createNamespacedJob sequence simultaneously. +// Without the per-agent mutex, both would pass the concurrency guard before +// either job appears in the other's list query. + +describe("execute: per-agent creation mutex prevents TOCTOU race", () => { + beforeEach(() => { + vi.resetAllMocks(); + mockReadSkillEntries.mockResolvedValue([]); + mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); + mockPrepareBundle.mockResolvedValue(makeBundle()); + // Make job creation fail so the guard+create phase exits quickly and + // releases the mutex without needing to mock the full streaming path. + mockBatchCreateJob.mockRejectedValue(new Error("mock: create not configured")); + mockBatchDeleteJob.mockResolvedValue({}); + mockCoreDeleteSecret.mockResolvedValue({}); + }); + + it("serializes guard phases for the same agent: call-2 waits until call-1 exits guard+create", async () => { + const listCalls: string[] = []; + let resolveFirstList!: (v: { items: [] }) => void; + + mockBatchListJobs + .mockImplementationOnce(() => { + listCalls.push("call-1"); + return new Promise<{ items: [] }>((resolve) => { resolveFirstList = resolve; }); + }) + .mockImplementation(() => { + listCalls.push("call-2"); + return Promise.resolve({ items: [] }); + }); + + const p1 = execute(makeCtx({ runId: "run-1" })); + const p2 = execute(makeCtx({ runId: "run-2" })); + + // Drain microtasks: call-1 should be suspended in listNamespacedJob while + // call-2 waits behind the per-agent mutex, not yet calling list. + for (let i = 0; i < 20; i++) await Promise.resolve(); + expect(listCalls).toEqual(["call-1"]); + + // Let call-1's guard resolve (no running jobs). It will proceed to job + // creation, fail (mock rejects), and release the mutex in finally. + resolveFirstList({ items: [] }); + await Promise.allSettled([p1, p2]); + + // call-2 must have listed, and only AFTER call-1's guard resolved. + // The exact order: call-1 listed → call-1 list resolved → call-2 listed. + expect(listCalls).toEqual(["call-1", "call-2"]); + }); + + it("does not serialize guard phases for different agents", async () => { + const listCalls: string[] = []; + let resolveAgentAList!: (v: { items: [] }) => void; + + // Agent A's list is artificially slow. Agent B (different id) should + // proceed immediately without waiting — the mutex is keyed by agent id. + mockBatchListJobs + .mockImplementationOnce(() => { + listCalls.push("A"); + return new Promise<{ items: [] }>((resolve) => { resolveAgentAList = resolve; }); + }) + .mockImplementation(() => { + listCalls.push("B"); + return Promise.resolve({ items: [] }); + }); + + const ctxA = makeCtx({ runId: "run-A" }); + const ctxB = makeCtx({ + runId: "run-B", + agent: { id: "agent-other", companyId: "co1", name: "Other Agent", adapterType: "claude_k8s", adapterConfig: {} }, + } as Partial); + + const pA = execute(ctxA); + const pB = execute(ctxB); + + // Drain microtasks — B should have called list even though A is still + // suspended, because they use separate mutex slots. + for (let i = 0; i < 20; i++) await Promise.resolve(); + expect(listCalls).toContain("B"); + + // Let A complete so the promises settle cleanly. + resolveAgentAList({ items: [] }); + await Promise.allSettled([pA, pB]); + }); +}); diff --git a/src/server/execute.ts b/src/server/execute.ts index 072ae6f..1e5c8b6 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -48,6 +48,10 @@ interface ActiveJobRef { kubeconfigPath?: string; } const activeJobs = new Set(); +// Per-agent serialization lock: prevents the TOCTOU race (FAR-29) where two +// concurrent execute() calls for the same agent both pass the list-then-create +// guard and create K8s Jobs simultaneously on the shared PVC. +const agentCreationMutex = new Map>(); let sigtermHandlerRegistered = false; function ensureSigtermHandler(): void { @@ -633,17 +637,40 @@ export async function execute(ctx: AdapterExecutionContext): Promise void = () => {}; + const _mutexSlot = new Promise((resolve) => { _releaseMutex = resolve; }); + // Chain: next caller for this agent waits on _mutexSlot, which resolves in finally. + agentCreationMutex.set(agentId, _prevCreation.then(() => _mutexSlot, () => _mutexSlot)); + // Wait for any prior execute() call to finish its guard+create phase. + await _prevCreation.catch(() => {}); + + // Hoist declarations used in both the guard+create phase and the log-streaming + // section so the mutex try/finally can be added without a large re-indent. + let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null; + // eslint-disable-next-line prefer-const + let jobName!: string; + // eslint-disable-next-line prefer-const + let namespace!: string; + let promptSecret: { name: string; namespace: string; data: Record } | null = null; + // runtimeSessionParams and currentSessionIdRaw are also used after the + // try block (in the result-parsing section) so hoist them here. + const runtimeSessionParams = parseObject(runtime.sessionParams); + const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? ""); + const coreApi = getCoreApi(kubeconfigPath); + const batchApi = getBatchApi(kubeconfigPath); + + try { const selfPod = await getSelfPodInfo(kubeconfigPath); const guardNamespace = asString(config.namespace, "") || selfPod.namespace; const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true); - const runtimeSessionParams = parseObject(runtime.sessionParams); - const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? ""); const currentSessionLabel = currentSessionIdRaw ? sanitizeLabelValue(currentSessionIdRaw) : null; const currentTaskIdRaw = asString(ctx.context.taskId, "") || asString(ctx.context.issueId, ""); const currentTaskLabel = currentTaskIdRaw ? sanitizeLabelValue(currentTaskIdRaw) : null; - let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null; try { - const batchApi = getBatchApi(kubeconfigPath); const existing = await batchApi.listNamespacedJob({ namespace: guardNamespace, labelSelector: `paperclip.io/agent-id=${sanitizedAgentId},paperclip.io/adapter-type=claude_k8s`, @@ -763,13 +790,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise } | null = null; - // Prepare the prompt bundle (skills + instructions) on the server filesystem. // The K8s Job pod mounts the same PVC at /paperclip, so bundle paths written // here are accessible inside the pod at the identical absolute path. @@ -967,6 +987,11 @@ export async function execute(ctx: AdapterExecutionContext): Promise 0 ? `${timeoutSec}s` : "none"})\n`); } + } finally { + // Release the per-agent creation mutex so the next queued execute() call + // can proceed with its guard+create phase (FAR-29). + _releaseMutex(); + } let stdout = ""; let exitCode: number | null = null;