fix: wait for concurrent job to finish instead of returning permanent blocked error (FAR-61)

When multiple tasks are assigned simultaneously, only one K8s job can run
at a time (shared PVC/session guard). Previously, all other tasks received
k8s_concurrent_run_blocked immediately and stayed blocked forever.

Now the guard retries once: wait for all blocking jobs to complete via
waitForJobCompletion, then re-check before proceeding to create a new job.
If the re-check still shows a running job, the error is returned as before.

The agentCreationMutex already serializes guard-check + job-create, so
tasks naturally queue up and execute one at a time without concurrent jobs.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
2026-04-25 11:10:27 +00:00
parent 2bd8107f1d
commit 80d18005f9
4 changed files with 128 additions and 61 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{ {
"name": "paperclip-adapter-opencode-k8s", "name": "paperclip-adapter-opencode-k8s",
"version": "0.1.18", "version": "0.1.23",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "paperclip-adapter-opencode-k8s", "name": "paperclip-adapter-opencode-k8s",
"version": "0.1.18", "version": "0.1.23",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@kubernetes/client-node": "^1.0.0", "@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{ {
"name": "paperclip-adapter-opencode-k8s", "name": "paperclip-adapter-opencode-k8s",
"version": "0.1.22", "version": "0.1.23",
"description": "Paperclip adapter plugin that runs OpenCode agents as Kubernetes Jobs", "description": "Paperclip adapter plugin that runs OpenCode agents as Kubernetes Jobs",
"license": "MIT", "license": "MIT",
"type": "module", "type": "module",
+47 -5
View File
@@ -288,7 +288,7 @@ describe("execute — concurrency guard", () => {
}); });
describe("execute — mutex serialization", () => { describe("execute — mutex serialization", () => {
it("serializes concurrent execute() calls for the same agent: second call sees first job", async () => { it("second call waits for first job to finish then creates its own job (no permanent block)", async () => {
// First call's createNamespacedJob blocks until we release it. // First call's createNamespacedJob blocks until we release it.
let releaseFn!: () => void; let releaseFn!: () => void;
const releasePromise = new Promise<void>((resolve) => { releaseFn = resolve; }); const releasePromise = new Promise<void>((resolve) => { releaseFn = resolve; });
@@ -299,6 +299,7 @@ describe("execute — mutex serialization", () => {
return { metadata: { uid: "uid-1" } }; return { metadata: { uid: "uid-1" } };
}); });
// First guard call: no running jobs; second guard call: sees the running job. // First guard call: no running jobs; second guard call: sees the running job.
// Third call (re-check after wait): default [] from makeBatchApi.
batchApi.listNamespacedJob batchApi.listNamespacedJob
.mockResolvedValueOnce({ items: [] }) .mockResolvedValueOnce({ items: [] })
.mockResolvedValueOnce({ .mockResolvedValueOnce({
@@ -307,6 +308,25 @@ describe("execute — mutex serialization", () => {
vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType<typeof getBatchApi>); vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType<typeof getBatchApi>);
// Extended coreApi that satisfies both execute() calls' pod queries:
// waitForPod needs phase=Running; getPodTerminatedInfo needs terminated.exitCode.
const coreApi = {
listNamespacedPod: vi.fn().mockResolvedValue({
items: [{
metadata: { name: POD_NAME },
status: {
phase: "Running",
containerStatuses: [{ name: "opencode", state: { terminated: { exitCode: 0 } } }],
},
}],
}),
readNamespacedPodLog: vi.fn().mockResolvedValue(HAPPY_JSONL),
createNamespacedSecret: vi.fn().mockResolvedValue({}),
deleteNamespacedSecret: vi.fn().mockResolvedValue({}),
patchNamespacedSecret: vi.fn().mockResolvedValue({}),
};
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx(); const ctx = makeCtx();
const first = execute(ctx); const first = execute(ctx);
// Second call races for the mutex; will wait until first releases. // Second call races for the mutex; will wait until first releases.
@@ -317,10 +337,32 @@ describe("execute — mutex serialization", () => {
const [, secondResult] = await Promise.all([first, second]); const [, secondResult] = await Promise.all([first, second]);
// Only one job was created (mutex prevented a second concurrent creation). // Second call should NOT be permanently blocked — it waited and created its own job.
expect(batchApi.createNamespacedJob).toHaveBeenCalledTimes(1); expect(secondResult.errorCode).toBeUndefined();
// Second call found the running job and was blocked. expect(secondResult.exitCode).toBe(0);
expect(secondResult.errorCode).toBe("k8s_concurrent_run_blocked"); // Both tasks created jobs (sequential, not concurrent).
expect(batchApi.createNamespacedJob).toHaveBeenCalledTimes(2);
});
it("returns k8s_concurrent_run_blocked if job is still running after wait", async () => {
// Simulate a job that never completes from listNamespacedJob's perspective
// (always returns running), even though readNamespacedJob says Complete.
// This covers the re-check failing after the wait (e.g. a new job appeared).
const batchApi = makeBatchApi([
{
metadata: { name: "persistent-job" },
status: { conditions: [] }, // always appears running in list
},
]);
vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType<typeof getBatchApi>);
const ctx = makeCtx();
const result = await execute(ctx);
// Waited for the job (readNamespacedJob returned Complete), re-checked (still
// appears running in list), returned blocked.
expect(result.errorCode).toBe("k8s_concurrent_run_blocked");
expect(batchApi.createNamespacedJob).not.toHaveBeenCalled();
}); });
}); });
+25
View File
@@ -855,6 +855,11 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
try { try {
// Guard: single concurrency per agent (shared PVC/session) — fail-closed. // Guard: single concurrency per agent (shared PVC/session) — fail-closed.
// When a concurrent job is detected, wait for it to finish and retry once rather
// than returning k8s_concurrent_run_blocked immediately (which caused permanent
// blocked state for all but the first task in a simultaneous batch assignment).
let waitedForConcurrent = false;
while (true) {
try { try {
const batchApi = getBatchApi(kubeconfigPath); const batchApi = getBatchApi(kubeconfigPath);
const existing = await batchApi.listNamespacedJob({ const existing = await batchApi.listNamespacedJob({
@@ -873,6 +878,8 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const otherJobs = running.filter((j) => !sameTaskJobs.includes(j)); const otherJobs = running.filter((j) => !sameTaskJobs.includes(j));
if (otherJobs.length > 0) { if (otherJobs.length > 0) {
if (waitedForConcurrent) {
// Already waited once — give up to avoid an infinite loop.
const names = otherJobs.map((j) => j.metadata?.name).join(", "); const names = otherJobs.map((j) => j.metadata?.name).join(", ");
await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`); await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`);
return { return {
@@ -883,6 +890,22 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
errorCode: "k8s_concurrent_run_blocked", errorCode: "k8s_concurrent_run_blocked",
}; };
} }
const names = otherJobs.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Waiting for concurrent Job(s) to finish before starting: ${names}\n`);
// Wait up to the configured job timeout (+ grace + buffer); for unlimited jobs
// cap at 1 hour so we don't block the mutex indefinitely.
const concurrentWaitMs = timeoutSec > 0
? (timeoutSec + graceSec + 120) * 1000
: 60 * 60_000;
await Promise.all(
otherJobs.map((j) =>
waitForJobCompletion(guardNamespace, j.metadata?.name ?? "", concurrentWaitMs, kubeconfigPath).catch(() => {}),
),
);
await onLog("stdout", `[paperclip] Concurrent Job(s) done — retrying guard check...\n`);
waitedForConcurrent = true;
continue;
}
if (sameTaskJobs.length > 0) { if (sameTaskJobs.length > 0) {
const orphanJob = sameTaskJobs[0]; const orphanJob = sameTaskJobs[0];
@@ -913,6 +936,8 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
errorCode: "k8s_concurrency_guard_unreachable", errorCode: "k8s_concurrency_guard_unreachable",
}; };
} }
break; // no blocking jobs — proceed to job creation
}
// Read agent instructions file (instructionsFilePath config field → system prompt prepend) // Read agent instructions file (instructionsFilePath config field → system prompt prepend)
const instructionsFilePath = asString(config.instructionsFilePath, "").trim(); const instructionsFilePath = asString(config.instructionsFilePath, "").trim();