diff --git a/src/server/execute.ts b/src/server/execute.ts index eac81c7..6770c5a 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -305,7 +305,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"), ); if (running.length > 0) { - const names = running.map((j) => j.metadata?.name).join(", "); - await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`); - return { - exitCode: null, - signal: null, - timedOut: false, - errorMessage: `Concurrent run blocked: Job ${names} is still running for this agent`, - errorCode: "k8s_concurrent_run_blocked", - }; + // Separate orphaned jobs (from a previous server-side run) from truly + // concurrent jobs (same runId — shouldn't happen but guard defensively). + const orphaned = running.filter( + (j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") !== runId, + ); + const samRun = running.filter( + (j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") === runId, + ); + + if (orphaned.length > 0) { + const orphanNames = orphaned.map((j) => j.metadata?.name).join(", "); + await onLog("stdout", `[paperclip] Cleaning up ${orphaned.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`); + for (const j of orphaned) { + const name = j.metadata?.name; + if (name) { + await cleanupJob(guardNamespace, name, onLog, kubeconfigPath); + } + } + } + + // If there are still running Jobs that belong to THIS run (shouldn't happen + // since we haven't created the Job yet), block execution. + if (samRun.length > 0) { + const names = samRun.map((j) => j.metadata?.name).join(", "); + await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this run: ${names}\n`); + return { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: `Concurrent run blocked: Job ${names} is still running for this agent`, + errorCode: "k8s_concurrent_run_blocked", + }; + } } } catch { // If we can't check, proceed — the heartbeat service enforces concurrency too @@ -391,11 +418,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise { const silenceSec = Math.round((Date.now() - lastLogAt) / 1000); void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`); + + // Refresh updatedAt every ~4 minutes (16 ticks × 15s) to stay + // well within the 5-minute reaper staleness window. + keepaliveTick++; + if (ctx.onSpawn && keepaliveTick % 16 === 0) { + void ctx.onSpawn({ pid: -1, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {}); + } }, KEEPALIVE_INTERVAL_MS); const wrappedOnLog: typeof onLog = async (stream, chunk) => { lastLogAt = Date.now();