fix: clean up orphaned K8s Jobs and refresh updatedAt to prevent UI desync
Two root causes are behind the "plugin losing sync" issue:

1. After a server restart, the in-memory activeRunExecutions set is lost. The K8s Job keeps running, but the reaper marks the server-side run as failed after 5 min (stale updatedAt). The next heartbeat fires a new run, the adapter's concurrency guard blocks it because the old Job is still alive, and this loops indefinitely.

   Fix: the concurrency guard now compares each running Job's paperclip.io/run-id label against the current runId. Jobs from a previous (dead) run are cleaned up automatically so the new run can proceed.

2. onLog (keepalive) does NOT update the run's updatedAt in the DB — it only writes to the log store and publishes SSE events. In multi-instance deployments, a reaper on instance B can mark a run being executed on instance A as stale after 5 min of no DB updates.

   Fix: the keepalive timer now calls onSpawn every ~4 min (16 ticks) to refresh updatedAt, staying within the 5-min reaper threshold.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
+55
-15
@@ -305,7 +305,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
const retainJobs = asBoolean(config.retainJobs, false);
|
||||
const kubeconfigPath = asString(config.kubeconfig, "") || undefined;
|
||||
|
||||
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session)
|
||||
// Guard: claude_k8s must not run concurrently for the same agent (shared PVC/session).
|
||||
// After a server restart, orphaned K8s Jobs from previous (now-failed) runs may
|
||||
// still be running. We detect those by comparing the Job's run-id label against
|
||||
// the current runId and clean them up so this execution can proceed.
|
||||
const agentId = ctx.agent.id;
|
||||
const selfPod = await getSelfPodInfo(kubeconfigPath);
|
||||
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
|
||||
@@ -319,15 +322,39 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
(j) => !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"),
|
||||
);
|
||||
if (running.length > 0) {
|
||||
const names = running.map((j) => j.metadata?.name).join(", ");
|
||||
await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Concurrent run blocked: Job ${names} is still running for this agent`,
|
||||
errorCode: "k8s_concurrent_run_blocked",
|
||||
};
|
||||
// Separate orphaned jobs (from a previous server-side run) from truly
|
||||
// concurrent jobs (same runId — shouldn't happen but guard defensively).
|
||||
const orphaned = running.filter(
|
||||
(j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") !== runId,
|
||||
);
|
||||
const samRun = running.filter(
|
||||
(j) => (j.metadata?.labels?.["paperclip.io/run-id"] ?? "") === runId,
|
||||
);
|
||||
|
||||
if (orphaned.length > 0) {
|
||||
const orphanNames = orphaned.map((j) => j.metadata?.name).join(", ");
|
||||
await onLog("stdout", `[paperclip] Cleaning up ${orphaned.length} orphaned K8s Job(s) from previous run(s): ${orphanNames}\n`);
|
||||
for (const j of orphaned) {
|
||||
const name = j.metadata?.name;
|
||||
if (name) {
|
||||
await cleanupJob(guardNamespace, name, onLog, kubeconfigPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If there are still running Jobs that belong to THIS run (shouldn't happen
|
||||
// since we haven't created the Job yet), block execution.
|
||||
if (samRun.length > 0) {
|
||||
const names = samRun.map((j) => j.metadata?.name).join(", ");
|
||||
await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this run: ${names}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Concurrent run blocked: Job ${names} is still running for this agent`,
|
||||
errorCode: "k8s_concurrent_run_blocked",
|
||||
};
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// If we can't check, proceed — the heartbeat service enforces concurrency too
|
||||
@@ -391,11 +418,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
|
||||
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
|
||||
|
||||
// Notify the server that execution has started. Without this call,
|
||||
// the server has no processStartedAt timestamp for the run, so the
|
||||
// stale-run reaper (reapOrphanedRuns) cannot distinguish a live K8s
|
||||
// job from an orphaned run and may mark it as failed — causing the
|
||||
// UI to show no active runs and triggering duplicate run attempts.
|
||||
// Notify the server that execution has started. This sets
|
||||
// processStartedAt and refreshes updatedAt in the DB, which the
|
||||
// stale-run reaper (reapOrphanedRuns) uses to decide liveness.
|
||||
if (ctx.onSpawn) {
|
||||
await ctx.onSpawn({
|
||||
pid: -1, // no local process; sentinel for K8s Job
|
||||
@@ -424,10 +449,25 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// Keepalive: periodically send a status line via onLog so the
|
||||
// Paperclip server knows the adapter is still alive even when the
|
||||
// pod produces no output (e.g. Claude is in a long thinking phase).
|
||||
//
|
||||
// IMPORTANT: onLog alone does NOT update the run's updatedAt in the
|
||||
// DB — it only appends to the log store and publishes SSE events.
|
||||
// The stale-run reaper checks updatedAt, so we must also call
|
||||
// onSpawn periodically to refresh it. Without this, multi-instance
|
||||
// deployments can reap a live run from another server instance
|
||||
// after the 5-minute staleness window.
|
||||
let lastLogAt = Date.now();
|
||||
let keepaliveTick = 0;
|
||||
keepaliveTimer = setInterval(() => {
|
||||
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
|
||||
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`);
|
||||
|
||||
// Refresh updatedAt every ~4 minutes (16 ticks × 15s) to stay
|
||||
// well within the 5-minute reaper staleness window.
|
||||
keepaliveTick++;
|
||||
if (ctx.onSpawn && keepaliveTick % 16 === 0) {
|
||||
void ctx.onSpawn({ pid: -1, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
|
||||
}
|
||||
}, KEEPALIVE_INTERVAL_MS);
|
||||
const wrappedOnLog: typeof onLog = async (stream, chunk) => {
|
||||
lastLogAt = Date.now();
|
||||
|
||||
Reference in New Issue
Block a user