import { randomUUID } from "node:crypto";
import { spawn, type ChildProcess } from "node:child_process";
import { eq } from "drizzle-orm";
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest";
import {
  activityLog,
  agents,
  agentRuntimeState,
  agentWakeupRequests,
  companySkills,
  companies,
  createDb,
  documentRevisions,
  documents,
  heartbeatRunEvents,
  heartbeatRuns,
  issueComments,
  issueDocuments,
  issues,
} from "@paperclipai/db";
import {
  getEmbeddedPostgresTestSupport,
  startEmbeddedPostgresTestDatabase,
} from "./helpers/embedded-postgres.js";
import { runningProcesses } from "../adapters/index.ts";

// Mocks are created through vi.hoisted so the vi.mock factories below can reference them.
const mockTelemetryClient = vi.hoisted(() => ({ track: vi.fn() }));
const mockTrackAgentFirstHeartbeat = vi.hoisted(() => vi.fn());
const mockAdapterExecute = vi.hoisted(() =>
  vi.fn(async () => ({
    exitCode: 0,
    signal: null,
    timedOut: false,
    errorMessage: null,
    summary: "Recovered stranded heartbeat work.",
    provider: "test",
    model: "test-model",
  })),
);

vi.mock("../telemetry.ts", () => ({
  getTelemetryClient: () => mockTelemetryClient,
}));

vi.mock("@paperclipai/shared/telemetry", async () => {
  const actual = await vi.importActual<typeof import("@paperclipai/shared/telemetry")>(
    "@paperclipai/shared/telemetry",
  );
  return {
    ...actual,
    trackAgentFirstHeartbeat: mockTrackAgentFirstHeartbeat,
  };
});

// Keep the real adapters module (including `runningProcesses`) but route every
// adapter lookup to a stub whose `execute` is the controllable mock above.
vi.mock("../adapters/index.ts", async () => {
  const actual = await vi.importActual<typeof import("../adapters/index.ts")>("../adapters/index.ts");
  return {
    ...actual,
    getServerAdapter: vi.fn(() => ({
      supportsLocalAgentJwt: false,
      execute: mockAdapterExecute,
    })),
  };
});

import { heartbeatService } from "../services/heartbeat.ts";

const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport();
const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip;

if (!embeddedPostgresSupport.supported) {
  console.warn(
    `Skipping embedded Postgres heartbeat recovery tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`,
  );
}

/** Spawns a Node child process that stays alive (idle interval) until it is killed. */
function spawnAliveProcess() {
  return spawn(process.execPath, ["-e", "setInterval(() => {}, 1000)"], {
    stdio: "ignore",
  });
}

/**
 * Reports whether `pid` refers to a process that can currently be signalled.
 *
 * @returns `false` for non-numeric, non-integer or non-positive pids, and whenever
 *   `process.kill(pid, 0)` throws.
 */
function isPidAlive(pid: number | null | undefined) {
  if (typeof pid !== "number" || !Number.isInteger(pid) || pid <= 0) return false;
  try {
    // Signal 0 only tests for the existence of the process; nothing is delivered.
    process.kill(pid, 0);
    return true;
  } catch {
    return false;
  }
}

/**
 * Polls every 50ms until `pid` is no longer alive or `timeoutMs` elapses.
 *
 * @returns `true` if the process was observed dead, including on a final check
 *   made after the deadline.
 */
async function waitForPidExit(pid: number, timeoutMs = 2_000) {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (!isPidAlive(pid)) return true;
    await new Promise((resolve) => setTimeout(resolve, 50));
  }
  return !isPidAlive(pid);
}

/**
 * Polls every 50ms until the run is missing or has left the `queued`/`running`
 * states, or until `timeoutMs` elapses.
 *
 * @returns the last run value read (which may still be queued/running on timeout).
 */
async function waitForRunToSettle(
  heartbeat: ReturnType<typeof heartbeatService>,
  runId: string,
  timeoutMs = 3_000,
) {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const run = await heartbeat.getRun(runId);
    if (!run || (run.status !== "queued" && run.status !== "running")) return run;
    await new Promise((resolve) => setTimeout(resolve, 50));
  }
  return heartbeat.getRun(runId);
}

/**
 * Repeatedly invokes `read` (every 50ms) until it yields a truthy value or
 * `timeoutMs` elapses.
 *
 * @returns the first truthy value, otherwise the last value read (`null` when that
 *   value is nullish or when `read` was never called).
 */
async function waitForValue<T>(
  read: () => Promise<T | null | undefined>,
  timeoutMs = 3_000,
) {
  const deadline = Date.now() + timeoutMs;
  let latest: T | null | undefined = null;
  while (Date.now() < deadline) {
    latest = await read();
    if (latest) return latest;
    await new Promise((resolve) => setTimeout(resolve, 50));
  }
  return latest ?? null;
}

/**
 * Starts a detached "leader" Node process that spawns a long-lived child, prints
 * the child's pid to stdout and exits 25ms later, leaving the child running.
 *
 * @returns the leader pid (used as both `processPid` and `processGroupId`) and the
 *   pid of the surviving descendant.
 * @throws if the descendant pid could not be parsed from the leader's stdout.
 */
async function spawnOrphanedProcessGroup() {
  const leader = spawn(
    process.execPath,
    [
      "-e",
      [
        "const { spawn } = require('node:child_process');",
        "const child = spawn(process.execPath, ['-e', 'setInterval(() => {}, 1000)'], { stdio: 'ignore' });",
        "process.stdout.write(String(child.pid));",
        "setTimeout(() => process.exit(0), 25);",
      ].join(" "),
    ],
    {
      detached: true,
      stdio: ["ignore", "pipe", "ignore"],
    },
  );

  let stdout = "";
  leader.stdout?.on("data", (chunk) => {
    stdout += String(chunk);
  });

  await new Promise<void>((resolve, reject) => {
    leader.once("error", reject);
    // Wait for "close" rather than "exit": "close" is emitted only after the stdio
    // streams have been closed, so the pid written to stdout has been received.
    leader.once("close", () => resolve());
  });

  const descendantPid = Number.parseInt(stdout.trim(), 10);
  if (!Number.isInteger(descendantPid) || descendantPid <= 0) {
    throw new Error(`Failed to capture orphaned descendant pid from detached process group: ${stdout}`);
  }

  return {
    processPid: leader.pid ?? null,
    processGroupId: leader.pid ?? null,
    descendantPid,
  };
}

describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
  let db!: ReturnType<typeof createDb>;
  let tempDb: Awaited<ReturnType<typeof startEmbeddedPostgresTestDatabase>> | null = null;
  // Child process handles and raw pids that tests register for forced cleanup.
  const childProcesses = new Set<ChildProcess>();
  const cleanupPids = new Set<number>();

  /** SIGKILLs every tracked child process and pid, then empties both tracking sets. */
  function killTrackedProcesses() {
    for (const child of childProcesses) {
      child.kill("SIGKILL");
    }
    childProcesses.clear();
    for (const pid of cleanupPids) {
      try {
        process.kill(pid, "SIGKILL");
      } catch {
        // Ignore already-dead cleanup targets.
      }
    }
    cleanupPids.clear();
  }

  beforeAll(async () => {
    tempDb = await startEmbeddedPostgresTestDatabase("paperclip-heartbeat-recovery-");
    db = createDb(tempDb.connectionString);
  }, 20_000);

  afterEach(async () => {
    vi.clearAllMocks();
    // Restore the default adapter result in case a test replaced the implementation.
    mockAdapterExecute.mockImplementation(async () => ({
      exitCode: 0,
      signal: null,
      timedOut: false,
      errorMessage: null,
      summary: "Recovered stranded heartbeat work.",
      provider: "test",
      model: "test-model",
    }));
    runningProcesses.clear();
    killTrackedProcesses();

    // Wait (up to 100 polls, 50ms apart) until three consecutive polls find no
    // queued/running run that has neither a process pid nor a process group id.
    // Presumably this lets in-flight mocked executions finish before rows are deleted.
    let idlePolls = 0;
    for (let attempt = 0; attempt < 100; attempt += 1) {
      const runs = await db
        .select({
          status: heartbeatRuns.status,
          processPid: heartbeatRuns.processPid,
          processGroupId: heartbeatRuns.processGroupId,
        })
        .from(heartbeatRuns);
      const managedExecutionStillActive = runs.some(
        (run) =>
          (run.status === "queued" || run.status === "running") &&
          !run.processPid &&
          !run.processGroupId,
      );
      if (!managedExecutionStillActive) {
        idlePolls += 1;
        if (idlePolls >= 3) break;
      } else {
        idlePolls = 0;
      }
      await new Promise((resolve) => setTimeout(resolve, 50));
    }
    await new Promise((resolve) => setTimeout(resolve, 50));

    await db.delete(activityLog);
    await db.delete(agentRuntimeState);
    await db.delete(companySkills);
    await db.delete(issueComments);
    await db.delete(issueDocuments);
    await db.delete(documentRevisions);
    await db.delete(documents);
    await db.delete(issues);
    await db.delete(heartbeatRunEvents);
    await db.delete(heartbeatRuns);
    await db.delete(agentWakeupRequests);

    // Deleting agents is retried up to five times, clearing runtime state before each
    // attempt. NOTE(review): presumably late background writes can recreate runtime
    // state rows that reference agents — confirm against the heartbeat service.
    for (let attempt = 0; attempt < 5; attempt += 1) {
      await db.delete(agentRuntimeState);
      try {
        await db.delete(agents);
        break;
      } catch (error) {
        if (attempt === 4) throw error;
        await new Promise((resolve) => setTimeout(resolve, 50));
      }
    }
    await db.delete(companies);
  });

  afterAll(async () => {
    killTrackedProcesses();
    runningProcesses.clear();
    await tempDb?.cleanup();
  });

  /**
   * Inserts a company, an agent, a claimed wakeup request and a heartbeat run, plus
   * (unless `includeIssue` is `false`) an in-progress issue checked out by that run.
   *
   * Defaults: agent status `paused`, adapter type `codex_local`, run status
   * `running`, no process pid/group id, retry count 0, no run error.
   *
   * @returns the generated ids; `issueId` is returned even when no issue row is inserted.
   */
  async function seedRunFixture(input?: {
    adapterType?: string;
    agentStatus?: "paused" | "idle" | "running";
    runStatus?: "running" | "queued" | "failed";
    processPid?: number | null;
    processGroupId?: number | null;
    processLossRetryCount?: number;
    includeIssue?: boolean;
    runErrorCode?: string | null;
    runError?: string | null;
  }) {
    const companyId = randomUUID();
    const agentId = randomUUID();
    const runId = randomUUID();
    const wakeupRequestId = randomUUID();
    const issueId = randomUUID();
    const now = new Date("2026-03-19T00:00:00.000Z");
    // Derive the issue prefix from the random company id so each fixture gets its own.
    const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;

    await db.insert(companies).values({
      id: companyId,
      name: "Paperclip",
      issuePrefix,
      requireBoardApprovalForNewAgents: false,
    });

    await db.insert(agents).values({
      id: agentId,
      companyId,
      name: "CodexCoder",
      role: "engineer",
      status: input?.agentStatus ?? "paused",
      adapterType: input?.adapterType ?? "codex_local",
      adapterConfig: {},
      runtimeConfig: {},
      permissions: {},
    });

    await db.insert(agentWakeupRequests).values({
      id: wakeupRequestId,
      companyId,
      agentId,
      source: "assignment",
      triggerDetail: "system",
      reason: "issue_assigned",
      payload: input?.includeIssue === false ? {} : { issueId },
      status: "claimed",
      runId,
      claimedAt: now,
    });

    await db.insert(heartbeatRuns).values({
      id: runId,
      companyId,
      agentId,
      invocationSource: "assignment",
      triggerDetail: "system",
      status: input?.runStatus ?? "running",
      wakeupRequestId,
      contextSnapshot: input?.includeIssue === false ? {} : { issueId },
      processPid: input?.processPid ?? null,
      processGroupId: input?.processGroupId ?? null,
      processLossRetryCount: input?.processLossRetryCount ?? 0,
      errorCode: input?.runErrorCode ?? null,
      error: input?.runError ?? null,
      startedAt: now,
      updatedAt: new Date("2026-03-19T00:00:00.000Z"),
    });

    if (input?.includeIssue !== false) {
      await db.insert(issues).values({
        id: issueId,
        companyId,
        title: "Recover local adapter after lost process",
        status: "in_progress",
        priority: "medium",
        assigneeAgentId: agentId,
        checkoutRunId: runId,
        executionRunId: runId,
        issueNumber: 1,
        identifier: `${issuePrefix}-1`,
      });
    }

    return { companyId, agentId, runId, wakeupRequestId, issueId };
  }

  /**
   * Inserts a company, an idle agent, a finished wakeup request, a finished
   * heartbeat run with the given `runStatus`, and an issue in the given `status`
   * that has no `executionRunId`.
   *
   * - `retryReason` is written into the run's context snapshot and selects the wake reason.
   * - `assignToUser` assigns the issue to user `user-1` instead of the agent.
   *
   * @returns the generated ids.
   */
  async function seedStrandedIssueFixture(input: {
    status: "todo" | "in_progress";
    runStatus: "failed" | "timed_out" | "cancelled" | "succeeded";
    retryReason?: "assignment_recovery" | "issue_continuation_needed" | null;
    assignToUser?: boolean;
  }) {
    const companyId = randomUUID();
    const agentId = randomUUID();
    const runId = randomUUID();
    const wakeupRequestId = randomUUID();
    const issueId = randomUUID();
    const now = new Date("2026-03-19T00:00:00.000Z");
    const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;

    await db.insert(companies).values({
      id: companyId,
      name: "Paperclip",
      issuePrefix,
      requireBoardApprovalForNewAgents: false,
    });

    await db.insert(agents).values({
      id: agentId,
      companyId,
      name: "CodexCoder",
      role: "engineer",
      status: "idle",
      adapterType: "codex_local",
      adapterConfig: {},
      runtimeConfig: {},
      permissions: {},
    });

    await db.insert(agentWakeupRequests).values({
      id: wakeupRequestId,
      companyId,
      agentId,
      source: "assignment",
      triggerDetail: "system",
      reason: input.retryReason === "assignment_recovery" ? "issue_assignment_recovery" : "issue_assigned",
      payload: { issueId },
      status: input.runStatus === "cancelled" ? "cancelled" : "failed",
      runId,
      claimedAt: now,
      finishedAt: new Date("2026-03-19T00:05:00.000Z"),
      error: input.runStatus === "succeeded" ? null : "run failed before issue advanced",
    });

    await db.insert(heartbeatRuns).values({
      id: runId,
      companyId,
      agentId,
      invocationSource: "assignment",
      triggerDetail: "system",
      status: input.runStatus,
      wakeupRequestId,
      contextSnapshot: {
        issueId,
        taskId: issueId,
        wakeReason: input.retryReason === "assignment_recovery"
          ? "issue_assignment_recovery"
          : input.retryReason ?? "issue_assigned",
        ...(input.retryReason ? { retryReason: input.retryReason } : {}),
      },
      startedAt: now,
      finishedAt: new Date("2026-03-19T00:05:00.000Z"),
      updatedAt: new Date("2026-03-19T00:05:00.000Z"),
      errorCode: input.runStatus === "succeeded" ? null : "process_lost",
      error: input.runStatus === "succeeded" ? null : "run failed before issue advanced",
    });

    await db.insert(issues).values({
      id: issueId,
      companyId,
      title: "Recover stranded assigned work",
      status: input.status,
      priority: "medium",
      assigneeAgentId: input.assignToUser ? null : agentId,
      assigneeUserId: input.assignToUser ? "user-1" : null,
      checkoutRunId: input.status === "in_progress" ? runId : null,
      executionRunId: null,
      issueNumber: 1,
      identifier: `${issuePrefix}-1`,
      startedAt: input.status === "in_progress" ? now : null,
    });

    return { companyId, agentId, runId, wakeupRequestId, issueId };
  }

  it("keeps a local run active when the recorded pid is still alive", async () => {
    const child = spawnAliveProcess();
    childProcesses.add(child);
    expect(child.pid).toBeTypeOf("number");

    const { runId, wakeupRequestId } = await seedRunFixture({
      processPid: child.pid ?? null,
      includeIssue: false,
    });
    const heartbeat = heartbeatService(db);

    const result = await heartbeat.reapOrphanedRuns();
    expect(result.reaped).toBe(0);

    const run = await heartbeat.getRun(runId);
    expect(run?.status).toBe("running");
    expect(run?.errorCode).toBe("process_detached");
    expect(run?.error).toContain(String(child.pid));

    const wakeup = await db
      .select()
      .from(agentWakeupRequests)
      .where(eq(agentWakeupRequests.id, wakeupRequestId))
      .then((rows) => rows[0] ?? null);
    expect(wakeup?.status).toBe("claimed");
  });

  it("queues exactly one retry when the recorded local pid is dead", async () => {
    // 999_999_999 is used as a pid that is expected not to exist on the host.
    const { agentId, runId, issueId } = await seedRunFixture({
      processPid: 999_999_999,
    });
    const heartbeat = heartbeatService(db);

    const result = await heartbeat.reapOrphanedRuns();
    expect(result.reaped).toBe(1);
    expect(result.runIds).toEqual([runId]);

    const runs = await db
      .select()
      .from(heartbeatRuns)
      .where(eq(heartbeatRuns.agentId, agentId));
    expect(runs).toHaveLength(2);

    const failedRun = runs.find((row) => row.id === runId);
    const retryRun = runs.find((row) => row.id !== runId);
    expect(failedRun?.status).toBe("failed");
    expect(failedRun?.errorCode).toBe("process_lost");
    expect(failedRun?.livenessState).toBe("failed");
    expect(failedRun?.livenessReason).toContain("process_lost");
    expect(failedRun?.resultJson).toMatchObject({
      stopReason: "process_lost",
      timeoutConfigured: false,
      timeoutFired: false,
    });
    expect(retryRun?.status).toBe("queued");
    expect(retryRun?.retryOfRunId).toBe(runId);
    expect(retryRun?.processLossRetryCount).toBe(1);

    const issue = await db
      .select()
      .from(issues)
      .where(eq(issues.id, issueId))
      .then((rows) => rows[0] ?? null);
    expect(issue?.executionRunId).toBe(retryRun?.id ?? null);
    expect(issue?.checkoutRunId).toBe(runId);
  });

  it.skipIf(process.platform === "win32")("reaps orphaned descendant process groups when the parent pid is already gone", async () => {
    const orphan = await spawnOrphanedProcessGroup();
    cleanupPids.add(orphan.descendantPid);
    expect(isPidAlive(orphan.descendantPid)).toBe(true);

    const { agentId, runId, issueId } = await seedRunFixture({
      processPid: orphan.processPid,
      processGroupId: orphan.processGroupId,
    });
    const heartbeat = heartbeatService(db);

    const result = await heartbeat.reapOrphanedRuns();
    expect(result.reaped).toBe(1);
    expect(result.runIds).toEqual([runId]);

    expect(await waitForPidExit(orphan.descendantPid, 2_000)).toBe(true);

    const runs = await db
      .select()
      .from(heartbeatRuns)
      .where(eq(heartbeatRuns.agentId, agentId));
    expect(runs).toHaveLength(2);

    const failedRun = runs.find((row) => row.id === runId);
    expect(failedRun?.status).toBe("failed");
    expect(failedRun?.errorCode).toBe("process_lost");
    expect(failedRun?.error).toContain("descendant process group");

    const retryRun = runs.find((row) => row.id !== runId);
    expect(retryRun?.status).toBe("queued");

    const issue = await db
      .select()
      .from(issues)
      .where(eq(issues.id, issueId))
      .then((rows) => rows[0] ?? null);
    expect(issue?.executionRunId).toBe(retryRun?.id ?? null);
  });

  it("does not queue a second retry after the first process-loss retry was already used", async () => {
    const { agentId, runId, issueId } = await seedRunFixture({
      processPid: 999_999_999,
      processLossRetryCount: 1,
    });
    const heartbeat = heartbeatService(db);

    const result = await heartbeat.reapOrphanedRuns();
    expect(result.reaped).toBe(1);
    expect(result.runIds).toEqual([runId]);

    const runs = await db
      .select()
      .from(heartbeatRuns)
      .where(eq(heartbeatRuns.agentId, agentId));
    expect(runs).toHaveLength(1);
    expect(runs[0]?.status).toBe("failed");

    const issue = await db
      .select()
      .from(issues)
      .where(eq(issues.id, issueId))
      .then((rows) => rows[0] ?? null);
    expect(issue?.executionRunId).toBeNull();
    expect(issue?.checkoutRunId).toBe(runId);
  });

  it("clears the detached warning when the run reports activity again", async () => {
    const { runId } = await seedRunFixture({
      includeIssue: false,
      runErrorCode: "process_detached",
      runError: "Lost in-memory process handle, but child pid 123 is still alive",
    });
    const heartbeat = heartbeatService(db);

    const updated = await heartbeat.reportRunActivity(runId);
    expect(updated?.errorCode).toBeNull();
    expect(updated?.error).toBeNull();

    const run = await heartbeat.getRun(runId);
    expect(run?.errorCode).toBeNull();
    expect(run?.error).toBeNull();
  });

  it("tracks the first heartbeat with the agent role instead of adapter type", async () => {
    const { agentId, runId } = await seedRunFixture({
      agentStatus: "running",
      includeIssue: false,
    });
    const heartbeat = heartbeatService(db);

    await heartbeat.cancelRun(runId);

    expect(mockTrackAgentFirstHeartbeat).toHaveBeenCalledWith(
      mockTelemetryClient,
      expect.objectContaining({
        agentRole: "engineer",
        agentId,
      }),
    );
  });

  it("records manual cancellation stop metadata", async () => {
    const { runId } = await seedRunFixture({
      agentStatus: "running",
      includeIssue: false,
    });
    const heartbeat = heartbeatService(db);

    const cancelled = await heartbeat.cancelRun(runId);

    expect(cancelled?.status).toBe("cancelled");
    expect(cancelled?.resultJson).toMatchObject({
      stopReason: "cancelled",
      effectiveTimeoutSec: 0,
      timeoutConfigured: false,
      timeoutFired: false,
    });
  });

  it("re-enqueues assigned todo work when the last issue run died and no wake remains", async () => {
    const { agentId, issueId, runId } = await seedStrandedIssueFixture({
      status: "todo",
      runStatus: "failed",
    });
    const heartbeat = heartbeatService(db);

    const result = await heartbeat.reconcileStrandedAssignedIssues();
    expect(result.dispatchRequeued).toBe(1);
    expect(result.continuationRequeued).toBe(0);
    expect(result.escalated).toBe(0);
    expect(result.issueIds).toEqual([issueId]);

    const runs = await db
      .select()
      .from(heartbeatRuns)
      .where(eq(heartbeatRuns.agentId, agentId));
    expect(runs).toHaveLength(2);

    const retryRun = runs.find((row) => row.id !== runId);
    expect(retryRun?.id).toBeTruthy();
    expect((retryRun?.contextSnapshot as Record<string, unknown>)?.retryReason).toBe("assignment_recovery");
    // Let the requeued run finish before the afterEach cleanup starts.
    if (retryRun) {
      await waitForRunToSettle(heartbeat, retryRun.id);
    }
  });

  it("blocks assigned todo work after the one automatic dispatch recovery was already used", async () => {
    const { issueId } = await seedStrandedIssueFixture({
      status: "todo",
      runStatus: "failed",
      retryReason: "assignment_recovery",
    });
    const heartbeat = heartbeatService(db);

    const result = await heartbeat.reconcileStrandedAssignedIssues();
    expect(result.dispatchRequeued).toBe(0);
    expect(result.escalated).toBe(1);
    expect(result.issueIds).toEqual([issueId]);

    const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
    expect(issue?.status).toBe("blocked");

    const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
    expect(comments).toHaveLength(1);
    expect(comments[0]?.body).toContain("retried dispatch");
    expect(comments[0]?.body).toContain("Latest retry failure: `process_lost` - run failed before issue advanced.");
  });

  it("re-enqueues continuation for stranded in-progress work with no active run", async () => {
    const { agentId, issueId, runId } = await seedStrandedIssueFixture({
      status: "in_progress",
      runStatus: "failed",
    });
    const heartbeat = heartbeatService(db);

    const result = await heartbeat.reconcileStrandedAssignedIssues();
    expect(result.dispatchRequeued).toBe(0);
    expect(result.continuationRequeued).toBe(1);
    expect(result.escalated).toBe(0);
    expect(result.issueIds).toEqual([issueId]);

    const runs = await db
      .select()
      .from(heartbeatRuns)
      .where(eq(heartbeatRuns.agentId, agentId));
    expect(runs).toHaveLength(2);

    const retryRun = runs.find((row) => row.id !== runId);
    expect(retryRun?.id).toBeTruthy();
    expect((retryRun?.contextSnapshot as Record<string, unknown>)?.retryReason).toBe("issue_continuation_needed");
    // Let the requeued run finish before the afterEach cleanup starts.
    if (retryRun) {
      await waitForRunToSettle(heartbeat, retryRun.id);
    }
  });

  it("classifies actionable plan-only recovery and enqueues one liveness continuation", async () => {
    // The next adapter execution reports only a plan, with no completed work.
    mockAdapterExecute.mockResolvedValueOnce({
      exitCode: 0,
      signal: null,
      timedOut: false,
      errorMessage: null,
      summary: "I will inspect the repo next and then implement the fix.",
      provider: "test",
      model: "test-model",
    });
    const { agentId, issueId, runId } = await seedStrandedIssueFixture({
      status: "in_progress",
      runStatus: "failed",
    });
    const heartbeat = heartbeatService(db);

    await heartbeat.reconcileStrandedAssignedIssues();

    const livenessWake = await waitForValue(async () => {
      const rows = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId));
      return rows.find((row) => row.reason === "run_liveness_continuation") ?? null;
    });

    expect(livenessWake).toBeTruthy();
    expect(livenessWake?.payload).toMatchObject({
      issueId,
      livenessState: "plan_only",
      continuationAttempt: 1,
    });

    const sourceRunId = (livenessWake?.payload as Record<string, unknown> | null)?.sourceRunId;
    expect(sourceRunId).toBeTruthy();

    const sourceRun = await db
      .select()
      .from(heartbeatRuns)
      .where(eq(heartbeatRuns.id, String(sourceRunId)))
      .then((rows) => rows[0] ?? null);
    expect(sourceRun?.id).not.toBe(runId);
    expect(sourceRun?.livenessState).toBe("plan_only");
  });

  it("treats a plan document update as progress and does not enqueue liveness continuation", async () => {
    const { agentId, companyId, issueId, runId } = await seedStrandedIssueFixture({
      status: "in_progress",
      runStatus: "failed",
    });

    // The next adapter execution writes a "plan" document revision attributed to the
    // executing run before returning its plan-style summary.
    mockAdapterExecute.mockImplementationOnce(async (ctx: { runId: string }) => {
      const documentId = randomUUID();
      const revisionId = randomUUID();
      await db.insert(documents).values({
        id: documentId,
        companyId,
        title: "Plan",
        format: "markdown",
        latestBody: "# Plan\n\n- Inspect files\n- Implement fix",
        latestRevisionId: revisionId,
        latestRevisionNumber: 1,
        createdByAgentId: agentId,
        updatedByAgentId: agentId,
      });
      await db.insert(documentRevisions).values({
        id: revisionId,
        companyId,
        documentId,
        revisionNumber: 1,
        title: "Plan",
        format: "markdown",
        body: "# Plan\n\n- Inspect files\n- Implement fix",
        createdByAgentId: agentId,
        createdByRunId: ctx.runId,
      });
      await db.insert(issueDocuments).values({
        companyId,
        issueId,
        documentId,
        key: "plan",
      });
      return {
        exitCode: 0,
        signal: null,
        timedOut: false,
        errorMessage: null,
        summary: "Plan:\n- Inspect files\n- Implement fix",
        provider: "test",
        model: "test-model",
      };
    });
    const heartbeat = heartbeatService(db);

    await heartbeat.reconcileStrandedAssignedIssues();

    const retryRun = await waitForValue(async () => {
      const rows = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
      return rows.find((row) => row.id !== runId && row.livenessState === "advanced") ?? null;
    });
    expect(retryRun?.livenessState).toBe("advanced");

    const wakes = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId));
    expect(wakes.some((row) => row.reason === "run_liveness_continuation")).toBe(false);
  });

  it("blocks stranded in-progress work after the continuation retry was already used", async () => {
    const { issueId } = await seedStrandedIssueFixture({
      status: "in_progress",
      runStatus: "failed",
      retryReason: "issue_continuation_needed",
    });
    const heartbeat = heartbeatService(db);

    const result = await heartbeat.reconcileStrandedAssignedIssues();
    expect(result.continuationRequeued).toBe(0);
    expect(result.escalated).toBe(1);
    expect(result.issueIds).toEqual([issueId]);

    const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
    expect(issue?.status).toBe("blocked");

    const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
    expect(comments).toHaveLength(1);
    expect(comments[0]?.body).toContain("retried continuation");
    expect(comments[0]?.body).toContain("Latest retry failure: `process_lost` - run failed before issue advanced.");
  });

  it("does not reconcile user-assigned work through the agent stranded-work recovery path", async () => {
    const { issueId, runId } = await seedStrandedIssueFixture({
      status: "todo",
      runStatus: "failed",
      assignToUser: true,
    });
    const heartbeat = heartbeatService(db);

    const result = await heartbeat.reconcileStrandedAssignedIssues();
    expect(result.dispatchRequeued).toBe(0);
    expect(result.continuationRequeued).toBe(0);
    expect(result.escalated).toBe(0);

    const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
    expect(issue?.status).toBe("todo");

    const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, runId));
    expect(runs).toHaveLength(1);
  });
});