import { randomUUID } from "node:crypto"; import { spawn, type ChildProcess } from "node:child_process"; import { and, eq, or, inArray } from "drizzle-orm"; import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest"; import { activityLog, agents, agentRuntimeState, agentWakeupRequests, budgetPolicies, companySkills, companies, costEvents, documentAnnotationAnchorSnapshots, documentAnnotationComments, documentAnnotationThreads, createDb, documentRevisions, documents, environmentLeases, environments, executionWorkspaces, heartbeatRunEvents, heartbeatRuns, issueComments, issueDocuments, issuePlanDecompositions, issueRecoveryActions, issueRelations, issueThreadInteractions, issueTreeHoldMembers, issueTreeHolds, issueWorkProducts, issues, workspaceOperations, } from "@paperclipai/db"; import { getEmbeddedPostgresTestSupport, startEmbeddedPostgresTestDatabase, } from "./helpers/embedded-postgres.js"; import { runningProcesses } from "../adapters/index.ts"; const mockTelemetryClient = vi.hoisted(() => ({ track: vi.fn() })); const mockTrackAgentFirstHeartbeat = vi.hoisted(() => vi.fn()); const mockAdapterExecute = vi.hoisted(() => vi.fn(async () => ({ exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Recovered stranded heartbeat work.", provider: "test", model: "test-model", })), ); vi.mock("../telemetry.ts", () => ({ getTelemetryClient: () => mockTelemetryClient, })); vi.mock("@paperclipai/shared/telemetry", async () => { const actual = await vi.importActual( "@paperclipai/shared/telemetry", ); return { ...actual, trackAgentFirstHeartbeat: mockTrackAgentFirstHeartbeat, }; }); vi.mock("../adapters/index.ts", async () => { const actual = await vi.importActual("../adapters/index.ts"); return { ...actual, getServerAdapter: vi.fn(() => ({ supportsLocalAgentJwt: false, execute: mockAdapterExecute, })), }; }); import { heartbeatService, redactDetectedSuccessfulRunProgressSummaryForBoard, } from "../services/heartbeat.ts"; import { SUCCESSFUL_RUN_HANDOFF_EXHAUSTED_NOTICE_BODY, SUCCESSFUL_RUN_HANDOFF_REQUIRED_NOTICE_BODY, SUCCESSFUL_RUN_MISSING_STATE_REASON, } from "../services/recovery/index.ts"; const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport(); const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip; if (!embeddedPostgresSupport.supported) { console.warn( `Skipping embedded Postgres heartbeat recovery tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`, ); } function spawnAliveProcess() { return spawn(process.execPath, ["-e", "setInterval(() => {}, 1000)"], { stdio: "ignore", }); } function isPidAlive(pid: number | null | undefined) { if (typeof pid !== "number" || !Number.isInteger(pid) || pid <= 0) return false; try { process.kill(pid, 0); return true; } catch { return false; } } async function waitForPidExit(pid: number, timeoutMs = 2_000) { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { if (!isPidAlive(pid)) return true; await new Promise((resolve) => setTimeout(resolve, 50)); } return !isPidAlive(pid); } async function waitForRunToSettle( heartbeat: ReturnType, runId: string, timeoutMs = 3_000, ) { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { const run = await heartbeat.getRun(runId); if (!run || (run.status !== "queued" && run.status !== "running")) return run; await new Promise((resolve) => setTimeout(resolve, 50)); } return heartbeat.getRun(runId); } async function waitForValue( read: () => Promise, timeoutMs = 3_000, ) { const deadline = Date.now() + timeoutMs; let latest: T | null | undefined = null; while (Date.now() < deadline) { latest = await read(); if (latest) return latest; await new Promise((resolve) => setTimeout(resolve, 50)); } return latest ?? null; } async function waitForHeartbeatIdle( db: ReturnType, timeoutMs = 3_000, ) { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { const runs = await db .select({ status: heartbeatRuns.status, }) .from(heartbeatRuns); if (!runs.some((run) => run.status === "queued" || run.status === "running")) { return; } await new Promise((resolve) => setTimeout(resolve, 50)); } } async function cancelActiveRunsForCleanup( db: ReturnType, timeoutMs = 3_000, ) { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { const activeRuns = await db .select({ id: heartbeatRuns.id, wakeupRequestId: heartbeatRuns.wakeupRequestId, }) .from(heartbeatRuns) .where( or( eq(heartbeatRuns.status, "queued"), eq(heartbeatRuns.status, "running"), ), ); if (activeRuns.length === 0) return; const now = new Date(); const runIds = activeRuns.map((run) => run.id); const wakeupRequestIds = activeRuns .map((run) => run.wakeupRequestId) .filter((value): value is string => typeof value === "string" && value.length > 0); await db .update(heartbeatRuns) .set({ status: "cancelled", finishedAt: now, updatedAt: now, errorCode: "test_cleanup", error: "Cancelled by heartbeat-process-recovery test cleanup", processPid: null, processGroupId: null, }) .where(inArray(heartbeatRuns.id, runIds)); if (wakeupRequestIds.length > 0) { await db .update(agentWakeupRequests) .set({ status: "cancelled", finishedAt: now, error: "Cancelled by heartbeat-process-recovery test cleanup", }) .where(inArray(agentWakeupRequests.id, wakeupRequestIds)); } await new Promise((resolve) => setTimeout(resolve, 50)); } } async function spawnOrphanedProcessGroup() { const leader = spawn( process.execPath, [ "-e", [ "const { spawn } = require('node:child_process');", "const child = spawn(process.execPath, ['-e', 'setInterval(() => {}, 1000)'], { stdio: 'ignore' });", "process.stdout.write(String(child.pid));", "setTimeout(() => process.exit(0), 25);", ].join(" "), ], { detached: true, stdio: ["ignore", "pipe", "ignore"], }, ); let stdout = ""; leader.stdout?.on("data", (chunk) => { stdout += String(chunk); }); await new Promise((resolve, reject) => { leader.once("error", reject); leader.once("exit", () => resolve()); }); const descendantPid = Number.parseInt(stdout.trim(), 10); if (!Number.isInteger(descendantPid) || descendantPid <= 0) { throw new Error(`Failed to capture orphaned descendant pid from detached process group: ${stdout}`); } return { processPid: leader.pid ?? null, processGroupId: leader.pid ?? null, descendantPid, }; } describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { let db!: ReturnType; let tempDb: Awaited> | null = null; const childProcesses = new Set(); const cleanupPids = new Set(); beforeAll(async () => { tempDb = await startEmbeddedPostgresTestDatabase("paperclip-heartbeat-recovery-"); db = createDb(tempDb.connectionString); }, 20_000); afterEach(async () => { vi.clearAllMocks(); mockAdapterExecute.mockImplementation(async () => ({ exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Recovered stranded heartbeat work.", provider: "test", model: "test-model", })); runningProcesses.clear(); for (const child of childProcesses) { child.kill("SIGKILL"); } childProcesses.clear(); for (const pid of cleanupPids) { try { process.kill(pid, "SIGKILL"); } catch { // Ignore already-dead cleanup targets. } } cleanupPids.clear(); await cancelActiveRunsForCleanup(db, 5_000); let idlePolls = 0; for (let attempt = 0; attempt < 100; attempt += 1) { const runs = await db .select({ status: heartbeatRuns.status, processPid: heartbeatRuns.processPid, processGroupId: heartbeatRuns.processGroupId, }) .from(heartbeatRuns); const managedExecutionStillActive = runs.some( (run) => (run.status === "queued" || run.status === "running") && !run.processPid && !run.processGroupId, ); if (!managedExecutionStillActive) { idlePolls += 1; if (idlePolls >= 3) break; } else { idlePolls = 0; } await new Promise((resolve) => setTimeout(resolve, 50)); } await new Promise((resolve) => setTimeout(resolve, 50)); await waitForHeartbeatIdle(db, 5_000); await new Promise((resolve) => setTimeout(resolve, 100)); await db.delete(activityLog); await db.delete(agentRuntimeState); await db.delete(companySkills); await db.delete(costEvents); await db.delete(environmentLeases); await db.delete(environments); await db.delete(issuePlanDecompositions); await db.delete(issueThreadInteractions); await db.delete(documentAnnotationComments); await db.delete(documentAnnotationAnchorSnapshots); await db.delete(documentAnnotationThreads); await db.delete(issueWorkProducts); await db.delete(issueComments); await db.delete(issueDocuments); await db.delete(documentRevisions); await db.delete(documents); await db.delete(issueRelations); await db.delete(issueRecoveryActions); await db.delete(issueTreeHoldMembers); await db.delete(issueTreeHolds); for (let attempt = 0; attempt < 5; attempt += 1) { await db.delete(issueComments); await db.delete(issueDocuments); try { await db.delete(issues); break; } catch (error) { if (attempt === 4) throw error; await new Promise((resolve) => setTimeout(resolve, 50)); } } for (let attempt = 0; attempt < 5; attempt += 1) { await db.delete(activityLog); await db.delete(heartbeatRunEvents); try { await db.delete(heartbeatRuns); break; } catch (error) { if (attempt === 4) throw error; await new Promise((resolve) => setTimeout(resolve, 50)); } } await db.delete(agentWakeupRequests); await db.delete(budgetPolicies); for (let attempt = 0; attempt < 5; attempt += 1) { await db.delete(agentRuntimeState); try { await db.delete(agents); break; } catch (error) { if (attempt === 4) throw error; await new Promise((resolve) => setTimeout(resolve, 50)); } } for (let attempt = 0; attempt < 5; attempt += 1) { await db.delete(companySkills); await db.delete(workspaceOperations); await db.delete(executionWorkspaces); await db.delete(issuePlanDecompositions); await db.delete(issueThreadInteractions); await db.delete(documentAnnotationComments); await db.delete(documentAnnotationAnchorSnapshots); await db.delete(documentAnnotationThreads); await db.delete(issueDocuments); await db.delete(documentRevisions); await db.delete(documents); try { await db.delete(companies); break; } catch (error) { if (attempt === 4) throw error; await new Promise((resolve) => setTimeout(resolve, 50)); } } }); afterAll(async () => { for (const child of childProcesses) { child.kill("SIGKILL"); } childProcesses.clear(); for (const pid of cleanupPids) { try { process.kill(pid, "SIGKILL"); } catch { // Ignore already-dead cleanup targets. } } cleanupPids.clear(); runningProcesses.clear(); await tempDb?.cleanup(); }); async function seedRunFixture(input?: { adapterType?: string; agentStatus?: "paused" | "idle" | "running"; runStatus?: "running" | "queued" | "failed"; processPid?: number | null; processGroupId?: number | null; processLossRetryCount?: number; includeIssue?: boolean; runErrorCode?: string | null; runError?: string | null; contextSnapshot?: Record; }) { const companyId = randomUUID(); const agentId = randomUUID(); const runId = randomUUID(); const wakeupRequestId = randomUUID(); const issueId = randomUUID(); const now = new Date("2026-03-19T00:00:00.000Z"); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "CodexCoder", role: "engineer", status: input?.agentStatus ?? "paused", adapterType: input?.adapterType ?? "codex_local", adapterConfig: {}, runtimeConfig: {}, permissions: {}, }); await db.insert(agentWakeupRequests).values({ id: wakeupRequestId, companyId, agentId, source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: input?.includeIssue === false ? {} : { issueId }, status: "claimed", runId, claimedAt: now, }); await db.insert(heartbeatRuns).values({ id: runId, companyId, agentId, invocationSource: "assignment", triggerDetail: "system", status: input?.runStatus ?? "running", wakeupRequestId, contextSnapshot: input?.includeIssue === false ? input?.contextSnapshot ?? {} : { ...(input?.contextSnapshot ?? {}), issueId }, processPid: input?.processPid ?? null, processGroupId: input?.processGroupId ?? null, processLossRetryCount: input?.processLossRetryCount ?? 0, errorCode: input?.runErrorCode ?? null, error: input?.runError ?? null, startedAt: now, updatedAt: new Date("2026-03-19T00:00:00.000Z"), }); if (input?.includeIssue !== false) { await db.insert(issues).values({ id: issueId, companyId, title: "Recover local adapter after lost process", status: "in_progress", priority: "medium", assigneeAgentId: agentId, checkoutRunId: runId, executionRunId: runId, issueNumber: 1, identifier: `${issuePrefix}-1`, }); } return { companyId, agentId, runId, wakeupRequestId, issueId }; } async function seedEnvironmentLeaseFixture(input: { companyId: string; runId: string; issueId: string; provider?: string; }) { const environmentId = randomUUID(); const leaseId = randomUUID(); const now = new Date("2026-03-19T00:00:00.000Z"); await db.insert(environments).values({ id: environmentId, companyId: input.companyId, name: "Local test environment", driver: "local", status: "active", config: {}, metadata: null, }); await db.insert(environmentLeases).values({ id: leaseId, companyId: input.companyId, environmentId, issueId: input.issueId, heartbeatRunId: input.runId, status: "active", leasePolicy: "ephemeral", provider: input.provider ?? "local", providerLeaseId: null, acquiredAt: now, lastUsedAt: now, metadata: { driver: "local", }, createdAt: now, updatedAt: now, }); return { environmentId, leaseId }; } async function seedStrandedIssueFixture(input: { status: "todo" | "in_progress"; runStatus: "failed" | "timed_out" | "cancelled" | "succeeded"; retryReason?: "assignment_recovery" | "issue_continuation_needed" | null; runSource?: string | null; assignToUser?: boolean; activePauseHold?: boolean; livenessState?: "completed" | "advanced" | "plan_only" | "empty_response" | "blocked" | "failed" | "needs_followup" | null; runErrorCode?: string | null; runError?: string | null; }) { const companyId = randomUUID(); const agentId = randomUUID(); const runId = randomUUID(); const wakeupRequestId = randomUUID(); const rootIssueId = randomUUID(); const issueId = randomUUID(); const now = new Date("2026-03-19T00:00:00.000Z"); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "CodexCoder", role: "engineer", status: "idle", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: {}, permissions: {}, }); await db.insert(agentWakeupRequests).values({ id: wakeupRequestId, companyId, agentId, source: "assignment", triggerDetail: "system", reason: input.retryReason === "assignment_recovery" ? "issue_assignment_recovery" : "issue_assigned", payload: { issueId }, status: input.runStatus === "cancelled" ? "cancelled" : "failed", runId, claimedAt: now, finishedAt: new Date("2026-03-19T00:05:00.000Z"), error: input.runStatus === "succeeded" ? null : ("runError" in input ? input.runError : "run failed before issue advanced"), }); await db.insert(heartbeatRuns).values({ id: runId, companyId, agentId, invocationSource: "assignment", triggerDetail: "system", status: input.runStatus, wakeupRequestId, contextSnapshot: { issueId, taskId: issueId, wakeReason: input.retryReason === "assignment_recovery" ? "issue_assignment_recovery" : input.retryReason ?? "issue_assigned", ...(input.retryReason ? { retryReason: input.retryReason } : {}), ...(input.runSource ? { source: input.runSource } : {}), }, startedAt: now, finishedAt: new Date("2026-03-19T00:05:00.000Z"), updatedAt: new Date("2026-03-19T00:05:00.000Z"), errorCode: input.runStatus === "succeeded" ? null : ("runErrorCode" in input ? input.runErrorCode : "process_lost"), error: input.runStatus === "succeeded" ? null : ("runError" in input ? input.runError : "run failed before issue advanced"), livenessState: input.livenessState ?? null, }); await db.insert(issues).values([ ...(input.activePauseHold ? [{ id: rootIssueId, companyId, title: "Paused recovery root", status: "todo", priority: "medium", issueNumber: 1, identifier: `${issuePrefix}-1`, }] : []), { id: issueId, companyId, parentId: input.activePauseHold ? rootIssueId : null, title: "Recover stranded assigned work", status: input.status, priority: "medium", assigneeAgentId: input.assignToUser ? null : agentId, assigneeUserId: input.assignToUser ? "user-1" : null, checkoutRunId: input.status === "in_progress" ? runId : null, executionRunId: null, issueNumber: input.activePauseHold ? 2 : 1, identifier: `${issuePrefix}-${input.activePauseHold ? 2 : 1}`, startedAt: input.status === "in_progress" ? now : null, }, ]); if (input.activePauseHold) { await db.insert(issueTreeHolds).values({ companyId, rootIssueId, mode: "pause", status: "active", reason: "pause recovery subtree", releasePolicy: { strategy: "manual" }, }); } return { companyId, agentId, runId, wakeupRequestId, issueId, rootIssueId }; } async function seedAssignedTodoNoRunFixture(input?: { agentStatus?: "paused" | "idle" | "running"; }) { const companyId = randomUUID(); const agentId = randomUUID(); const issueId = randomUUID(); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "CodexCoder", role: "engineer", status: input?.agentStatus ?? "idle", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: {}, permissions: {}, }); await db.insert(issues).values({ id: issueId, companyId, title: "Assigned todo work that never received a heartbeat", status: "todo", priority: "medium", assigneeAgentId: agentId, assigneeUserId: null, issueNumber: 1, identifier: `${issuePrefix}-1`, }); return { companyId, agentId, issueId }; } async function expectSourceScopedStrandedRecoveryAction(input: { companyId: string; agentId: string; issueId: string; runId: string; previousStatus: "todo" | "in_progress"; retryReason?: "assignment_recovery" | "issue_continuation_needed" | null; cause?: string; kind?: string; }) { const action = await waitForValue(async () => db.select().from(issueRecoveryActions).where( and( eq(issueRecoveryActions.companyId, input.companyId), eq(issueRecoveryActions.sourceIssueId, input.issueId), ), ).then((rows) => rows[0] ?? null), ); if (!action) throw new Error("Expected source-scoped stranded recovery action to be created"); expect(action).toMatchObject({ companyId: input.companyId, sourceIssueId: input.issueId, recoveryIssueId: null, kind: input.kind ?? "stranded_assigned_issue", status: "active", ownerType: "agent", ownerAgentId: input.agentId, previousOwnerAgentId: input.agentId, returnOwnerAgentId: input.agentId, cause: input.cause ?? "stranded_assigned_issue", attemptCount: 1, maxAttempts: null, }); expect(action.evidence).toMatchObject({ sourceIssueId: input.issueId, previousStatus: input.previousStatus, latestRunId: input.runId, retryReason: input.retryReason ?? null, }); expect(action.nextAction).toContain( input.kind === "missing_disposition" ? "valid issue disposition" : "Restore a live execution path", ); const recoveryIssues = await db .select() .from(issues) .where(and( eq(issues.companyId, input.companyId), eq(issues.originKind, "stranded_issue_recovery"), eq(issues.originId, input.issueId), )); expect(recoveryIssues).toHaveLength(0); const recoveryWakeup = await waitForValue(async () => { const wakeups = await db .select() .from(agentWakeupRequests) .where(eq(agentWakeupRequests.agentId, input.agentId)); return wakeups.find((wakeup) => { const payload = wakeup.payload as Record | null; return payload?.issueId === input.issueId && payload?.sourceIssueId === input.issueId && payload?.recoveryActionId === action.id && payload?.strandedRunId === input.runId; }) ?? null; }); expect(recoveryWakeup).toMatchObject({ companyId: input.companyId, reason: "source_scoped_recovery_action", source: "assignment", payload: expect.objectContaining({ modelProfile: "cheap", allowDeliverableWork: false, allowDocumentUpdates: false, resumeRequiresNormalModel: true, }), }); const recoveryRun = recoveryWakeup?.runId ? await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.id, recoveryWakeup.runId)) .then((rows) => rows[0] ?? null) : null; expect(recoveryRun?.contextSnapshot).toMatchObject({ issueId: input.issueId, taskId: input.issueId, source: "issue_recovery_action", recoveryActionId: action.id, sourceIssueId: input.issueId, strandedRunId: input.runId, modelProfile: "cheap", allowDeliverableWork: false, allowDocumentUpdates: false, resumeRequiresNormalModel: true, }); await waitForHeartbeatIdle(db); const sourceIssue = await db .select() .from(issues) .where(eq(issues.id, input.issueId)) .then((rows) => rows[0] ?? null); expect(sourceIssue?.status).toBe("blocked"); return action; } async function sourceBlockerIssueIds(companyId: string, sourceIssueId: string) { return db .select({ blockerIssueId: issueRelations.issueId }) .from(issueRelations) .where( and( eq(issueRelations.companyId, companyId), eq(issueRelations.relatedIssueId, sourceIssueId), eq(issueRelations.type, "blocks"), ), ) .then((rows) => rows.map((row) => row.blockerIssueId)); } async function seedQueuedIssueRunFixture() { const companyId = randomUUID(); const agentId = randomUUID(); const runId = randomUUID(); const wakeupRequestId = randomUUID(); const issueId = randomUUID(); const now = new Date("2026-03-19T00:00:00.000Z"); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "CodexCoder", role: "engineer", status: "idle", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1, }, }, permissions: {}, }); await db.insert(agentWakeupRequests).values({ id: wakeupRequestId, companyId, agentId, source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId }, status: "queued", runId, requestedAt: now, updatedAt: now, }); await db.insert(heartbeatRuns).values({ id: runId, companyId, agentId, invocationSource: "assignment", triggerDetail: "system", status: "queued", wakeupRequestId, contextSnapshot: { issueId, taskId: issueId, wakeReason: "issue_assigned", }, updatedAt: now, createdAt: now, }); await db.insert(issues).values({ id: issueId, companyId, title: "Retry transient Codex failure without blocking", status: "in_progress", priority: "medium", assigneeAgentId: agentId, checkoutRunId: runId, executionRunId: runId, issueNumber: 1, identifier: `${issuePrefix}-1`, startedAt: now, }); return { companyId, agentId, runId, wakeupRequestId, issueId }; } it("keeps a local run active when the recorded pid is still alive", async () => { const child = spawnAliveProcess(); childProcesses.add(child); expect(child.pid).toBeTypeOf("number"); const { runId, wakeupRequestId } = await seedRunFixture({ processPid: child.pid ?? null, includeIssue: false, }); const heartbeat = heartbeatService(db); const result = await heartbeat.reapOrphanedRuns(); expect(result.reaped).toBe(0); const run = await heartbeat.getRun(runId); expect(run?.status).toBe("running"); expect(run?.errorCode).toBe("process_detached"); expect(run?.error).toContain(String(child.pid)); const wakeup = await db .select() .from(agentWakeupRequests) .where(eq(agentWakeupRequests.id, wakeupRequestId)) .then((rows) => rows[0] ?? null); expect(wakeup?.status).toBe("claimed"); }); it("queues exactly one retry when the recorded local pid is dead", async () => { const { agentId, runId, issueId } = await seedRunFixture({ processPid: 999_999_999, contextSnapshot: { modelProfile: "cheap", allowDeliverableWork: false, allowDocumentUpdates: false, resumeRequiresNormalModel: true, }, }); const heartbeat = heartbeatService(db); const result = await heartbeat.reapOrphanedRuns(); expect(result.reaped).toBe(1); expect(result.runIds).toEqual([runId]); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(2); const failedRun = runs.find((row) => row.id === runId); const retryRun = runs.find((row) => row.id !== runId); expect(failedRun?.status).toBe("failed"); expect(failedRun?.errorCode).toBe("process_lost"); expect(failedRun?.livenessState).toBe("failed"); expect(failedRun?.livenessReason).toContain("process_lost"); expect(failedRun?.resultJson).toMatchObject({ stopReason: "process_lost", timeoutConfigured: false, timeoutFired: false, }); expect(retryRun?.status).toBe("queued"); expect(retryRun?.retryOfRunId).toBe(runId); expect(retryRun?.processLossRetryCount).toBe(1); expect(retryRun?.contextSnapshot as Record).not.toHaveProperty("modelProfile"); const issue = await db .select() .from(issues) .where(eq(issues.id, issueId)) .then((rows) => rows[0] ?? null); expect(issue?.executionRunId).toBe(retryRun?.id ?? null); expect(issue?.checkoutRunId).toBe(runId); }); it("releases active environment leases when an orphaned run is reaped", async () => { const { runId, issueId, companyId } = await seedRunFixture({ processPid: 999_999_999, }); const { leaseId } = await seedEnvironmentLeaseFixture({ companyId, runId, issueId, }); const heartbeat = heartbeatService(db); const result = await heartbeat.reapOrphanedRuns(); expect(result.reaped).toBe(1); expect(result.runIds).toEqual([runId]); const lease = await db .select() .from(environmentLeases) .where(eq(environmentLeases.id, leaseId)) .then((rows) => rows[0] ?? null); expect(lease?.status).toBe("failed"); expect(lease?.releasedAt).toBeTruthy(); }); it.skipIf(process.platform === "win32")("reaps orphaned descendant process groups when the parent pid is already gone", async () => { const orphan = await spawnOrphanedProcessGroup(); cleanupPids.add(orphan.descendantPid); expect(isPidAlive(orphan.descendantPid)).toBe(true); const { agentId, runId, issueId } = await seedRunFixture({ processPid: orphan.processPid, processGroupId: orphan.processGroupId, }); const heartbeat = heartbeatService(db); const result = await heartbeat.reapOrphanedRuns(); expect(result.reaped).toBe(1); expect(result.runIds).toEqual([runId]); expect(await waitForPidExit(orphan.descendantPid, 2_000)).toBe(true); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(2); const failedRun = runs.find((row) => row.id === runId); expect(failedRun?.status).toBe("failed"); expect(failedRun?.errorCode).toBe("process_lost"); expect(failedRun?.error).toContain("descendant process group"); const retryRun = runs.find((row) => row.id !== runId); expect(retryRun?.status).toBe("queued"); const issue = await db .select() .from(issues) .where(eq(issues.id, issueId)) .then((rows) => rows[0] ?? null); expect(issue?.executionRunId).toBe(retryRun?.id ?? null); }); it("blocks the issue when process-loss retry is exhausted and the immediate continuation recovery also fails", async () => { mockAdapterExecute.mockRejectedValueOnce(new Error("continuation recovery failed")); const { companyId, agentId, runId, issueId } = await seedRunFixture({ agentStatus: "idle", processPid: 999_999_999, processLossRetryCount: 1, }); const resolvedBlockerId = randomUUID(); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; await db.insert(issues).values({ id: resolvedBlockerId, companyId, title: "Already completed prerequisite", status: "done", priority: "medium", issueNumber: 2, identifier: `${issuePrefix}-2`, }); await db.insert(issueRelations).values({ companyId, issueId: resolvedBlockerId, relatedIssueId: issueId, type: "blocks", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reapOrphanedRuns(); expect(result.reaped).toBe(1); expect(result.runIds).toEqual([runId]); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(2); expect(runs.find((row) => row.id === runId)?.status).toBe("failed"); const continuationRun = runs.find((row) => row.id !== runId); expect(continuationRun?.contextSnapshot as Record | undefined).toMatchObject({ retryReason: "issue_continuation_needed", retryOfRunId: runId, }); const blockedIssue = await waitForValue(async () => db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => { const issue = rows[0] ?? null; return issue?.status === "blocked" ? issue : null; }) ); expect(blockedIssue?.status).toBe("blocked"); expect(blockedIssue?.executionRunId).toBeNull(); expect(blockedIssue?.checkoutRunId).toBeNull(); if (!continuationRun?.id) throw new Error("Expected continuation recovery run to exist"); const recoveryAction = await expectSourceScopedStrandedRecoveryAction({ companyId, agentId, issueId, runId: continuationRun.id, previousStatus: "in_progress", retryReason: "issue_continuation_needed", }); await expect(sourceBlockerIssueIds(companyId, issueId)).resolves.toEqual([]); const comments = await waitForValue(async () => { const rows = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); return rows.length > 0 ? rows : null; }); expect(comments).toHaveLength(1); expect(comments[0]?.body).toContain("retried continuation"); expect(comments[0]?.body).toContain(`Recovery action: \`${recoveryAction.id}\``); expect(comments[0]?.body).toContain("Recovery owner: [CodexCoder]"); }); it("blocks failed recovery work in place during immediate terminal-run cleanup", async () => { const sourceIssueId = randomUUID(); const { companyId, agentId, runId, issueId } = await seedRunFixture({ agentStatus: "idle", processPid: 999_999_999, processLossRetryCount: 1, runErrorCode: "process_lost", runError: "Authorization: Bearer sk-test-recovery-secret", }); await db .update(issues) .set({ title: "Recover stalled issue PAP-1", originKind: "stranded_issue_recovery", originId: sourceIssueId, }) .where(eq(issues.id, issueId)); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; await db.insert(issues).values({ id: sourceIssueId, companyId, title: "Original stranded source", status: "blocked", priority: "medium", issueNumber: 2, identifier: `${issuePrefix}-2`, }); await db.insert(issueRelations).values({ companyId, issueId, relatedIssueId: sourceIssueId, type: "blocks", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reapOrphanedRuns(); expect(result.reaped).toBe(1); expect(result.runIds).toEqual([runId]); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(1); expect(runs[0]?.status).toBe("failed"); const recoveryIssue = await waitForValue(async () => db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => { const issue = rows[0] ?? null; return issue?.status === "blocked" ? issue : null; }) ); expect(recoveryIssue?.assigneeAgentId).toBe(agentId); expect(recoveryIssue?.originKind).toBe("stranded_issue_recovery"); expect(recoveryIssue?.originId).toBe(sourceIssueId); expect(recoveryIssue?.executionRunId).toBeNull(); const nestedRecoveries = await db .select() .from(issues) .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"), eq(issues.originId, issueId))); expect(nestedRecoveries).toHaveLength(0); const comments = await waitForValue(async () => { const rows = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); return rows.length > 0 ? rows : null; }); expect(comments).toHaveLength(1); expect(comments[0]?.body).toContain("stopped automatic stranded-work recovery"); expect(comments[0]?.body).toContain("recovery issues do not create nested `stranded_issue_recovery` issues"); expect(comments[0]?.body).toContain("Latest retry failure details were withheld from the issue thread"); expect(comments[0]?.body).not.toContain("sk-test-recovery-secret"); await expect(sourceBlockerIssueIds(companyId, sourceIssueId)).resolves.toEqual([issueId]); }); it("does not block paused-tree work when immediate continuation recovery is suppressed by the hold", async () => { const { companyId, agentId, runId, issueId } = await seedRunFixture({ agentStatus: "idle", processPid: 999_999_999, processLossRetryCount: 1, }); await db.insert(issueTreeHolds).values({ companyId, rootIssueId: issueId, mode: "pause", status: "active", reason: "pause immediate recovery subtree", releasePolicy: { strategy: "manual" }, }); const heartbeat = heartbeatService(db); const result = await heartbeat.reapOrphanedRuns(); expect(result.reaped).toBe(1); expect(result.runIds).toEqual([runId]); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(1); expect(runs[0]?.status).toBe("failed"); const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(issue?.status).toBe("in_progress"); expect(issue?.executionRunId).toBeNull(); expect(issue?.checkoutRunId).toBe(runId); const recoveryIssues = await db .select() .from(issues) .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"))); expect(recoveryIssues).toHaveLength(0); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(0); }); it("schedules a bounded retry for codex transient upstream failures instead of blocking the issue immediately", async () => { mockAdapterExecute.mockResolvedValueOnce({ exitCode: 1, signal: null, timedOut: false, errorCode: "adapter_failed", errorFamily: "transient_upstream", errorMessage: "Error running remote compact task: We're currently experiencing high demand, which may cause temporary errors.", provider: "openai", model: "gpt-5.4", resultJson: { errorFamily: "transient_upstream", }, }); const { agentId, runId, issueId } = await seedQueuedIssueRunFixture(); const heartbeat = heartbeatService(db); await heartbeat.resumeQueuedRuns(); await waitForRunToSettle(heartbeat, runId); const runs = await waitForValue(async () => { const rows = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); return rows.length >= 2 ? rows : null; }); expect(runs).toHaveLength(2); const failedRun = runs?.find((row) => row.id === runId); const retryRun = runs?.find((row) => row.id !== runId); expect(failedRun?.status).toBe("failed"); expect(failedRun?.errorCode).toBe("adapter_failed"); expect((failedRun?.resultJson as Record | null)?.errorFamily).toBe("transient_upstream"); expect(retryRun?.status).toBe("scheduled_retry"); expect(retryRun?.scheduledRetryReason).toBe("transient_failure"); expect(retryRun?.contextSnapshot).toMatchObject({ codexTransientFallbackMode: "same_session", }); expect(retryRun?.contextSnapshot as Record).not.toHaveProperty("modelProfile"); const issue = await db .select() .from(issues) .where(eq(issues.id, issueId)) .then((rows) => rows[0] ?? null); expect(issue?.status).toBe("in_progress"); expect(issue?.executionRunId).toBe(retryRun?.id ?? null); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(0); }); it("queues one finish-handoff wake when a successful run leaves in-progress work without a next action", async () => { const { companyId, agentId, runId, issueId } = await seedQueuedIssueRunFixture(); mockAdapterExecute.mockImplementationOnce(async (ctx: { runId: string }) => { await db.insert(issueComments).values({ companyId, issueId, authorAgentId: agentId, createdByRunId: ctx.runId, body: "Implemented the backend detector, but did not choose a final issue state.", }); return { exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Implemented the backend detector, but did not choose a final issue state.", provider: "test", model: "test-model", }; }); const heartbeat = heartbeatService(db); await heartbeat.resumeQueuedRuns(); await waitForRunToSettle(heartbeat, runId, 5_000); const handoffWakeups = await waitForValue(async () => { const rows = await db .select() .from(agentWakeupRequests) .where(eq(agentWakeupRequests.agentId, agentId)); const matches = rows.filter((wakeup) => wakeup.reason === "finish_successful_run_handoff"); return matches.length > 0 ? matches : null; }, 5_000); await waitForHeartbeatIdle(db, 5_000); expect(handoffWakeups).toHaveLength(1); expect(handoffWakeups[0]?.idempotencyKey).toBe(`finish_successful_run_handoff:${issueId}:${runId}:1`); expect(handoffWakeups[0]?.payload).toMatchObject({ issueId, sourceRunId: runId, handoffRequired: true, handoffReason: "successful_run_missing_state", handoffAttempt: 1, maxHandoffAttempts: 1, resumeIntent: true, resumeFromRunId: runId, }); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); const handoffComment = comments.find((comment) => comment.body === SUCCESSFUL_RUN_HANDOFF_REQUIRED_NOTICE_BODY); expect(handoffComment).toBeTruthy(); expect(handoffComment?.authorType).toBe("system"); expect(handoffComment?.presentation).toMatchObject({ kind: "system_notice", tone: "warning", detailsDefaultOpen: false, }); expect(handoffComment?.metadata).toMatchObject({ version: 1, sections: expect.arrayContaining([ expect.objectContaining({ title: "Required action", rows: expect.arrayContaining([ expect.objectContaining({ type: "key_value", label: "Missing disposition", value: "clear_next_step" }), ]), }), expect.objectContaining({ title: "Run evidence", rows: expect.arrayContaining([ expect.objectContaining({ type: "run_link", runId }), expect.objectContaining({ type: "key_value", label: "Normalized cause", value: SUCCESSFUL_RUN_MISSING_STATE_REASON }), ]), }), ]), }); const activity = await db .select() .from(activityLog) .where(eq(activityLog.entityId, issueId)); expect(activity.some((event) => event.action === "issue.successful_run_handoff_required")).toBe(true); }); it("requeues a missing-disposition handoff when the previous corrective wake was cancelled", async () => { const { companyId, agentId, runId, issueId } = await seedQueuedIssueRunFixture(); const idempotencyKey = `finish_successful_run_handoff:${issueId}:${runId}:1`; await db.insert(agentWakeupRequests).values({ id: randomUUID(), companyId, agentId, source: "automation", triggerDetail: "system", reason: "finish_successful_run_handoff", payload: { issueId, sourceRunId: runId, handoffRequired: true, handoffReason: SUCCESSFUL_RUN_MISSING_STATE_REASON, }, status: "cancelled", idempotencyKey, requestedAt: new Date("2026-03-19T00:00:01.000Z"), finishedAt: new Date("2026-03-19T00:00:02.000Z"), updatedAt: new Date("2026-03-19T00:00:02.000Z"), }); mockAdapterExecute.mockImplementationOnce(async (ctx: { runId: string }) => { await db.insert(issueComments).values({ companyId, issueId, authorAgentId: agentId, createdByRunId: ctx.runId, body: "Implemented recovery handling, but did not choose a final issue state.", }); return { exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Implemented recovery handling, but did not choose a final issue state.", provider: "test", model: "test-model", }; }); const heartbeat = heartbeatService(db); await heartbeat.resumeQueuedRuns(); await waitForRunToSettle(heartbeat, runId, 5_000); const handoffWakeups = await waitForValue(async () => { const rows = await db .select() .from(agentWakeupRequests) .where(eq(agentWakeupRequests.idempotencyKey, idempotencyKey)); const requeued = rows.filter((wakeup) => wakeup.reason === "finish_successful_run_handoff"); return requeued.length > 1 ? requeued : null; }, 5_000); await waitForHeartbeatIdle(db, 5_000); expect(handoffWakeups).toHaveLength(2); expect(handoffWakeups.filter((wakeup) => wakeup.status === "cancelled")).toHaveLength(1); expect(handoffWakeups.some((wakeup) => wakeup.status !== "cancelled")).toBe(true); }); it("queues one missing-disposition handoff for artifact-producing successful runs left in progress", async () => { const { companyId, agentId, runId, issueId } = await seedQueuedIssueRunFixture(); mockAdapterExecute.mockImplementationOnce(async (ctx: { runId: string }) => { const documentId = randomUUID(); const revisionId = randomUUID(); await db.insert(issueComments).values({ companyId, issueId, authorAgentId: agentId, createdByRunId: ctx.runId, body: "Drafted the Phase 3 test plan but did not choose a final issue disposition.", }); await db.insert(documents).values({ id: documentId, companyId, title: "Regression test plan", format: "markdown", latestBody: "# Regression test plan\n\n- Cover artifact-producing successful runs", latestRevisionId: revisionId, latestRevisionNumber: 1, createdByAgentId: agentId, updatedByAgentId: agentId, }); await db.insert(documentRevisions).values({ id: revisionId, companyId, documentId, revisionNumber: 1, title: "Regression test plan", format: "markdown", body: "# Regression test plan\n\n- Cover artifact-producing successful runs", createdByAgentId: agentId, createdByRunId: ctx.runId, }); await db.insert(issueDocuments).values({ companyId, issueId, documentId, key: "plan", }); await db.insert(issueWorkProducts).values({ companyId, issueId, type: "report", provider: "test", externalId: "phase-3-report", title: "Phase 3 regression notes", status: "ready", summary: "Successful run produced a visible artifact.", createdByRunId: ctx.runId, }); return { exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Created comments, a plan document, and a work product without choosing a disposition.", provider: "test", model: "test-model", }; }); const heartbeat = heartbeatService(db); await heartbeat.resumeQueuedRuns(); const settledRun = await waitForRunToSettle(heartbeat, runId, 5_000); const handoffWakeups = await waitForValue(async () => { const rows = await db .select() .from(agentWakeupRequests) .where(eq(agentWakeupRequests.agentId, agentId)); const matches = rows.filter((wakeup) => wakeup.reason === "finish_successful_run_handoff"); return matches.length > 0 ? matches : null; }, 5_000); await waitForHeartbeatIdle(db, 5_000); const classifiedRun = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); expect(classifiedRun?.status ?? settledRun?.status).toBe("succeeded"); expect(classifiedRun?.livenessState).toBe("advanced"); expect(handoffWakeups).toHaveLength(1); expect(handoffWakeups[0]?.idempotencyKey).toBe(`finish_successful_run_handoff:${issueId}:${runId}:1`); const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(issue?.status).toBe("in_progress"); await expect(sourceBlockerIssueIds(companyId, issueId)).resolves.toEqual([]); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments.filter((comment) => comment.body === SUCCESSFUL_RUN_HANDOFF_REQUIRED_NOTICE_BODY)).toHaveLength(1); expect(comments.some((comment) => comment.body.startsWith("Drafted the Phase 3 test plan"))).toBe(true); const workProducts = await db.select().from(issueWorkProducts).where(eq(issueWorkProducts.issueId, issueId)); expect(workProducts).toHaveLength(1); const recoveryIssues = await db .select() .from(issues) .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"))); expect(recoveryIssues).toHaveLength(0); }); it("redacts secret-bearing successful-run detected progress before handoff disclosure", async () => { const { agentId, runId, issueId } = await seedQueuedIssueRunFixture(); const bearerSecret = "live-bearer-token-value"; const apiKeySecret = "sk-testsuccessfulhandoffsecret"; const redactedDetectedSummary = redactDetectedSuccessfulRunProgressSummaryForBoard( `Next action noted: Authorization: Bearer ${bearerSecret} OPENAI_API_KEY=${apiKeySecret}`, { enabled: false }, ); expect(redactedDetectedSummary).toContain("***REDACTED***"); expect(redactedDetectedSummary).not.toContain(bearerSecret); expect(redactedDetectedSummary).not.toContain(apiKeySecret); mockAdapterExecute.mockResolvedValueOnce({ exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Made progress but left the issue open.", resultJson: { message: `Next action: Authorization: Bearer ${bearerSecret} OPENAI_API_KEY=${apiKeySecret}`, }, provider: "test", model: "test-model", }); const heartbeat = heartbeatService(db); await heartbeat.resumeQueuedRuns(); await waitForRunToSettle(heartbeat, runId, 5_000); const handoffWakeups = await waitForValue(async () => { const rows = await db .select() .from(agentWakeupRequests) .where(eq(agentWakeupRequests.agentId, agentId)); const matches = rows.filter((wakeup) => wakeup.reason === "finish_successful_run_handoff"); return matches.length > 0 ? matches : null; }, 5_000); await waitForHeartbeatIdle(db, 5_000); expect(handoffWakeups).toHaveLength(1); const wakeupPayloadText = JSON.stringify(handoffWakeups[0]?.payload ?? {}); expect(wakeupPayloadText).not.toContain(bearerSecret); expect(wakeupPayloadText).not.toContain(apiKeySecret); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); const handoffComment = comments.find((comment) => comment.body === SUCCESSFUL_RUN_HANDOFF_REQUIRED_NOTICE_BODY); expect(handoffComment).toBeTruthy(); expect(handoffComment?.body).not.toContain(bearerSecret); expect(handoffComment?.body).not.toContain(apiKeySecret); expect(JSON.stringify(handoffComment?.metadata ?? {})).not.toContain(bearerSecret); expect(JSON.stringify(handoffComment?.metadata ?? {})).not.toContain(apiKeySecret); const activity = await db .select() .from(activityLog) .where(eq(activityLog.entityId, issueId)); const handoffActivity = activity.find((event) => event.action === "issue.successful_run_handoff_required"); expect(handoffActivity).toBeTruthy(); const activityDetailsText = JSON.stringify(handoffActivity?.details ?? {}); expect(activityDetailsText).not.toContain(bearerSecret); expect(activityDetailsText).not.toContain(apiKeySecret); }); it("escalates an exhausted failed successful-run handoff without using generic continuation recovery first", async () => { const { companyId, agentId, runId, issueId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "failed", runErrorCode: "adapter_failed", runError: "Authorization: Bearer sk-test-successful-handoff-secret", }); const sourceRunId = randomUUID(); await db .update(heartbeatRuns) .set({ contextSnapshot: { issueId, taskId: issueId, wakeReason: "finish_successful_run_handoff", sourceRunId, resumeFromRunId: sourceRunId, handoffRequired: true, handoffReason: "successful_run_missing_state", missingDisposition: "clear_next_step", handoffAttempt: 1, maxHandoffAttempts: 1, }, }) .where(eq(heartbeatRuns.id, runId)); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(0); expect(result.successfulRunHandoffEscalated).toBe(1); expect(result.issueIds).toEqual([issueId]); const recoveryAction = await expectSourceScopedStrandedRecoveryAction({ companyId, agentId, issueId, runId, previousStatus: "in_progress", retryReason: null, cause: SUCCESSFUL_RUN_MISSING_STATE_REASON, kind: "missing_disposition", }); expect(recoveryAction.evidence).toMatchObject({ sourceRunId, missingDisposition: "clear_next_step", latestRunStatus: "failed", latestRunErrorCode: "adapter_failed", recoveryCause: SUCCESSFUL_RUN_MISSING_STATE_REASON, }); expect(JSON.stringify(recoveryAction.evidence)).not.toContain("sk-test-successful-handoff-secret"); const sourceIssue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(sourceIssue?.status).toBe("blocked"); await expect(sourceBlockerIssueIds(companyId, issueId)).resolves.toEqual([]); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments[0]?.body).toBe(SUCCESSFUL_RUN_HANDOFF_EXHAUSTED_NOTICE_BODY); expect(comments[0]?.authorType).toBe("system"); expect(comments[0]?.presentation).toMatchObject({ kind: "system_notice", tone: "danger", detailsDefaultOpen: false, }); expect(comments[0]?.metadata).toMatchObject({ version: 1, sections: expect.arrayContaining([ expect.objectContaining({ title: "Recovery owner", rows: expect.arrayContaining([ expect.objectContaining({ type: "key_value", label: "Recovery action", value: recoveryAction.id }), expect.objectContaining({ type: "agent_link", label: "Recovery owner", name: "CodexCoder" }), ]), }), expect.objectContaining({ title: "Run evidence", rows: expect.arrayContaining([ expect.objectContaining({ type: "key_value", label: "Normalized cause", value: SUCCESSFUL_RUN_MISSING_STATE_REASON }), expect.objectContaining({ type: "key_value", label: "Missing disposition", value: "clear_next_step" }), ]), }), ]), }); expect(comments[0]?.body).not.toContain("sk-test-successful-handoff-secret"); expect(JSON.stringify(comments[0]?.metadata ?? {})).not.toContain("sk-test-successful-handoff-secret"); const activity = await db.select().from(activityLog).where(eq(activityLog.entityId, issueId)); expect(activity.some((event) => event.action === "issue.successful_run_handoff_escalated")).toBe(true); }); it("escalates an exhausted successful handoff run that still leaves no disposition", async () => { const { companyId, agentId, runId, issueId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "succeeded", livenessState: "advanced", }); const sourceRunId = randomUUID(); await db .update(heartbeatRuns) .set({ contextSnapshot: { issueId, taskId: issueId, wakeReason: "finish_successful_run_handoff", sourceRunId, resumeFromRunId: sourceRunId, handoffRequired: true, handoffReason: "successful_run_missing_state", missingDisposition: "clear_next_step", handoffAttempt: 1, maxHandoffAttempts: 1, }, }) .where(eq(heartbeatRuns.id, runId)); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.continuationRequeued).toBe(0); expect(result.successfulContinuationObserved).toBe(0); expect(result.successfulRunHandoffEscalated).toBe(1); const recoveryAction = await expectSourceScopedStrandedRecoveryAction({ companyId, agentId, issueId, runId, previousStatus: "in_progress", retryReason: null, cause: SUCCESSFUL_RUN_MISSING_STATE_REASON, kind: "missing_disposition", }); expect(recoveryAction.evidence).toMatchObject({ sourceRunId, latestRunStatus: "succeeded", missingDisposition: "clear_next_step", }); }); it("clears the detached warning when the run reports activity again", async () => { const { runId } = await seedRunFixture({ includeIssue: false, runErrorCode: "process_detached", runError: "Lost in-memory process handle, but child pid 123 is still alive", }); const heartbeat = heartbeatService(db); const updated = await heartbeat.reportRunActivity(runId); expect(updated?.errorCode).toBeNull(); expect(updated?.error).toBeNull(); const run = await heartbeat.getRun(runId); expect(run?.errorCode).toBeNull(); expect(run?.error).toBeNull(); }); it("tracks the first heartbeat with the agent role instead of adapter type", async () => { const { agentId, runId } = await seedRunFixture({ agentStatus: "running", includeIssue: false, }); const heartbeat = heartbeatService(db); await heartbeat.cancelRun(runId); expect(mockTrackAgentFirstHeartbeat).toHaveBeenCalledWith( mockTelemetryClient, expect.objectContaining({ agentRole: "engineer", agentId, }), ); }); it("records manual cancellation stop metadata", async () => { const { runId } = await seedRunFixture({ agentStatus: "running", includeIssue: false, }); const heartbeat = heartbeatService(db); const cancelled = await heartbeat.cancelRun(runId); expect(cancelled?.status).toBe("cancelled"); expect(cancelled?.resultJson).toMatchObject({ stopReason: "cancelled", effectiveTimeoutSec: 0, timeoutConfigured: false, timeoutFired: false, }); }); it("dispatches assigned todo work with no prior run as a normal assignment wake", async () => { const { companyId, agentId, issueId } = await seedAssignedTodoNoRunFixture(); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.assignmentDispatched).toBe(1); expect(result.dispatchRequeued).toBe(0); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(0); expect(result.issueIds).toEqual([issueId]); const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId)); expect(wakeups).toHaveLength(1); expect(wakeups[0]).toMatchObject({ companyId, agentId, source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: expect.objectContaining({ issueId, mutation: "assigned_todo_liveness_dispatch", }), }); expect(wakeups[0]?.payload as Record).not.toHaveProperty("modelProfile"); const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(1); expect(runs[0]?.retryOfRunId).toBeNull(); expect(runs[0]?.contextSnapshot).toMatchObject({ issueId, taskId: issueId, wakeReason: "issue_assigned", source: "issue.assigned_todo_liveness_dispatch", }); expect(runs[0]?.contextSnapshot as Record).not.toHaveProperty("modelProfile"); expect((runs[0]?.contextSnapshot as Record)?.retryReason).toBeUndefined(); const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(issue?.status).toBe("todo"); const recoveryIssues = await db .select() .from(issues) .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"))); expect(recoveryIssues).toHaveLength(0); await expect(sourceBlockerIssueIds(companyId, issueId)).resolves.toEqual([]); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(0); if (runs[0]?.id) { await waitForRunToSettle(heartbeat, runs[0].id); } }); it("does not duplicate initial assigned todo dispatch when a queued wake already exists", async () => { const { companyId, agentId, issueId } = await seedAssignedTodoNoRunFixture(); await db.insert(agentWakeupRequests).values({ companyId, agentId, source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId, mutation: "assigned_todo_liveness_dispatch" }, status: "queued", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.assignmentDispatched).toBe(0); expect(result.dispatchRequeued).toBe(0); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(0); expect(result.skipped).toBe(1); expect(result.issueIds).toEqual([]); const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId)); expect(wakeups).toHaveLength(1); const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(0); }); it("skips budget-blocked assigned todo work with no prior run and continues the sweep", async () => { const blocked = await seedAssignedTodoNoRunFixture(); const unblocked = await seedAssignedTodoNoRunFixture(); await db.insert(budgetPolicies).values({ companyId: blocked.companyId, scopeType: "agent", scopeId: blocked.agentId, metric: "billed_cents", windowKind: "calendar_month_utc", amount: 1, hardStopEnabled: true, isActive: true, }); await db.insert(costEvents).values({ companyId: blocked.companyId, agentId: blocked.agentId, issueId: blocked.issueId, provider: "test", biller: "test", billingType: "tokens", model: "test-model", costCents: 1, occurredAt: new Date(), }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.assignmentDispatched).toBe(1); expect(result.dispatchRequeued).toBe(0); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(0); expect(result.skipped).toBe(1); expect(result.issueIds).toEqual([unblocked.issueId]); const blockedWakeups = await db .select() .from(agentWakeupRequests) .where(eq(agentWakeupRequests.agentId, blocked.agentId)); expect(blockedWakeups).toHaveLength(0); const blockedRuns = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, blocked.agentId)); expect(blockedRuns).toHaveLength(0); const blockedIssue = await db .select() .from(issues) .where(eq(issues.id, blocked.issueId)) .then((rows) => rows[0] ?? null); expect(blockedIssue?.status).toBe("todo"); const unblockedWakeups = await db .select() .from(agentWakeupRequests) .where(eq(agentWakeupRequests.agentId, unblocked.agentId)); expect(unblockedWakeups).toHaveLength(1); expect(unblockedWakeups[0]).toMatchObject({ reason: "issue_assigned", payload: expect.objectContaining({ issueId: unblocked.issueId, mutation: "assigned_todo_liveness_dispatch", }), }); expect(unblockedWakeups[0]?.payload as Record).not.toHaveProperty("modelProfile"); const unblockedRuns = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, unblocked.agentId)); expect(unblockedRuns).toHaveLength(1); if (unblockedRuns[0]?.id) { await waitForRunToSettle(heartbeat, unblockedRuns[0].id); } }); it("does not dispatch assigned todo work with no prior run when the agent is paused", async () => { const { agentId, issueId } = await seedAssignedTodoNoRunFixture({ agentStatus: "paused" }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.assignmentDispatched).toBe(0); expect(result.dispatchRequeued).toBe(0); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(0); expect(result.skipped).toBe(1); expect(result.issueIds).toEqual([]); const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(issue?.status).toBe("todo"); const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(0); }); it("re-enqueues assigned todo work when the last issue run died and no wake remains", async () => { const { agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "todo", runStatus: "failed", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.assignmentDispatched).toBe(0); expect(result.dispatchRequeued).toBe(1); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(0); expect(result.issueIds).toEqual([issueId]); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(2); const retryRun = runs.find((row) => row.id !== runId); expect(retryRun?.id).toBeTruthy(); expect((retryRun?.contextSnapshot as Record)?.retryReason).toBe("assignment_recovery"); expect(retryRun?.contextSnapshot as Record).not.toHaveProperty("modelProfile"); if (retryRun) { await waitForRunToSettle(heartbeat, retryRun.id); } }); it.each([ ["failed", "adapter_failed"], ["failed", "process_lost"], ["timed_out", "adapter_timed_out"], ] as const)( "re-enqueues stranded in-progress work after a %s/%s run before escalating", async (runStatus, runErrorCode) => { const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus, runErrorCode, }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.dispatchRequeued).toBe(0); expect(result.continuationRequeued).toBe(1); expect(result.escalated).toBe(0); expect(result.issueIds).toEqual([issueId]); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(2); const retryRun = runs.find((row) => row.id !== runId); expect(retryRun?.contextSnapshot as Record | undefined).toMatchObject({ issueId, taskId: issueId, retryReason: "issue_continuation_needed", retryOfRunId: runId, source: "issue.continuation_recovery", }); expect(retryRun?.contextSnapshot as Record).not.toHaveProperty("modelProfile"); const recoveries = await db .select() .from(issues) .where( and( eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"), eq(issues.originId, issueId), ), ); expect(recoveries).toHaveLength(0); if (retryRun?.id) { await waitForRunToSettle(heartbeat, retryRun.id); } }, ); it("still re-enqueues stranded assigned todo recovery when an old queued wake exists", async () => { const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "todo", runStatus: "failed", }); await db.insert(agentWakeupRequests).values({ companyId, agentId, source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId }, status: "queued", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.assignmentDispatched).toBe(0); expect(result.dispatchRequeued).toBe(1); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(0); expect(result.issueIds).toEqual([issueId]); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(2); const retryRun = runs.find((row) => row.id !== runId); expect((retryRun?.contextSnapshot as Record)?.retryReason).toBe("assignment_recovery"); expect(retryRun?.contextSnapshot as Record).not.toHaveProperty("modelProfile"); if (retryRun) { await waitForRunToSettle(heartbeat, retryRun.id); } }); it("blocks assigned todo work after the one automatic dispatch recovery was already used", async () => { const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "todo", runStatus: "failed", retryReason: "assignment_recovery", runErrorCode: "process_lost", runError: "Authorization: Bearer sk-test-recovery-secret", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.dispatchRequeued).toBe(0); expect(result.escalated).toBe(1); expect(result.issueIds).toEqual([issueId]); const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(issue?.status).toBe("blocked"); const recoveryAction = await expectSourceScopedStrandedRecoveryAction({ companyId, agentId, issueId, runId, previousStatus: "todo", retryReason: "assignment_recovery", }); expect(JSON.stringify(recoveryAction.evidence)).not.toContain("sk-test-recovery-secret"); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(1); expect(comments[0]?.body).toContain("retried dispatch"); expect(comments[0]?.body).toContain("Latest retry failure details were withheld from the issue thread"); expect(comments[0]?.body).toContain(`Recovery action: \`${recoveryAction.id}\``); expect(comments[0]?.body).toContain("Recovery owner: [CodexCoder]"); }); it("blocks an already stranded recovery issue without creating a recovery child", async () => { const { companyId, issueId } = await seedStrandedIssueFixture({ status: "todo", runStatus: "failed", retryReason: "assignment_recovery", }); const sourceIssueId = randomUUID(); const sourceRunId = randomUUID(); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; await db.insert(issues).values({ id: sourceIssueId, companyId, title: "Original source issue", status: "blocked", priority: "medium", issueNumber: 2, identifier: `${issuePrefix}-2`, }); await db .update(issues) .set({ title: "Recover stalled issue from previous adapter failure", parentId: sourceIssueId, originKind: "stranded_issue_recovery", originId: sourceIssueId, originRunId: sourceRunId, originFingerprint: [ "stranded_issue_recovery", companyId, sourceIssueId, sourceRunId, ].join(":"), }) .where(eq(issues.id, issueId)); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.dispatchRequeued).toBe(0); expect(result.escalated).toBe(1); expect(result.issueIds).toEqual([issueId]); const recoveryIssues = await db .select() .from(issues) .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"))); expect(recoveryIssues).toHaveLength(1); expect(recoveryIssues[0]).toMatchObject({ id: issueId, status: "blocked", parentId: sourceIssueId, originId: sourceIssueId, originRunId: sourceRunId, }); expect(recoveryIssues[0]?.checkoutRunId).toBeNull(); expect(recoveryIssues[0]?.executionRunId).toBeNull(); const blockerRelations = await db .select() .from(issueRelations) .where( and( eq(issueRelations.companyId, companyId), eq(issueRelations.relatedIssueId, issueId), eq(issueRelations.type, "blocks"), ), ); expect(blockerRelations).toHaveLength(0); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(1); expect(comments[0]?.body).toContain("stopped automatic stranded-work recovery"); expect(comments[0]?.body).toContain("recovery issues do not create nested `stranded_issue_recovery` issues"); expect(comments[0]?.body).toContain(`Recovery issue: [${recoveryIssues[0]?.identifier}]`); expect(comments[0]?.body).toContain("Next action:"); }); it("assigns open unassigned blockers back to their creator agent", async () => { const companyId = randomUUID(); const creatorAgentId = randomUUID(); const blockedAssigneeAgentId = randomUUID(); const blockerIssueId = randomUUID(); const blockedIssueId = randomUUID(); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values([ { id: creatorAgentId, companyId, name: "SecurityEngineer", role: "engineer", status: "idle", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: {}, permissions: {}, }, { id: blockedAssigneeAgentId, companyId, name: "CodexCoder", role: "engineer", status: "idle", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: {}, permissions: {}, }, ]); await db.insert(issues).values([ { id: blockerIssueId, companyId, title: "Fix blocker", status: "todo", priority: "high", createdByAgentId: creatorAgentId, issueNumber: 1, identifier: `${issuePrefix}-1`, }, { id: blockedIssueId, companyId, title: "Blocked work", status: "blocked", priority: "high", assigneeAgentId: blockedAssigneeAgentId, issueNumber: 2, identifier: `${issuePrefix}-2`, }, ]); await db.insert(issueRelations).values({ companyId, issueId: blockerIssueId, relatedIssueId: blockedIssueId, type: "blocks", createdByAgentId: creatorAgentId, }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.orphanBlockersAssigned).toBe(1); expect(result.issueIds).toContain(blockerIssueId); const blocker = await db .select() .from(issues) .where(eq(issues.id, blockerIssueId)) .then((rows) => rows[0] ?? null); expect(blocker?.assigneeAgentId).toBe(creatorAgentId); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, blockerIssueId)); expect(comments[0]?.body).toContain("Assigned Orphan Blocker"); expect(comments[0]?.body).toContain(`[${issuePrefix}-2](/${issuePrefix}/issues/${issuePrefix}-2)`); const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, creatorAgentId)); expect(wakeups).toEqual([ expect.objectContaining({ reason: "issue_assigned", payload: expect.objectContaining({ issueId: blockerIssueId, mutation: "unassigned_blocker_recovery", }), }), ]); const runId = wakeups[0]?.runId; if (runId) { await waitForRunToSettle(heartbeat, runId); } }); it("re-enqueues continuation for stranded in-progress work with no active run", async () => { const { agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "failed", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.dispatchRequeued).toBe(0); expect(result.continuationRequeued).toBe(1); expect(result.escalated).toBe(0); expect(result.issueIds).toEqual([issueId]); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(2); const retryRun = runs.find((row) => row.id !== runId); expect(retryRun?.id).toBeTruthy(); expect((retryRun?.contextSnapshot as Record)?.retryReason).toBe("issue_continuation_needed"); expect(retryRun?.contextSnapshot as Record).not.toHaveProperty("modelProfile"); if (retryRun) { await waitForRunToSettle(heartbeat, retryRun.id); } }); it("does not continue seeded in-progress work that has no run linkage", async () => { const companyId = randomUUID(); const agentId = randomUUID(); const issueId = randomUUID(); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "CodexCoder", role: "engineer", status: "idle", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: {}, permissions: {}, }); await db.insert(issues).values({ id: issueId, companyId, title: "Seeded in-flight work", status: "in_progress", priority: "medium", assigneeAgentId: agentId, checkoutRunId: null, executionRunId: null, issueNumber: 1, identifier: `${issuePrefix}-1`, startedAt: new Date("2026-03-19T00:00:00.000Z"), }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.dispatchRequeued).toBe(0); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(0); expect(result.skipped).toBe(1); const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(0); const [issue] = await db.select().from(issues).where(eq(issues.id, issueId)); expect(issue?.status).toBe("in_progress"); expect(issue?.executionRunId).toBeNull(); }); it("classifies actionable plan-only recovery and enqueues one liveness continuation", async () => { mockAdapterExecute.mockResolvedValueOnce({ exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "I will inspect the repo next and then implement the fix.", provider: "test", model: "test-model", }); const { agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "failed", }); const heartbeat = heartbeatService(db); await heartbeat.reconcileStrandedAssignedIssues(); const livenessWake = await waitForValue(async () => { const rows = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId)); return rows.find((row) => row.reason === "run_liveness_continuation") ?? null; }); expect(livenessWake).toBeTruthy(); expect(livenessWake?.payload).toMatchObject({ issueId, livenessState: "plan_only", continuationAttempt: 1, }); const sourceRunId = (livenessWake?.payload as Record | null)?.sourceRunId; expect(sourceRunId).toBeTruthy(); const sourceRun = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.id, String(sourceRunId))) .then((rows) => rows[0] ?? null); if (sourceRun?.id) { await waitForRunToSettle(heartbeat, sourceRun.id, 5_000); } expect(sourceRun?.id).not.toBe(runId); expect(sourceRun?.livenessState).toBe("plan_only"); }); it("treats a plan document update as progress and does not enqueue liveness continuation", async () => { const { agentId, companyId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "failed", }); mockAdapterExecute.mockImplementationOnce(async (ctx: { runId: string }) => { const documentId = randomUUID(); const revisionId = randomUUID(); await db.insert(documents).values({ id: documentId, companyId, title: "Plan", format: "markdown", latestBody: "# Plan\n\n- Inspect files\n- Implement fix", latestRevisionId: revisionId, latestRevisionNumber: 1, createdByAgentId: agentId, updatedByAgentId: agentId, }); await db.insert(documentRevisions).values({ id: revisionId, companyId, documentId, revisionNumber: 1, title: "Plan", format: "markdown", body: "# Plan\n\n- Inspect files\n- Implement fix", createdByAgentId: agentId, createdByRunId: ctx.runId, }); await db.insert(issueDocuments).values({ companyId, issueId, documentId, key: "plan", }); return { exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Plan:\n- Inspect files\n- Implement fix", provider: "test", model: "test-model", }; }); const heartbeat = heartbeatService(db); await heartbeat.reconcileStrandedAssignedIssues(); const retryRun = await waitForValue(async () => { const rows = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); return rows.find((row) => row.id !== runId && row.livenessState === "advanced") ?? null; }, 5_000); if (retryRun?.id) { await waitForRunToSettle(heartbeat, retryRun.id, 5_000); } expect(retryRun?.livenessState).toBe("advanced"); const wakes = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId)); expect(wakes.some((row) => row.reason === "run_liveness_continuation")).toBe(false); }); it("blocks stranded in-progress work after the continuation retry was already used", async () => { const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "failed", retryReason: "issue_continuation_needed", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(1); expect(result.issueIds).toEqual([issueId]); const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(issue?.status).toBe("blocked"); const recoveryAction = await expectSourceScopedStrandedRecoveryAction({ companyId, agentId, issueId, runId, previousStatus: "in_progress", retryReason: "issue_continuation_needed", }); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(1); expect(comments[0]?.body).toContain("retried continuation"); expect(comments[0]?.body).toContain("Latest retry failure details were withheld from the issue thread"); expect(comments[0]?.body).toContain(`Recovery action: \`${recoveryAction.id}\``); expect(comments[0]?.body).toContain("Recovery owner: [CodexCoder]"); }); it("redacts error-code-only stranded recovery failures in issue copy", async () => { const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "failed", retryReason: "issue_continuation_needed", runErrorCode: "adapter_exit_code", runError: null, }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.escalated).toBe(1); const recoveryAction = await expectSourceScopedStrandedRecoveryAction({ companyId, agentId, issueId, runId, previousStatus: "in_progress", retryReason: "issue_continuation_needed", }); expect(recoveryAction.evidence).toMatchObject({ latestRunErrorCode: "adapter_exit_code", }); expect(JSON.stringify(recoveryAction.evidence)).not.toContain("- Failure: none recorded"); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(1); expect(comments[0]?.body).toContain("Latest retry failure details were withheld from the issue thread"); expect(comments[0]?.body).not.toContain("- Failure: none recorded"); }); it("reuses the raced stranded recovery issue when duplicate active recovery creation conflicts", async () => { const { companyId, issueId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "failed", retryReason: "issue_continuation_needed", }); const heartbeat = heartbeatService(db); const results = await Promise.allSettled( Array.from({ length: 8 }, () => heartbeat.reconcileStrandedAssignedIssues()), ); expect(results.every((result) => result.status === "fulfilled")).toBe(true); const actions = await db .select() .from(issueRecoveryActions) .where(and( eq(issueRecoveryActions.companyId, companyId), eq(issueRecoveryActions.sourceIssueId, issueId), )); expect(actions).toHaveLength(1); expect(actions[0]?.attemptCount).toBeGreaterThanOrEqual(1); const recoveries = await db .select() .from(issues) .where(and( eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"), eq(issues.originId, issueId), )); expect(recoveries).toHaveLength(0); await expect(sourceBlockerIssueIds(companyId, issueId)).resolves.toEqual([]); }); it("blocks stranded recovery issues in place instead of creating nested recovery issues", async () => { const sourceIssueId = randomUUID(); const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "failed", }); await db .update(issues) .set({ title: "Recover stalled issue PAP-1", originKind: "stranded_issue_recovery", originId: sourceIssueId, }) .where(eq(issues.id, issueId)); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; await db.insert(issues).values({ id: sourceIssueId, companyId, title: "Original stranded source", status: "blocked", priority: "medium", issueNumber: 2, identifier: `${issuePrefix}-2`, }); await db.insert(issueRelations).values({ companyId, issueId, relatedIssueId: sourceIssueId, type: "blocks", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.dispatchRequeued).toBe(0); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(1); expect(result.issueIds).toEqual([issueId]); const recoveryIssue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(recoveryIssue?.status).toBe("blocked"); expect(recoveryIssue?.assigneeAgentId).toBe(agentId); expect(recoveryIssue?.originKind).toBe("stranded_issue_recovery"); expect(recoveryIssue?.originId).toBe(sourceIssueId); const nestedRecoveries = await db .select() .from(issues) .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"), eq(issues.originId, issueId))); expect(nestedRecoveries).toHaveLength(0); const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(1); expect(runs[0]?.id).toBe(runId); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(1); expect(comments[0]?.body).toContain("stopped automatic stranded-work recovery"); expect(comments[0]?.body).toContain("Latest retry failure details were withheld from the issue thread"); expect(comments[0]?.body).toContain("recovery issues do not create nested `stranded_issue_recovery` issues"); await expect(sourceBlockerIssueIds(companyId, sourceIssueId)).resolves.toEqual([issueId]); }); it("keeps repeated recovery failures on the same canonical recovery issue", async () => { const sourceIssueId = randomUUID(); const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "failed", }); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; await db.insert(issues).values({ id: sourceIssueId, companyId, title: "Original stranded source", status: "blocked", priority: "medium", issueNumber: 2, identifier: `${issuePrefix}-2`, }); await db .update(issues) .set({ title: "Recover stalled issue PAP-1", originKind: "stranded_issue_recovery", originId: sourceIssueId, }) .where(eq(issues.id, issueId)); await db.insert(issueRelations).values({ companyId, issueId, relatedIssueId: sourceIssueId, type: "blocks", }); const heartbeat = heartbeatService(db); const firstResult = await heartbeat.reconcileStrandedAssignedIssues(); expect(firstResult.escalated).toBe(1); expect(firstResult.issueIds).toEqual([issueId]); const secondRunId = randomUUID(); await db.insert(heartbeatRuns).values({ id: secondRunId, companyId, agentId, invocationSource: "assignment", triggerDetail: "system", status: "failed", contextSnapshot: { issueId, taskId: issueId, wakeReason: "issue_assigned", source: "stranded_issue_recovery", }, startedAt: new Date("2030-03-19T00:10:00.000Z"), finishedAt: new Date("2030-03-19T00:15:00.000Z"), createdAt: new Date("2030-03-19T00:10:00.000Z"), updatedAt: new Date("2030-03-19T00:15:00.000Z"), errorCode: "adapter_failed", error: "adapter failed while retrying recovery issue", }); await db .update(issues) .set({ status: "in_progress", checkoutRunId: secondRunId, executionRunId: null, }) .where(eq(issues.id, issueId)); const secondResult = await heartbeat.reconcileStrandedAssignedIssues(); expect(secondResult.dispatchRequeued).toBe(0); expect(secondResult.continuationRequeued).toBe(0); expect(secondResult.escalated).toBe(1); expect(secondResult.issueIds).toEqual([issueId]); const recoveryIssuesForSource = await db .select() .from(issues) .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"), eq(issues.originId, sourceIssueId))); expect(recoveryIssuesForSource.map((issue) => issue.id)).toEqual([issueId]); const nestedRecoveries = await db .select() .from(issues) .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"), eq(issues.originId, issueId))); expect(nestedRecoveries).toHaveLength(0); await expect(sourceBlockerIssueIds(companyId, sourceIssueId)).resolves.toEqual([issueId]); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(2); expect(comments[1]?.body).toContain("Latest retry failure details were withheld from the issue thread"); }); it("does not escalate paused-tree recovery when the automatic continuation retry was cancelled by the hold", async () => { const { companyId, agentId, issueId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "cancelled", retryReason: "issue_continuation_needed", activePauseHold: true, }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.dispatchRequeued).toBe(0); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(0); expect(result.skipped).toBe(1); expect(result.issueIds).toEqual([]); const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(issue?.status).toBe("in_progress"); expect(issue?.checkoutRunId).toBeTruthy(); const recoveryIssues = await db .select() .from(issues) .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"))); expect(recoveryIssues).toHaveLength(0); const blockerRelations = await db .select() .from(issueRelations) .where( and( eq(issueRelations.companyId, companyId), eq(issueRelations.relatedIssueId, issueId), eq(issueRelations.type, "blocks"), ), ); expect(blockerRelations).toHaveLength(0); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(0); const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId)); expect(wakeups).toHaveLength(1); }); it("re-enqueues recovery when the latest in-progress continuation made progress but left no live path", async () => { const { agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "succeeded", livenessState: "advanced", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.continuationRequeued).toBe(1); expect(result.productiveContinuationObserved).toBe(0); expect(result.successfulContinuationObserved).toBe(0); expect(result.escalated).toBe(0); expect(result.issueIds).toEqual([issueId]); const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(issue?.status).toBe("in_progress"); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(0); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); expect(runs).toHaveLength(2); const retryRun = runs.find((row) => row.id !== runId); expect(retryRun?.contextSnapshot as Record | undefined).toMatchObject({ issueId, taskId: issueId, retryReason: "issue_continuation_needed", retryOfRunId: runId, source: "issue.productive_terminal_continuation_recovery", }); expect(retryRun?.contextSnapshot as Record).not.toHaveProperty("modelProfile"); const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId)); expect(wakeups).toHaveLength(2); }); it("blocks stranded in-progress work after a productive continuation retry was already used", async () => { const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "succeeded", retryReason: "issue_continuation_needed", runSource: "issue.productive_terminal_continuation_recovery", livenessState: "advanced", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(1); expect(result.issueIds).toEqual([issueId]); const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(issue?.status).toBe("blocked"); const recoveryAction = await expectSourceScopedStrandedRecoveryAction({ companyId, agentId, issueId, runId, previousStatus: "in_progress", retryReason: "issue_continuation_needed", }); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(1); expect(comments[0]?.body).toContain("automatically retried continuation"); expect(comments[0]?.body).toContain("still has no live execution path"); expect(comments[0]?.body).toContain(`Recovery action: \`${recoveryAction.id}\``); expect(comments[0]?.body).toContain("Recovery owner: [CodexCoder]"); }); it("allows one productive-terminal recovery after regular continuation recovery made progress", async () => { const { agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "succeeded", retryReason: "issue_continuation_needed", runSource: "issue.continuation_recovery", livenessState: "advanced", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.continuationRequeued).toBe(1); expect(result.escalated).toBe(0); expect(result.issueIds).toEqual([issueId]); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); const retryRun = runs.find((row) => row.id !== runId); expect(retryRun?.contextSnapshot as Record | undefined).toMatchObject({ issueId, taskId: issueId, retryReason: "issue_continuation_needed", retryOfRunId: runId, source: "issue.productive_terminal_continuation_recovery", }); expect(retryRun?.contextSnapshot as Record).not.toHaveProperty("modelProfile"); }); it("does not treat a productive terminal run as healthy when in-progress work has no live path", async () => { const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "succeeded", livenessState: "advanced", }); const heartbeat = heartbeatService(db); const sourceIssue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(sourceIssue).toMatchObject({ status: "in_progress", assigneeAgentId: agentId, assigneeUserId: null, executionRunId: null, }); const activeRuns = await db .select() .from(heartbeatRuns) .where(and(eq(heartbeatRuns.companyId, companyId), inArray(heartbeatRuns.status, ["queued", "running"]))); expect(activeRuns).toHaveLength(0); const liveWakeups = await db .select() .from(agentWakeupRequests) .where(and(eq(agentWakeupRequests.companyId, companyId), inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]))); expect(liveWakeups).toHaveLength(0); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.productiveContinuationObserved).toBe(0); expect(result.continuationRequeued + result.escalated).toBe(1); expect(result.issueIds).toEqual([issueId]); const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); const recoveryIssues = await db .select() .from(issues) .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"))); const followupRuns = await db .select() .from(heartbeatRuns) .where(and(eq(heartbeatRuns.companyId, companyId), eq(heartbeatRuns.agentId, agentId))); expect(comments).toHaveLength(0); expect(recoveryIssues).toHaveLength(0); expect(followupRuns).toHaveLength(2); const retryRun = followupRuns.find((row) => row.id !== runId); expect(retryRun?.contextSnapshot as Record | undefined).toMatchObject({ issueId, taskId: issueId, retryReason: "issue_continuation_needed", retryOfRunId: runId, source: "issue.productive_terminal_continuation_recovery", }); expect(retryRun?.contextSnapshot as Record).not.toHaveProperty("modelProfile"); }); it("does not reconcile user-assigned work through the agent stranded-work recovery path", async () => { const { issueId, runId } = await seedStrandedIssueFixture({ status: "todo", runStatus: "failed", assignToUser: true, }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); expect(result.dispatchRequeued).toBe(0); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(0); const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(issue?.status).toBe("todo"); const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, runId)); expect(runs).toHaveLength(1); }); });