From df856e6ca537ab88bc8588bf97dfadc0deddd00a Mon Sep 17 00:00:00 2001 From: Test User Date: Thu, 16 Apr 2026 21:48:16 +0000 Subject: [PATCH] fix: clean up orphaned K8s Jobs and refresh updatedAt to prevent UI desync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two root causes behind the "plugin losing sync" issue: 1. After a server restart, the in-memory activeRunExecutions set is lost. The K8s Job keeps running, but the reaper marks the server-side run as failed after 5 min (stale updatedAt). The next heartbeat then fires a new run, but the adapter's concurrency guard blocks it because the old Job is still alive, and this cycle repeats indefinitely. Fix: the concurrency guard now compares each running Job's paperclip.io/run-id label against the current runId. Jobs from a previous (dead) run are cleaned up automatically so the new run can proceed. 2. onLog (keepalive) does NOT update the run's updatedAt in the DB — it only writes to the log store and publishes SSE events. In multi-instance deployments, a reaper on instance B can mark a run being executed on instance A as stale after 5 min of no DB updates. Fix: the keepalive timer now calls onSpawn every ~4 min (16 ticks) to refresh updatedAt, staying within the 5-min reaper threshold. 
Co-Authored-By: Paperclip --- src/server/execute.ts | 70 +++++++++++++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 15 deletions(-) diff --git a/src/server/execute.ts b/src/server/execute.ts index eac81c7..6770c5a 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -305,7 +305,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"), ); if (running.length > 0) { - const names = running.map((j) => j.metadata?.name).join(", "); - await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`); - return { - exitCode: null, - signal: null, - timedOut: false, - errorMessage: `Concurrent run blocked: Job ${names} is still running for this agent`, - errorCode: "k8s_concurrent_run_blocked", - }; + // Separate orphaned jobs (from a previous server-side run) from truly + // concurrent jobs (same runId — shouldn't happen but guard defensively). + const orphaned = running.filter( + (j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") !== runId, + ); + const samRun = running.filter( + (j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") === runId, + ); + + if (orphaned.length > 0) { + const orphanNames = orphaned.map((j) => j.metadata?.name).join(", "); + await onLog("stdout", `[paperclip] Cleaning up ${orphaned.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`); + for (const j of orphaned) { + const name = j.metadata?.name; + if (name) { + await cleanupJob(guardNamespace, name, onLog, kubeconfigPath); + } + } + } + + // If there are still running Jobs that belong to THIS run (shouldn't happen + // since we haven't created the Job yet), block execution. 
+ if (samRun.length > 0) { + const names = samRun.map((j) => j.metadata?.name).join(", "); + await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this run: ${names}\n`); + return { + exitCode: null, + signal: null, + timedOut: false, + errorMessage: `Concurrent run blocked: Job ${names} is still running for this agent`, + errorCode: "k8s_concurrent_run_blocked", + }; + } } } catch { // If we can't check, proceed — the heartbeat service enforces concurrency too @@ -391,11 +418,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise { const silenceSec = Math.round((Date.now() - lastLogAt) / 1000); void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`); + + // Refresh updatedAt every ~4 minutes (16 ticks × 15s) to stay + // well within the 5-minute reaper staleness window. + keepaliveTick++; + if (ctx.onSpawn && keepaliveTick % 16 === 0) { + void ctx.onSpawn({ pid: -1, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {}); + } }, KEEPALIVE_INTERVAL_MS); const wrappedOnLog: typeof onLog = async (stream, chunk) => { lastLogAt = Date.now();