diff --git a/package-lock.json b/package-lock.json index ae6bde7..984b38c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "paperclip-adapter-opencode-k8s", - "version": "0.1.18", + "version": "0.1.23", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "paperclip-adapter-opencode-k8s", - "version": "0.1.18", + "version": "0.1.23", "license": "MIT", "dependencies": { "@kubernetes/client-node": "^1.0.0", diff --git a/package.json b/package.json index 64f86ad..955021d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "paperclip-adapter-opencode-k8s", - "version": "0.1.22", + "version": "0.1.23", "description": "Paperclip adapter plugin that runs OpenCode agents as Kubernetes Jobs", "license": "MIT", "type": "module", diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts index 80a7ebc..ecbc827 100644 --- a/src/server/execute.test.ts +++ b/src/server/execute.test.ts @@ -288,7 +288,7 @@ describe("execute — concurrency guard", () => { }); describe("execute — mutex serialization", () => { - it("serializes concurrent execute() calls for the same agent: second call sees first job", async () => { + it("second call waits for first job to finish then creates its own job (no permanent block)", async () => { // First call's createNamespacedJob blocks until we release it. let releaseFn!: () => void; const releasePromise = new Promise((resolve) => { releaseFn = resolve; }); @@ -299,6 +299,7 @@ describe("execute — mutex serialization", () => { return { metadata: { uid: "uid-1" } }; }); // First guard call: no running jobs; second guard call: sees the running job. + // Third call (re-check after wait): default [] from makeBatchApi. batchApi.listNamespacedJob .mockResolvedValueOnce({ items: [] }) .mockResolvedValueOnce({ @@ -307,6 +308,25 @@ describe("execute — mutex serialization", () => { vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType); + // Extended coreApi that satisfies both execute() calls' pod queries: + // waitForPod needs phase=Running; getPodTerminatedInfo needs terminated.exitCode. + const coreApi = { + listNamespacedPod: vi.fn().mockResolvedValue({ + items: [{ + metadata: { name: POD_NAME }, + status: { + phase: "Running", + containerStatuses: [{ name: "opencode", state: { terminated: { exitCode: 0 } } }], + }, + }], + }), + readNamespacedPodLog: vi.fn().mockResolvedValue(HAPPY_JSONL), + createNamespacedSecret: vi.fn().mockResolvedValue({}), + deleteNamespacedSecret: vi.fn().mockResolvedValue({}), + patchNamespacedSecret: vi.fn().mockResolvedValue({}), + }; + vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType); + const ctx = makeCtx(); const first = execute(ctx); // Second call races for the mutex; will wait until first releases. @@ -317,10 +337,32 @@ describe("execute — mutex serialization", () => { const [, secondResult] = await Promise.all([first, second]); - // Only one job was created (mutex prevented a second concurrent creation). - expect(batchApi.createNamespacedJob).toHaveBeenCalledTimes(1); - // Second call found the running job and was blocked. - expect(secondResult.errorCode).toBe("k8s_concurrent_run_blocked"); + // Second call should NOT be permanently blocked — it waited and created its own job. + expect(secondResult.errorCode).toBeUndefined(); + expect(secondResult.exitCode).toBe(0); + // Both tasks created jobs (sequential, not concurrent). + expect(batchApi.createNamespacedJob).toHaveBeenCalledTimes(2); + }); + + it("returns k8s_concurrent_run_blocked if job is still running after wait", async () => { + // Simulate a job that never completes from listNamespacedJob's perspective + // (always returns running), even though readNamespacedJob says Complete. + // This covers the re-check failing after the wait (e.g. a new job appeared). + const batchApi = makeBatchApi([ + { + metadata: { name: "persistent-job" }, + status: { conditions: [] }, // always appears running in list + }, + ]); + vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType); + + const ctx = makeCtx(); + const result = await execute(ctx); + + // Waited for the job (readNamespacedJob returned Complete), re-checked (still + // appears running in list), returned blocked. + expect(result.errorCode).toBe("k8s_concurrent_run_blocked"); + expect(batchApi.createNamespacedJob).not.toHaveBeenCalled(); }); }); diff --git a/src/server/execute.ts b/src/server/execute.ts index 09b68e7..10b344a 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -855,63 +855,88 @@ export async function execute(ctx: AdapterExecutionContext): Promise !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"), - ); - if (running.length > 0) { - // Separate Jobs matching the current task (orphaned from a prior server instance) - // from Jobs belonging to a different concurrent task. - const sameTaskJobs = taskId - ? running.filter((j) => j.metadata?.labels?.["paperclip.io/task-id"] === taskId) - : []; - const otherJobs = running.filter((j) => !sameTaskJobs.includes(j)); + // When a concurrent job is detected, wait for it to finish and retry once rather + // than returning k8s_concurrent_run_blocked immediately (which caused permanent + // blocked state for all but the first task in a simultaneous batch assignment). + let waitedForConcurrent = false; + while (true) { + try { + const batchApi = getBatchApi(kubeconfigPath); + const existing = await batchApi.listNamespacedJob({ + namespace: guardNamespace, + labelSelector: `paperclip.io/agent-id=${agentId},paperclip.io/adapter-type=opencode_k8s`, + }); + const running = existing.items.filter( + (j) => !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"), + ); + if (running.length > 0) { + // Separate Jobs matching the current task (orphaned from a prior server instance) + // from Jobs belonging to a different concurrent task. + const sameTaskJobs = taskId + ? running.filter((j) => j.metadata?.labels?.["paperclip.io/task-id"] === taskId) + : []; + const otherJobs = running.filter((j) => !sameTaskJobs.includes(j)); - if (otherJobs.length > 0) { - const names = otherJobs.map((j) => j.metadata?.name).join(", "); - await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`); - return { - exitCode: null, - signal: null, - timedOut: false, - errorMessage: `Concurrent run blocked: Job ${names} is still running for this agent`, - errorCode: "k8s_concurrent_run_blocked", - }; - } - - if (sameTaskJobs.length > 0) { - const orphanJob = sameTaskJobs[0]; - const orphanJobName = orphanJob.metadata?.name ?? ""; - if (reattachOrphanedJobs) { - await onLog("stdout", `[paperclip] Reattaching to orphaned Job ${orphanJobName} from prior server instance (task: ${taskId})...\n`); - activeJobs.set(orphanJobName, { namespace: guardNamespace, kubeconfigPath }); - return streamAndAwaitJob(ctx, orphanJobName, guardNamespace, timeoutSec, graceSec, kubeconfigPath, retainJobs); + if (otherJobs.length > 0) { + if (waitedForConcurrent) { + // Already waited once — give up to avoid an infinite loop. + const names = otherJobs.map((j) => j.metadata?.name).join(", "); + await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`); + return { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: `Concurrent run blocked: Job ${names} is still running for this agent`, + errorCode: "k8s_concurrent_run_blocked", + }; + } + const names = otherJobs.map((j) => j.metadata?.name).join(", "); + await onLog("stdout", `[paperclip] Waiting for concurrent Job(s) to finish before starting: ${names}\n`); + // Wait up to the configured job timeout (+ grace + buffer); for unlimited jobs + // cap at 1 hour so we don't block the mutex indefinitely. + const concurrentWaitMs = timeoutSec > 0 + ? (timeoutSec + graceSec + 120) * 1000 + : 60 * 60_000; + await Promise.all( + otherJobs.map((j) => + waitForJobCompletion(guardNamespace, j.metadata?.name ?? "", concurrentWaitMs, kubeconfigPath).catch(() => {}), + ), + ); + await onLog("stdout", `[paperclip] Concurrent Job(s) done — retrying guard check...\n`); + waitedForConcurrent = true; + continue; + } + + if (sameTaskJobs.length > 0) { + const orphanJob = sameTaskJobs[0]; + const orphanJobName = orphanJob.metadata?.name ?? ""; + if (reattachOrphanedJobs) { + await onLog("stdout", `[paperclip] Reattaching to orphaned Job ${orphanJobName} from prior server instance (task: ${taskId})...\n`); + activeJobs.set(orphanJobName, { namespace: guardNamespace, kubeconfigPath }); + return streamAndAwaitJob(ctx, orphanJobName, guardNamespace, timeoutSec, graceSec, kubeconfigPath, retainJobs); + } + await onLog("stderr", `[paperclip] Orphaned Job ${orphanJobName} found for this task but reattachOrphanedJobs is disabled.\n`); + return { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: `Orphaned Job ${orphanJobName} is still running (reattachOrphanedJobs disabled)`, + errorCode: "k8s_concurrent_run_blocked", + }; } - await onLog("stderr", `[paperclip] Orphaned Job ${orphanJobName} found for this task but reattachOrphanedJobs is disabled.\n`); - return { - exitCode: null, - signal: null, - timedOut: false, - errorMessage: `Orphaned Job ${orphanJobName} is still running (reattachOrphanedJobs disabled)`, - errorCode: "k8s_concurrent_run_blocked", - }; } + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + await onLog("stderr", `[paperclip] Concurrency guard unreachable — cannot list Jobs: ${msg}\n`); + return { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: `Concurrency guard unreachable: ${msg}`, + errorCode: "k8s_concurrency_guard_unreachable", + }; } - } catch (err) { - const msg = err instanceof Error ? err.message : String(err); - await onLog("stderr", `[paperclip] Concurrency guard unreachable — cannot list Jobs: ${msg}\n`); - return { - exitCode: null, - signal: null, - timedOut: false, - errorMessage: `Concurrency guard unreachable: ${msg}`, - errorCode: "k8s_concurrency_guard_unreachable", - }; + break; // no blocking jobs — proceed to job creation } // Read agent instructions file (instructionsFilePath config field → system prompt prepend)