fix: add per-agent mutex to eliminate TOCTOU race in K8s concurrency guard (FAR-29)
Two concurrent execute() calls for the same agent can both pass the list-then-create guard before either job appears in the other's query. The new module-level agentCreationMutex serializes the guard+create phase within the process so only one call enters listNamespacedJob at a time. The mutex is acquired after sanitizing the agent ID and released in a finally block that wraps the entire guard+create section, so all early return paths (guard blocks, create failures) cleanly release it. Variables used in both the guard+create and log-streaming phases are hoisted to before the try block. Cross-agent calls use separate mutex slots and are unaffected. Added two vitest cases verifying same-agent serialization and that different-agent calls are not serialized. Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -1637,3 +1637,91 @@ describe("execute: SIGTERM handler best-effort cleanup", () => {
|
||||
// so we do not need to settle executePromise.
|
||||
});
|
||||
});
|
||||
|
||||
// ─── execute: per-agent creation mutex (FAR-29 TOCTOU fix) ───────────────────
|
||||
//
|
||||
// Verifies that two concurrent execute() calls for the same agent cannot both
|
||||
// enter the listNamespacedJob → createNamespacedJob sequence simultaneously.
|
||||
// Without the per-agent mutex, both would pass the concurrency guard before
|
||||
// either job appears in the other's list query.
|
||||
|
||||
describe("execute: per-agent creation mutex prevents TOCTOU race", () => {
|
||||
beforeEach(() => {
|
||||
vi.resetAllMocks();
|
||||
mockReadSkillEntries.mockResolvedValue([]);
|
||||
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
|
||||
mockPrepareBundle.mockResolvedValue(makeBundle());
|
||||
// Make job creation fail so the guard+create phase exits quickly and
|
||||
// releases the mutex without needing to mock the full streaming path.
|
||||
mockBatchCreateJob.mockRejectedValue(new Error("mock: create not configured"));
|
||||
mockBatchDeleteJob.mockResolvedValue({});
|
||||
mockCoreDeleteSecret.mockResolvedValue({});
|
||||
});
|
||||
|
||||
it("serializes guard phases for the same agent: call-2 waits until call-1 exits guard+create", async () => {
|
||||
const listCalls: string[] = [];
|
||||
let resolveFirstList!: (v: { items: [] }) => void;
|
||||
|
||||
mockBatchListJobs
|
||||
.mockImplementationOnce(() => {
|
||||
listCalls.push("call-1");
|
||||
return new Promise<{ items: [] }>((resolve) => { resolveFirstList = resolve; });
|
||||
})
|
||||
.mockImplementation(() => {
|
||||
listCalls.push("call-2");
|
||||
return Promise.resolve({ items: [] });
|
||||
});
|
||||
|
||||
const p1 = execute(makeCtx({ runId: "run-1" }));
|
||||
const p2 = execute(makeCtx({ runId: "run-2" }));
|
||||
|
||||
// Drain microtasks: call-1 should be suspended in listNamespacedJob while
|
||||
// call-2 waits behind the per-agent mutex, not yet calling list.
|
||||
for (let i = 0; i < 20; i++) await Promise.resolve();
|
||||
expect(listCalls).toEqual(["call-1"]);
|
||||
|
||||
// Let call-1's guard resolve (no running jobs). It will proceed to job
|
||||
// creation, fail (mock rejects), and release the mutex in finally.
|
||||
resolveFirstList({ items: [] });
|
||||
await Promise.allSettled([p1, p2]);
|
||||
|
||||
// call-2 must have listed, and only AFTER call-1's guard resolved.
|
||||
// The exact order: call-1 listed → call-1 list resolved → call-2 listed.
|
||||
expect(listCalls).toEqual(["call-1", "call-2"]);
|
||||
});
|
||||
|
||||
it("does not serialize guard phases for different agents", async () => {
|
||||
const listCalls: string[] = [];
|
||||
let resolveAgentAList!: (v: { items: [] }) => void;
|
||||
|
||||
// Agent A's list is artificially slow. Agent B (different id) should
|
||||
// proceed immediately without waiting — the mutex is keyed by agent id.
|
||||
mockBatchListJobs
|
||||
.mockImplementationOnce(() => {
|
||||
listCalls.push("A");
|
||||
return new Promise<{ items: [] }>((resolve) => { resolveAgentAList = resolve; });
|
||||
})
|
||||
.mockImplementation(() => {
|
||||
listCalls.push("B");
|
||||
return Promise.resolve({ items: [] });
|
||||
});
|
||||
|
||||
const ctxA = makeCtx({ runId: "run-A" });
|
||||
const ctxB = makeCtx({
|
||||
runId: "run-B",
|
||||
agent: { id: "agent-other", companyId: "co1", name: "Other Agent", adapterType: "claude_k8s", adapterConfig: {} },
|
||||
} as Partial<AdapterExecutionContext>);
|
||||
|
||||
const pA = execute(ctxA);
|
||||
const pB = execute(ctxB);
|
||||
|
||||
// Drain microtasks — B should have called list even though A is still
|
||||
// suspended, because they use separate mutex slots.
|
||||
for (let i = 0; i < 20; i++) await Promise.resolve();
|
||||
expect(listCalls).toContain("B");
|
||||
|
||||
// Let A complete so the promises settle cleanly.
|
||||
resolveAgentAList({ items: [] });
|
||||
await Promise.allSettled([pA, pB]);
|
||||
});
|
||||
});
|
||||
|
||||
+36
-11
@@ -48,6 +48,10 @@ interface ActiveJobRef {
|
||||
kubeconfigPath?: string;
|
||||
}
|
||||
const activeJobs = new Set<ActiveJobRef>();
|
||||
// Per-agent serialization lock: prevents the TOCTOU race (FAR-29) where two
|
||||
// concurrent execute() calls for the same agent both pass the list-then-create
|
||||
// guard and create K8s Jobs simultaneously on the shared PVC.
|
||||
const agentCreationMutex = new Map<string, Promise<void>>();
|
||||
let sigtermHandlerRegistered = false;
|
||||
|
||||
function ensureSigtermHandler(): void {
|
||||
@@ -633,17 +637,40 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
errorCode: "k8s_agent_id_invalid",
|
||||
};
|
||||
}
|
||||
// FAR-29: serialize guard+create per agent within this process to prevent the
|
||||
// TOCTOU race where two concurrent execute() calls both pass the list-then-create
|
||||
// guard and create K8s Jobs simultaneously on the shared PVC.
|
||||
const _prevCreation = agentCreationMutex.get(agentId) ?? Promise.resolve();
|
||||
let _releaseMutex: () => void = () => {};
|
||||
const _mutexSlot = new Promise<void>((resolve) => { _releaseMutex = resolve; });
|
||||
// Chain: next caller for this agent waits on _mutexSlot, which resolves in finally.
|
||||
agentCreationMutex.set(agentId, _prevCreation.then(() => _mutexSlot, () => _mutexSlot));
|
||||
// Wait for any prior execute() call to finish its guard+create phase.
|
||||
await _prevCreation.catch(() => {});
|
||||
|
||||
// Hoist declarations used in both the guard+create phase and the log-streaming
|
||||
// section so the mutex try/finally can be added without a large re-indent.
|
||||
let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null;
|
||||
// eslint-disable-next-line prefer-const
|
||||
let jobName!: string;
|
||||
// eslint-disable-next-line prefer-const
|
||||
let namespace!: string;
|
||||
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
|
||||
// runtimeSessionParams and currentSessionIdRaw are also used after the
|
||||
// try block (in the result-parsing section) so hoist them here.
|
||||
const runtimeSessionParams = parseObject(runtime.sessionParams);
|
||||
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
|
||||
try {
|
||||
const selfPod = await getSelfPodInfo(kubeconfigPath);
|
||||
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
|
||||
const reattachOrphanedJobs = asBoolean(config.reattachOrphanedJobs, true);
|
||||
const runtimeSessionParams = parseObject(runtime.sessionParams);
|
||||
const currentSessionIdRaw = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
|
||||
const currentSessionLabel = currentSessionIdRaw ? sanitizeLabelValue(currentSessionIdRaw) : null;
|
||||
const currentTaskIdRaw = asString(ctx.context.taskId, "") || asString(ctx.context.issueId, "");
|
||||
const currentTaskLabel = currentTaskIdRaw ? sanitizeLabelValue(currentTaskIdRaw) : null;
|
||||
let reattachTarget: { jobName: string; namespace: string; priorRunId: string; image: string } | null = null;
|
||||
try {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
const existing = await batchApi.listNamespacedJob({
|
||||
namespace: guardNamespace,
|
||||
labelSelector: `paperclip.io/agent-id=${sanitizedAgentId},paperclip.io/adapter-type=claude_k8s`,
|
||||
@@ -763,13 +790,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
};
|
||||
}
|
||||
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
|
||||
let jobName: string;
|
||||
let namespace: string;
|
||||
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
|
||||
|
||||
// Prepare the prompt bundle (skills + instructions) on the server filesystem.
|
||||
// The K8s Job pod mounts the same PVC at /paperclip, so bundle paths written
|
||||
// here are accessible inside the pod at the identical absolute path.
|
||||
@@ -967,6 +987,11 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
|
||||
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
|
||||
}
|
||||
} finally {
|
||||
// Release the per-agent creation mutex so the next queued execute() call
|
||||
// can proceed with its guard+create phase (FAR-29).
|
||||
_releaseMutex();
|
||||
}
|
||||
|
||||
let stdout = "";
|
||||
let exitCode: number | null = null;
|
||||
|
||||
Reference in New Issue
Block a user