Files
paperclip/server/src/__tests__/heartbeat-process-recovery.test.ts
T
Devin Foley 4cf612a92d Fix runtime state race, workspace sync, plugin startup, and orphaned leases (#4804)
## Thinking Path

> - Paperclip orchestrates AI agents for zero-human companies
> - Agents run inside environments that are leased, and the server
manages runtime state, workspace configuration, and plugin lifecycle
> - Several edge cases caused failures during concurrent operations: a
race condition in runtime state insertion could produce duplicate-key
errors, reused workspaces didn't sync their configuration when the
parent issue was updated, sandbox provider plugins could be queried
before registration completed, and orphaned environment leases from
failed runs were never released
> - This PR fixes these four runtime/environment issues
> - The benefit is more reliable concurrent agent execution and proper
resource cleanup

## What Changed

- `services/heartbeat.ts`: Fixed a race condition where concurrent
runtime state inserts could fail with a duplicate-key error by using an
upsert pattern
- `services/issues.ts`: Sync reused workspace configuration when an
issue is updated, so the workspace reflects the latest issue state
- `services/environment-runtime.ts`: Fixed a startup race where sandbox
provider plugins could be queried before registration completed, by
awaiting plugin readiness before resolving environment drivers
- `services/heartbeat.ts`: Release environment leases for orphaned runs
that lost their process without cleanup

## Verification

- `pnpm test` — all existing and new tests pass, including new tests for
runtime state upsert and process recovery lease cleanup
- `pnpm typecheck` — clean
- Manual: trigger concurrent agent runs to verify no duplicate-key
failures; verify orphaned leases are released after process loss

## Risks

- Low risk. The runtime state upsert changes insert-to-upsert behavior,
which could mask a legitimate duplicate if two different runs produce
the same key — but this is prevented by the run ID being part of the
key. The plugin startup await is bounded by the existing registration
timeout.

## Model Used

Codex GPT 5.4 high via Paperclip.

## Checklist

- [x] I have included a thinking path that traces from project context
to this change
- [x] I have specified the model used (with version and capability
details)
- [x] I have checked ROADMAP.md and confirmed this PR does not duplicate
planned core work
- [x] I have run tests locally and they pass
- [x] I have added or updated tests where applicable
- [ ] If this change affects the UI, I have included before/after
screenshots
- [x] I have updated relevant documentation to reflect my changes
- [x] I have considered and documented any risks above
- [x] I will address all Greptile and reviewer comments before
requesting merge
2026-04-29 16:37:10 -07:00

2235 lines
74 KiB
TypeScript

import { randomUUID } from "node:crypto";
import { spawn, type ChildProcess } from "node:child_process";
import { and, eq, or, inArray } from "drizzle-orm";
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest";
import {
activityLog,
agents,
agentRuntimeState,
agentWakeupRequests,
budgetPolicies,
companySkills,
companies,
costEvents,
createDb,
documentRevisions,
documents,
environmentLeases,
environments,
heartbeatRunEvents,
heartbeatRuns,
issueComments,
issueDocuments,
issueRelations,
issueTreeHoldMembers,
issueTreeHolds,
issues,
} from "@paperclipai/db";
import {
getEmbeddedPostgresTestSupport,
startEmbeddedPostgresTestDatabase,
} from "./helpers/embedded-postgres.js";
import { runningProcesses } from "../adapters/index.ts";
const mockTelemetryClient = vi.hoisted(() => ({ track: vi.fn() }));
const mockTrackAgentFirstHeartbeat = vi.hoisted(() => vi.fn());
const mockAdapterExecute = vi.hoisted(() =>
vi.fn(async () => ({
exitCode: 0,
signal: null,
timedOut: false,
errorMessage: null,
summary: "Recovered stranded heartbeat work.",
provider: "test",
model: "test-model",
})),
);
vi.mock("../telemetry.ts", () => ({
getTelemetryClient: () => mockTelemetryClient,
}));
vi.mock("@paperclipai/shared/telemetry", async () => {
const actual = await vi.importActual<typeof import("@paperclipai/shared/telemetry")>(
"@paperclipai/shared/telemetry",
);
return {
...actual,
trackAgentFirstHeartbeat: mockTrackAgentFirstHeartbeat,
};
});
vi.mock("../adapters/index.ts", async () => {
const actual = await vi.importActual<typeof import("../adapters/index.ts")>("../adapters/index.ts");
return {
...actual,
getServerAdapter: vi.fn(() => ({
supportsLocalAgentJwt: false,
execute: mockAdapterExecute,
})),
};
});
import { heartbeatService } from "../services/heartbeat.ts";
const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport();
const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip;
if (!embeddedPostgresSupport.supported) {
console.warn(
`Skipping embedded Postgres heartbeat recovery tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`,
);
}
function spawnAliveProcess() {
return spawn(process.execPath, ["-e", "setInterval(() => {}, 1000)"], {
stdio: "ignore",
});
}
function isPidAlive(pid: number | null | undefined) {
if (typeof pid !== "number" || !Number.isInteger(pid) || pid <= 0) return false;
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
async function waitForPidExit(pid: number, timeoutMs = 2_000) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
if (!isPidAlive(pid)) return true;
await new Promise((resolve) => setTimeout(resolve, 50));
}
return !isPidAlive(pid);
}
async function waitForRunToSettle(
heartbeat: ReturnType<typeof heartbeatService>,
runId: string,
timeoutMs = 3_000,
) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const run = await heartbeat.getRun(runId);
if (!run || (run.status !== "queued" && run.status !== "running")) return run;
await new Promise((resolve) => setTimeout(resolve, 50));
}
return heartbeat.getRun(runId);
}
async function waitForValue<T>(
read: () => Promise<T | null | undefined>,
timeoutMs = 3_000,
) {
const deadline = Date.now() + timeoutMs;
let latest: T | null | undefined = null;
while (Date.now() < deadline) {
latest = await read();
if (latest) return latest;
await new Promise((resolve) => setTimeout(resolve, 50));
}
return latest ?? null;
}
async function waitForHeartbeatIdle(
db: ReturnType<typeof createDb>,
timeoutMs = 3_000,
) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const runs = await db
.select({
status: heartbeatRuns.status,
})
.from(heartbeatRuns);
if (!runs.some((run) => run.status === "queued" || run.status === "running")) {
return;
}
await new Promise((resolve) => setTimeout(resolve, 50));
}
}
async function cancelActiveRunsForCleanup(
db: ReturnType<typeof createDb>,
timeoutMs = 3_000,
) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const activeRuns = await db
.select({
id: heartbeatRuns.id,
wakeupRequestId: heartbeatRuns.wakeupRequestId,
})
.from(heartbeatRuns)
.where(
or(
eq(heartbeatRuns.status, "queued"),
eq(heartbeatRuns.status, "running"),
),
);
if (activeRuns.length === 0) return;
const now = new Date();
const runIds = activeRuns.map((run) => run.id);
const wakeupRequestIds = activeRuns
.map((run) => run.wakeupRequestId)
.filter((value): value is string => typeof value === "string" && value.length > 0);
await db
.update(heartbeatRuns)
.set({
status: "cancelled",
finishedAt: now,
updatedAt: now,
errorCode: "test_cleanup",
error: "Cancelled by heartbeat-process-recovery test cleanup",
processPid: null,
processGroupId: null,
})
.where(inArray(heartbeatRuns.id, runIds));
if (wakeupRequestIds.length > 0) {
await db
.update(agentWakeupRequests)
.set({
status: "cancelled",
finishedAt: now,
error: "Cancelled by heartbeat-process-recovery test cleanup",
})
.where(inArray(agentWakeupRequests.id, wakeupRequestIds));
}
await new Promise((resolve) => setTimeout(resolve, 50));
}
}
async function spawnOrphanedProcessGroup() {
const leader = spawn(
process.execPath,
[
"-e",
[
"const { spawn } = require('node:child_process');",
"const child = spawn(process.execPath, ['-e', 'setInterval(() => {}, 1000)'], { stdio: 'ignore' });",
"process.stdout.write(String(child.pid));",
"setTimeout(() => process.exit(0), 25);",
].join(" "),
],
{
detached: true,
stdio: ["ignore", "pipe", "ignore"],
},
);
let stdout = "";
leader.stdout?.on("data", (chunk) => {
stdout += String(chunk);
});
await new Promise<void>((resolve, reject) => {
leader.once("error", reject);
leader.once("exit", () => resolve());
});
const descendantPid = Number.parseInt(stdout.trim(), 10);
if (!Number.isInteger(descendantPid) || descendantPid <= 0) {
throw new Error(`Failed to capture orphaned descendant pid from detached process group: ${stdout}`);
}
return {
processPid: leader.pid ?? null,
processGroupId: leader.pid ?? null,
descendantPid,
};
}
describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
let db!: ReturnType<typeof createDb>;
let tempDb: Awaited<ReturnType<typeof startEmbeddedPostgresTestDatabase>> | null = null;
const childProcesses = new Set<ChildProcess>();
const cleanupPids = new Set<number>();
beforeAll(async () => {
tempDb = await startEmbeddedPostgresTestDatabase("paperclip-heartbeat-recovery-");
db = createDb(tempDb.connectionString);
}, 20_000);
afterEach(async () => {
vi.clearAllMocks();
mockAdapterExecute.mockImplementation(async () => ({
exitCode: 0,
signal: null,
timedOut: false,
errorMessage: null,
summary: "Recovered stranded heartbeat work.",
provider: "test",
model: "test-model",
}));
runningProcesses.clear();
for (const child of childProcesses) {
child.kill("SIGKILL");
}
childProcesses.clear();
for (const pid of cleanupPids) {
try {
process.kill(pid, "SIGKILL");
} catch {
// Ignore already-dead cleanup targets.
}
}
cleanupPids.clear();
await cancelActiveRunsForCleanup(db, 5_000);
let idlePolls = 0;
for (let attempt = 0; attempt < 100; attempt += 1) {
const runs = await db
.select({
status: heartbeatRuns.status,
processPid: heartbeatRuns.processPid,
processGroupId: heartbeatRuns.processGroupId,
})
.from(heartbeatRuns);
const managedExecutionStillActive = runs.some(
(run) =>
(run.status === "queued" || run.status === "running") &&
!run.processPid &&
!run.processGroupId,
);
if (!managedExecutionStillActive) {
idlePolls += 1;
if (idlePolls >= 3) break;
} else {
idlePolls = 0;
}
await new Promise((resolve) => setTimeout(resolve, 50));
}
await new Promise((resolve) => setTimeout(resolve, 50));
await waitForHeartbeatIdle(db, 5_000);
await new Promise((resolve) => setTimeout(resolve, 100));
await db.delete(activityLog);
await db.delete(agentRuntimeState);
await db.delete(companySkills);
await db.delete(costEvents);
await db.delete(environmentLeases);
await db.delete(environments);
await db.delete(issueComments);
await db.delete(issueDocuments);
await db.delete(documentRevisions);
await db.delete(documents);
await db.delete(issueRelations);
await db.delete(issueTreeHoldMembers);
await db.delete(issueTreeHolds);
for (let attempt = 0; attempt < 5; attempt += 1) {
await db.delete(issueComments);
await db.delete(issueDocuments);
try {
await db.delete(issues);
break;
} catch (error) {
if (attempt === 4) throw error;
await new Promise((resolve) => setTimeout(resolve, 50));
}
}
for (let attempt = 0; attempt < 5; attempt += 1) {
await db.delete(activityLog);
await db.delete(heartbeatRunEvents);
try {
await db.delete(heartbeatRuns);
break;
} catch (error) {
if (attempt === 4) throw error;
await new Promise((resolve) => setTimeout(resolve, 50));
}
}
await db.delete(agentWakeupRequests);
await db.delete(budgetPolicies);
for (let attempt = 0; attempt < 5; attempt += 1) {
await db.delete(agentRuntimeState);
try {
await db.delete(agents);
break;
} catch (error) {
if (attempt === 4) throw error;
await new Promise((resolve) => setTimeout(resolve, 50));
}
}
for (let attempt = 0; attempt < 5; attempt += 1) {
await db.delete(companySkills);
try {
await db.delete(companies);
break;
} catch (error) {
if (attempt === 4) throw error;
await new Promise((resolve) => setTimeout(resolve, 50));
}
}
});
afterAll(async () => {
for (const child of childProcesses) {
child.kill("SIGKILL");
}
childProcesses.clear();
for (const pid of cleanupPids) {
try {
process.kill(pid, "SIGKILL");
} catch {
// Ignore already-dead cleanup targets.
}
}
cleanupPids.clear();
runningProcesses.clear();
await tempDb?.cleanup();
});
async function seedRunFixture(input?: {
adapterType?: string;
agentStatus?: "paused" | "idle" | "running";
runStatus?: "running" | "queued" | "failed";
processPid?: number | null;
processGroupId?: number | null;
processLossRetryCount?: number;
includeIssue?: boolean;
runErrorCode?: string | null;
runError?: string | null;
}) {
const companyId = randomUUID();
const agentId = randomUUID();
const runId = randomUUID();
const wakeupRequestId = randomUUID();
const issueId = randomUUID();
const now = new Date("2026-03-19T00:00:00.000Z");
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(companies).values({
id: companyId,
name: "Paperclip",
issuePrefix,
requireBoardApprovalForNewAgents: false,
});
await db.insert(agents).values({
id: agentId,
companyId,
name: "CodexCoder",
role: "engineer",
status: input?.agentStatus ?? "paused",
adapterType: input?.adapterType ?? "codex_local",
adapterConfig: {},
runtimeConfig: {},
permissions: {},
});
await db.insert(agentWakeupRequests).values({
id: wakeupRequestId,
companyId,
agentId,
source: "assignment",
triggerDetail: "system",
reason: "issue_assigned",
payload: input?.includeIssue === false ? {} : { issueId },
status: "claimed",
runId,
claimedAt: now,
});
await db.insert(heartbeatRuns).values({
id: runId,
companyId,
agentId,
invocationSource: "assignment",
triggerDetail: "system",
status: input?.runStatus ?? "running",
wakeupRequestId,
contextSnapshot: input?.includeIssue === false ? {} : { issueId },
processPid: input?.processPid ?? null,
processGroupId: input?.processGroupId ?? null,
processLossRetryCount: input?.processLossRetryCount ?? 0,
errorCode: input?.runErrorCode ?? null,
error: input?.runError ?? null,
startedAt: now,
updatedAt: new Date("2026-03-19T00:00:00.000Z"),
});
if (input?.includeIssue !== false) {
await db.insert(issues).values({
id: issueId,
companyId,
title: "Recover local adapter after lost process",
status: "in_progress",
priority: "medium",
assigneeAgentId: agentId,
checkoutRunId: runId,
executionRunId: runId,
issueNumber: 1,
identifier: `${issuePrefix}-1`,
});
}
return { companyId, agentId, runId, wakeupRequestId, issueId };
}
async function seedEnvironmentLeaseFixture(input: {
companyId: string;
runId: string;
issueId: string;
provider?: string;
}) {
const environmentId = randomUUID();
const leaseId = randomUUID();
const now = new Date("2026-03-19T00:00:00.000Z");
await db.insert(environments).values({
id: environmentId,
companyId: input.companyId,
name: "Local test environment",
driver: "local",
status: "active",
config: {},
metadata: null,
});
await db.insert(environmentLeases).values({
id: leaseId,
companyId: input.companyId,
environmentId,
issueId: input.issueId,
heartbeatRunId: input.runId,
status: "active",
leasePolicy: "ephemeral",
provider: input.provider ?? "local",
providerLeaseId: null,
acquiredAt: now,
lastUsedAt: now,
metadata: {
driver: "local",
},
createdAt: now,
updatedAt: now,
});
return { environmentId, leaseId };
}
async function seedStrandedIssueFixture(input: {
status: "todo" | "in_progress";
runStatus: "failed" | "timed_out" | "cancelled" | "succeeded";
retryReason?: "assignment_recovery" | "issue_continuation_needed" | null;
assignToUser?: boolean;
activePauseHold?: boolean;
livenessState?: "completed" | "advanced" | "plan_only" | "empty_response" | "blocked" | "failed" | "needs_followup" | null;
runErrorCode?: string | null;
runError?: string | null;
}) {
const companyId = randomUUID();
const agentId = randomUUID();
const runId = randomUUID();
const wakeupRequestId = randomUUID();
const rootIssueId = randomUUID();
const issueId = randomUUID();
const now = new Date("2026-03-19T00:00:00.000Z");
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(companies).values({
id: companyId,
name: "Paperclip",
issuePrefix,
requireBoardApprovalForNewAgents: false,
});
await db.insert(agents).values({
id: agentId,
companyId,
name: "CodexCoder",
role: "engineer",
status: "idle",
adapterType: "codex_local",
adapterConfig: {},
runtimeConfig: {},
permissions: {},
});
await db.insert(agentWakeupRequests).values({
id: wakeupRequestId,
companyId,
agentId,
source: "assignment",
triggerDetail: "system",
reason: input.retryReason === "assignment_recovery" ? "issue_assignment_recovery" : "issue_assigned",
payload: { issueId },
status: input.runStatus === "cancelled" ? "cancelled" : "failed",
runId,
claimedAt: now,
finishedAt: new Date("2026-03-19T00:05:00.000Z"),
error: input.runStatus === "succeeded"
? null
: ("runError" in input ? input.runError : "run failed before issue advanced"),
});
await db.insert(heartbeatRuns).values({
id: runId,
companyId,
agentId,
invocationSource: "assignment",
triggerDetail: "system",
status: input.runStatus,
wakeupRequestId,
contextSnapshot: {
issueId,
taskId: issueId,
wakeReason: input.retryReason === "assignment_recovery"
? "issue_assignment_recovery"
: input.retryReason ?? "issue_assigned",
...(input.retryReason ? { retryReason: input.retryReason } : {}),
},
startedAt: now,
finishedAt: new Date("2026-03-19T00:05:00.000Z"),
updatedAt: new Date("2026-03-19T00:05:00.000Z"),
errorCode: input.runStatus === "succeeded"
? null
: ("runErrorCode" in input ? input.runErrorCode : "process_lost"),
error: input.runStatus === "succeeded"
? null
: ("runError" in input ? input.runError : "run failed before issue advanced"),
livenessState: input.livenessState ?? null,
});
await db.insert(issues).values([
...(input.activePauseHold
? [{
id: rootIssueId,
companyId,
title: "Paused recovery root",
status: "todo",
priority: "medium",
issueNumber: 1,
identifier: `${issuePrefix}-1`,
}]
: []),
{
id: issueId,
companyId,
parentId: input.activePauseHold ? rootIssueId : null,
title: "Recover stranded assigned work",
status: input.status,
priority: "medium",
assigneeAgentId: input.assignToUser ? null : agentId,
assigneeUserId: input.assignToUser ? "user-1" : null,
checkoutRunId: input.status === "in_progress" ? runId : null,
executionRunId: null,
issueNumber: input.activePauseHold ? 2 : 1,
identifier: `${issuePrefix}-${input.activePauseHold ? 2 : 1}`,
startedAt: input.status === "in_progress" ? now : null,
},
]);
if (input.activePauseHold) {
await db.insert(issueTreeHolds).values({
companyId,
rootIssueId,
mode: "pause",
status: "active",
reason: "pause recovery subtree",
releasePolicy: { strategy: "manual" },
});
}
return { companyId, agentId, runId, wakeupRequestId, issueId, rootIssueId };
}
async function seedAssignedTodoNoRunFixture(input?: {
agentStatus?: "paused" | "idle" | "running";
}) {
const companyId = randomUUID();
const agentId = randomUUID();
const issueId = randomUUID();
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(companies).values({
id: companyId,
name: "Paperclip",
issuePrefix,
requireBoardApprovalForNewAgents: false,
});
await db.insert(agents).values({
id: agentId,
companyId,
name: "CodexCoder",
role: "engineer",
status: input?.agentStatus ?? "idle",
adapterType: "codex_local",
adapterConfig: {},
runtimeConfig: {},
permissions: {},
});
await db.insert(issues).values({
id: issueId,
companyId,
title: "Assigned todo work that never received a heartbeat",
status: "todo",
priority: "medium",
assigneeAgentId: agentId,
assigneeUserId: null,
issueNumber: 1,
identifier: `${issuePrefix}-1`,
});
return { companyId, agentId, issueId };
}
async function expectStrandedRecoveryArtifacts(input: {
companyId: string;
agentId: string;
issueId: string;
runId: string;
previousStatus: "todo" | "in_progress";
retryReason: "assignment_recovery" | "issue_continuation_needed";
}) {
const recovery = await waitForValue(async () =>
db.select().from(issues).where(
and(
eq(issues.companyId, input.companyId),
eq(issues.originKind, "stranded_issue_recovery"),
eq(issues.originId, input.issueId),
),
).then((rows) => rows[0] ?? null),
);
if (!recovery) throw new Error("Expected stranded issue recovery issue to be created");
expect(recovery).toMatchObject({
companyId: input.companyId,
parentId: input.issueId,
assigneeAgentId: input.agentId,
originKind: "stranded_issue_recovery",
originId: input.issueId,
originRunId: input.runId,
priority: "medium",
});
expect(recovery.title).toContain("Recover stalled issue");
expect(recovery.description).toContain(`Previous source status: \`${input.previousStatus}\``);
expect(recovery.description).toContain(`Retry reason: \`${input.retryReason}\``);
expect(recovery.description).toContain("Fix the runtime/adapter problem");
const relation = await db
.select()
.from(issueRelations)
.where(
and(
eq(issueRelations.companyId, input.companyId),
eq(issueRelations.issueId, recovery.id),
eq(issueRelations.relatedIssueId, input.issueId),
eq(issueRelations.type, "blocks"),
),
)
.then((rows) => rows[0] ?? null);
expect(relation).toBeTruthy();
const wakeups = await db
.select()
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.agentId, input.agentId));
const recoveryWakeup = wakeups.find((wakeup) => {
const payload = wakeup.payload as Record<string, unknown> | null;
return payload?.issueId === recovery.id &&
payload?.sourceIssueId === input.issueId &&
payload?.strandedRunId === input.runId;
});
expect(recoveryWakeup).toMatchObject({
companyId: input.companyId,
reason: "issue_assigned",
source: "assignment",
});
const recoveryRun = recoveryWakeup?.runId
? await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, recoveryWakeup.runId))
.then((rows) => rows[0] ?? null)
: null;
expect(recoveryRun?.contextSnapshot).toMatchObject({
issueId: recovery.id,
taskId: recovery.id,
source: "stranded_issue_recovery",
sourceIssueId: input.issueId,
strandedRunId: input.runId,
});
return recovery;
}
async function sourceBlockerIssueIds(companyId: string, sourceIssueId: string) {
return db
.select({ blockerIssueId: issueRelations.issueId })
.from(issueRelations)
.where(
and(
eq(issueRelations.companyId, companyId),
eq(issueRelations.relatedIssueId, sourceIssueId),
eq(issueRelations.type, "blocks"),
),
)
.then((rows) => rows.map((row) => row.blockerIssueId));
}
async function seedQueuedIssueRunFixture() {
const companyId = randomUUID();
const agentId = randomUUID();
const runId = randomUUID();
const wakeupRequestId = randomUUID();
const issueId = randomUUID();
const now = new Date("2026-03-19T00:00:00.000Z");
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(companies).values({
id: companyId,
name: "Paperclip",
issuePrefix,
requireBoardApprovalForNewAgents: false,
});
await db.insert(agents).values({
id: agentId,
companyId,
name: "CodexCoder",
role: "engineer",
status: "idle",
adapterType: "codex_local",
adapterConfig: {},
runtimeConfig: {
heartbeat: {
wakeOnDemand: true,
maxConcurrentRuns: 1,
},
},
permissions: {},
});
await db.insert(agentWakeupRequests).values({
id: wakeupRequestId,
companyId,
agentId,
source: "assignment",
triggerDetail: "system",
reason: "issue_assigned",
payload: { issueId },
status: "queued",
runId,
requestedAt: now,
updatedAt: now,
});
await db.insert(heartbeatRuns).values({
id: runId,
companyId,
agentId,
invocationSource: "assignment",
triggerDetail: "system",
status: "queued",
wakeupRequestId,
contextSnapshot: {
issueId,
taskId: issueId,
wakeReason: "issue_assigned",
},
updatedAt: now,
createdAt: now,
});
await db.insert(issues).values({
id: issueId,
companyId,
title: "Retry transient Codex failure without blocking",
status: "in_progress",
priority: "medium",
assigneeAgentId: agentId,
checkoutRunId: runId,
executionRunId: runId,
issueNumber: 1,
identifier: `${issuePrefix}-1`,
startedAt: now,
});
return { companyId, agentId, runId, wakeupRequestId, issueId };
}
it("keeps a local run active when the recorded pid is still alive", async () => {
const child = spawnAliveProcess();
childProcesses.add(child);
expect(child.pid).toBeTypeOf("number");
const { runId, wakeupRequestId } = await seedRunFixture({
processPid: child.pid ?? null,
includeIssue: false,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reapOrphanedRuns();
expect(result.reaped).toBe(0);
const run = await heartbeat.getRun(runId);
expect(run?.status).toBe("running");
expect(run?.errorCode).toBe("process_detached");
expect(run?.error).toContain(String(child.pid));
const wakeup = await db
.select()
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.id, wakeupRequestId))
.then((rows) => rows[0] ?? null);
expect(wakeup?.status).toBe("claimed");
});
it("queues exactly one retry when the recorded local pid is dead", async () => {
const { agentId, runId, issueId } = await seedRunFixture({
processPid: 999_999_999,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reapOrphanedRuns();
expect(result.reaped).toBe(1);
expect(result.runIds).toEqual([runId]);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(2);
const failedRun = runs.find((row) => row.id === runId);
const retryRun = runs.find((row) => row.id !== runId);
expect(failedRun?.status).toBe("failed");
expect(failedRun?.errorCode).toBe("process_lost");
expect(failedRun?.livenessState).toBe("failed");
expect(failedRun?.livenessReason).toContain("process_lost");
expect(failedRun?.resultJson).toMatchObject({
stopReason: "process_lost",
timeoutConfigured: false,
timeoutFired: false,
});
expect(retryRun?.status).toBe("queued");
expect(retryRun?.retryOfRunId).toBe(runId);
expect(retryRun?.processLossRetryCount).toBe(1);
const issue = await db
.select()
.from(issues)
.where(eq(issues.id, issueId))
.then((rows) => rows[0] ?? null);
expect(issue?.executionRunId).toBe(retryRun?.id ?? null);
expect(issue?.checkoutRunId).toBe(runId);
});
it("releases active environment leases when an orphaned run is reaped", async () => {
const { runId, issueId, companyId } = await seedRunFixture({
processPid: 999_999_999,
});
const { leaseId } = await seedEnvironmentLeaseFixture({
companyId,
runId,
issueId,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reapOrphanedRuns();
expect(result.reaped).toBe(1);
expect(result.runIds).toEqual([runId]);
const lease = await db
.select()
.from(environmentLeases)
.where(eq(environmentLeases.id, leaseId))
.then((rows) => rows[0] ?? null);
expect(lease?.status).toBe("failed");
expect(lease?.releasedAt).toBeTruthy();
});
it.skipIf(process.platform === "win32")("reaps orphaned descendant process groups when the parent pid is already gone", async () => {
const orphan = await spawnOrphanedProcessGroup();
cleanupPids.add(orphan.descendantPid);
expect(isPidAlive(orphan.descendantPid)).toBe(true);
const { agentId, runId, issueId } = await seedRunFixture({
processPid: orphan.processPid,
processGroupId: orphan.processGroupId,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reapOrphanedRuns();
expect(result.reaped).toBe(1);
expect(result.runIds).toEqual([runId]);
expect(await waitForPidExit(orphan.descendantPid, 2_000)).toBe(true);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(2);
const failedRun = runs.find((row) => row.id === runId);
expect(failedRun?.status).toBe("failed");
expect(failedRun?.errorCode).toBe("process_lost");
expect(failedRun?.error).toContain("descendant process group");
const retryRun = runs.find((row) => row.id !== runId);
expect(retryRun?.status).toBe("queued");
const issue = await db
.select()
.from(issues)
.where(eq(issues.id, issueId))
.then((rows) => rows[0] ?? null);
expect(issue?.executionRunId).toBe(retryRun?.id ?? null);
});
it("blocks the issue when process-loss retry is exhausted and the immediate continuation recovery also fails", async () => {
mockAdapterExecute.mockRejectedValueOnce(new Error("continuation recovery failed"));
const { companyId, agentId, runId, issueId } = await seedRunFixture({
agentStatus: "idle",
processPid: 999_999_999,
processLossRetryCount: 1,
});
const resolvedBlockerId = randomUUID();
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(issues).values({
id: resolvedBlockerId,
companyId,
title: "Already completed prerequisite",
status: "done",
priority: "medium",
issueNumber: 2,
identifier: `${issuePrefix}-2`,
});
await db.insert(issueRelations).values({
companyId,
issueId: resolvedBlockerId,
relatedIssueId: issueId,
type: "blocks",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reapOrphanedRuns();
expect(result.reaped).toBe(1);
expect(result.runIds).toEqual([runId]);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(2);
expect(runs.find((row) => row.id === runId)?.status).toBe("failed");
const continuationRun = runs.find((row) => row.id !== runId);
expect(continuationRun?.contextSnapshot as Record<string, unknown> | undefined).toMatchObject({
retryReason: "issue_continuation_needed",
retryOfRunId: runId,
});
const blockedIssue = await waitForValue(async () =>
db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => {
const issue = rows[0] ?? null;
return issue?.status === "blocked" ? issue : null;
})
);
expect(blockedIssue?.status).toBe("blocked");
expect(blockedIssue?.executionRunId).toBeNull();
expect(blockedIssue?.checkoutRunId).toBeNull();
if (!continuationRun?.id) throw new Error("Expected continuation recovery run to exist");
const recovery = await expectStrandedRecoveryArtifacts({
companyId,
agentId,
issueId,
runId: continuationRun.id,
previousStatus: "in_progress",
retryReason: "issue_continuation_needed",
});
const blockerRelations = await db
.select()
.from(issueRelations)
.where(
and(
eq(issueRelations.companyId, companyId),
eq(issueRelations.relatedIssueId, issueId),
eq(issueRelations.type, "blocks"),
),
);
expect(blockerRelations.map((relation) => relation.issueId)).toEqual([recovery.id]);
const comments = await waitForValue(async () => {
const rows = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
return rows.length > 0 ? rows : null;
});
expect(comments).toHaveLength(1);
expect(comments[0]?.body).toContain("retried continuation");
expect(comments[0]?.body).toContain(`Recovery issue: [${recovery.identifier}]`);
});
it("blocks failed recovery work in place during immediate terminal-run cleanup", async () => {
const sourceIssueId = randomUUID();
const { companyId, agentId, runId, issueId } = await seedRunFixture({
agentStatus: "idle",
processPid: 999_999_999,
processLossRetryCount: 1,
runErrorCode: "process_lost",
runError: "Authorization: Bearer sk-test-recovery-secret",
});
await db
.update(issues)
.set({
title: "Recover stalled issue PAP-1",
originKind: "stranded_issue_recovery",
originId: sourceIssueId,
})
.where(eq(issues.id, issueId));
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(issues).values({
id: sourceIssueId,
companyId,
title: "Original stranded source",
status: "blocked",
priority: "medium",
issueNumber: 2,
identifier: `${issuePrefix}-2`,
});
await db.insert(issueRelations).values({
companyId,
issueId,
relatedIssueId: sourceIssueId,
type: "blocks",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reapOrphanedRuns();
expect(result.reaped).toBe(1);
expect(result.runIds).toEqual([runId]);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(1);
expect(runs[0]?.status).toBe("failed");
const recoveryIssue = await waitForValue(async () =>
db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => {
const issue = rows[0] ?? null;
return issue?.status === "blocked" ? issue : null;
})
);
expect(recoveryIssue?.assigneeAgentId).toBe(agentId);
expect(recoveryIssue?.originKind).toBe("stranded_issue_recovery");
expect(recoveryIssue?.originId).toBe(sourceIssueId);
expect(recoveryIssue?.executionRunId).toBeNull();
const nestedRecoveries = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"), eq(issues.originId, issueId)));
expect(nestedRecoveries).toHaveLength(0);
const comments = await waitForValue(async () => {
const rows = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
return rows.length > 0 ? rows : null;
});
expect(comments).toHaveLength(1);
expect(comments[0]?.body).toContain("stopped automatic stranded-work recovery");
expect(comments[0]?.body).toContain("recovery issues do not create nested `stranded_issue_recovery` issues");
expect(comments[0]?.body).toContain("Latest retry failure details were withheld from the issue thread");
expect(comments[0]?.body).not.toContain("sk-test-recovery-secret");
await expect(sourceBlockerIssueIds(companyId, sourceIssueId)).resolves.toEqual([issueId]);
});
it("does not block paused-tree work when immediate continuation recovery is suppressed by the hold", async () => {
const { companyId, agentId, runId, issueId } = await seedRunFixture({
agentStatus: "idle",
processPid: 999_999_999,
processLossRetryCount: 1,
});
await db.insert(issueTreeHolds).values({
companyId,
rootIssueId: issueId,
mode: "pause",
status: "active",
reason: "pause immediate recovery subtree",
releasePolicy: { strategy: "manual" },
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reapOrphanedRuns();
expect(result.reaped).toBe(1);
expect(result.runIds).toEqual([runId]);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(1);
expect(runs[0]?.status).toBe("failed");
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
expect(issue?.status).toBe("in_progress");
expect(issue?.executionRunId).toBeNull();
expect(issue?.checkoutRunId).toBe(runId);
const recoveryIssues = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery")));
expect(recoveryIssues).toHaveLength(0);
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
expect(comments).toHaveLength(0);
});
it("schedules a bounded retry for codex transient upstream failures instead of blocking the issue immediately", async () => {
mockAdapterExecute.mockResolvedValueOnce({
exitCode: 1,
signal: null,
timedOut: false,
errorCode: "adapter_failed",
errorFamily: "transient_upstream",
errorMessage:
"Error running remote compact task: We're currently experiencing high demand, which may cause temporary errors.",
provider: "openai",
model: "gpt-5.4",
resultJson: {
errorFamily: "transient_upstream",
},
});
const { agentId, runId, issueId } = await seedQueuedIssueRunFixture();
const heartbeat = heartbeatService(db);
await heartbeat.resumeQueuedRuns();
await waitForRunToSettle(heartbeat, runId);
const runs = await waitForValue(async () => {
const rows = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
return rows.length >= 2 ? rows : null;
});
expect(runs).toHaveLength(2);
const failedRun = runs?.find((row) => row.id === runId);
const retryRun = runs?.find((row) => row.id !== runId);
expect(failedRun?.status).toBe("failed");
expect(failedRun?.errorCode).toBe("adapter_failed");
expect((failedRun?.resultJson as Record<string, unknown> | null)?.errorFamily).toBe("transient_upstream");
expect(retryRun?.status).toBe("scheduled_retry");
expect(retryRun?.scheduledRetryReason).toBe("transient_failure");
expect((retryRun?.contextSnapshot as Record<string, unknown> | null)?.codexTransientFallbackMode).toBe("same_session");
const issue = await db
.select()
.from(issues)
.where(eq(issues.id, issueId))
.then((rows) => rows[0] ?? null);
expect(issue?.status).toBe("in_progress");
expect(issue?.executionRunId).toBe(retryRun?.id ?? null);
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
expect(comments).toHaveLength(0);
});
it("clears the detached warning when the run reports activity again", async () => {
const { runId } = await seedRunFixture({
includeIssue: false,
runErrorCode: "process_detached",
runError: "Lost in-memory process handle, but child pid 123 is still alive",
});
const heartbeat = heartbeatService(db);
const updated = await heartbeat.reportRunActivity(runId);
expect(updated?.errorCode).toBeNull();
expect(updated?.error).toBeNull();
const run = await heartbeat.getRun(runId);
expect(run?.errorCode).toBeNull();
expect(run?.error).toBeNull();
});
it("tracks the first heartbeat with the agent role instead of adapter type", async () => {
const { agentId, runId } = await seedRunFixture({
agentStatus: "running",
includeIssue: false,
});
const heartbeat = heartbeatService(db);
await heartbeat.cancelRun(runId);
expect(mockTrackAgentFirstHeartbeat).toHaveBeenCalledWith(
mockTelemetryClient,
expect.objectContaining({
agentRole: "engineer",
agentId,
}),
);
});
it("records manual cancellation stop metadata", async () => {
const { runId } = await seedRunFixture({
agentStatus: "running",
includeIssue: false,
});
const heartbeat = heartbeatService(db);
const cancelled = await heartbeat.cancelRun(runId);
expect(cancelled?.status).toBe("cancelled");
expect(cancelled?.resultJson).toMatchObject({
stopReason: "cancelled",
effectiveTimeoutSec: 0,
timeoutConfigured: false,
timeoutFired: false,
});
});
it("dispatches assigned todo work with no prior run as a normal assignment wake", async () => {
const { companyId, agentId, issueId } = await seedAssignedTodoNoRunFixture();
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.assignmentDispatched).toBe(1);
expect(result.dispatchRequeued).toBe(0);
expect(result.continuationRequeued).toBe(0);
expect(result.escalated).toBe(0);
expect(result.issueIds).toEqual([issueId]);
const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId));
expect(wakeups).toHaveLength(1);
expect(wakeups[0]).toMatchObject({
companyId,
agentId,
source: "assignment",
triggerDetail: "system",
reason: "issue_assigned",
payload: expect.objectContaining({
issueId,
mutation: "assigned_todo_liveness_dispatch",
}),
});
const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(1);
expect(runs[0]?.retryOfRunId).toBeNull();
expect(runs[0]?.contextSnapshot).toMatchObject({
issueId,
taskId: issueId,
wakeReason: "issue_assigned",
source: "issue.assigned_todo_liveness_dispatch",
});
expect((runs[0]?.contextSnapshot as Record<string, unknown>)?.retryReason).toBeUndefined();
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
expect(issue?.status).toBe("todo");
const recoveryIssues = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery")));
expect(recoveryIssues).toHaveLength(0);
await expect(sourceBlockerIssueIds(companyId, issueId)).resolves.toEqual([]);
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
expect(comments).toHaveLength(0);
if (runs[0]?.id) {
await waitForRunToSettle(heartbeat, runs[0].id);
}
});
it("does not duplicate initial assigned todo dispatch when a queued wake already exists", async () => {
const { companyId, agentId, issueId } = await seedAssignedTodoNoRunFixture();
await db.insert(agentWakeupRequests).values({
companyId,
agentId,
source: "assignment",
triggerDetail: "system",
reason: "issue_assigned",
payload: { issueId, mutation: "assigned_todo_liveness_dispatch" },
status: "queued",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.assignmentDispatched).toBe(0);
expect(result.dispatchRequeued).toBe(0);
expect(result.continuationRequeued).toBe(0);
expect(result.escalated).toBe(0);
expect(result.skipped).toBe(1);
expect(result.issueIds).toEqual([]);
const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId));
expect(wakeups).toHaveLength(1);
const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(0);
});
it("skips budget-blocked assigned todo work with no prior run and continues the sweep", async () => {
const blocked = await seedAssignedTodoNoRunFixture();
const unblocked = await seedAssignedTodoNoRunFixture();
await db.insert(budgetPolicies).values({
companyId: blocked.companyId,
scopeType: "agent",
scopeId: blocked.agentId,
metric: "billed_cents",
windowKind: "calendar_month_utc",
amount: 1,
hardStopEnabled: true,
isActive: true,
});
await db.insert(costEvents).values({
companyId: blocked.companyId,
agentId: blocked.agentId,
issueId: blocked.issueId,
provider: "test",
biller: "test",
billingType: "tokens",
model: "test-model",
costCents: 1,
occurredAt: new Date(),
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.assignmentDispatched).toBe(1);
expect(result.dispatchRequeued).toBe(0);
expect(result.continuationRequeued).toBe(0);
expect(result.escalated).toBe(0);
expect(result.skipped).toBe(1);
expect(result.issueIds).toEqual([unblocked.issueId]);
const blockedWakeups = await db
.select()
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.agentId, blocked.agentId));
expect(blockedWakeups).toHaveLength(0);
const blockedRuns = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, blocked.agentId));
expect(blockedRuns).toHaveLength(0);
const blockedIssue = await db
.select()
.from(issues)
.where(eq(issues.id, blocked.issueId))
.then((rows) => rows[0] ?? null);
expect(blockedIssue?.status).toBe("todo");
const unblockedWakeups = await db
.select()
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.agentId, unblocked.agentId));
expect(unblockedWakeups).toHaveLength(1);
expect(unblockedWakeups[0]).toMatchObject({
reason: "issue_assigned",
payload: expect.objectContaining({
issueId: unblocked.issueId,
mutation: "assigned_todo_liveness_dispatch",
}),
});
const unblockedRuns = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, unblocked.agentId));
expect(unblockedRuns).toHaveLength(1);
if (unblockedRuns[0]?.id) {
await waitForRunToSettle(heartbeat, unblockedRuns[0].id);
}
});
it("does not dispatch assigned todo work with no prior run when the agent is paused", async () => {
const { agentId, issueId } = await seedAssignedTodoNoRunFixture({ agentStatus: "paused" });
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.assignmentDispatched).toBe(0);
expect(result.dispatchRequeued).toBe(0);
expect(result.continuationRequeued).toBe(0);
expect(result.escalated).toBe(0);
expect(result.skipped).toBe(1);
expect(result.issueIds).toEqual([]);
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
expect(issue?.status).toBe("todo");
const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(0);
});
it("re-enqueues assigned todo work when the last issue run died and no wake remains", async () => {
const { agentId, issueId, runId } = await seedStrandedIssueFixture({
status: "todo",
runStatus: "failed",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.assignmentDispatched).toBe(0);
expect(result.dispatchRequeued).toBe(1);
expect(result.continuationRequeued).toBe(0);
expect(result.escalated).toBe(0);
expect(result.issueIds).toEqual([issueId]);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(2);
const retryRun = runs.find((row) => row.id !== runId);
expect(retryRun?.id).toBeTruthy();
expect((retryRun?.contextSnapshot as Record<string, unknown>)?.retryReason).toBe("assignment_recovery");
if (retryRun) {
await waitForRunToSettle(heartbeat, retryRun.id);
}
});
it.each([
["failed", "adapter_failed"],
["failed", "process_lost"],
["timed_out", "adapter_timed_out"],
] as const)(
"re-enqueues stranded in-progress work after a %s/%s run before escalating",
async (runStatus, runErrorCode) => {
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
status: "in_progress",
runStatus,
runErrorCode,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.dispatchRequeued).toBe(0);
expect(result.continuationRequeued).toBe(1);
expect(result.escalated).toBe(0);
expect(result.issueIds).toEqual([issueId]);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(2);
const retryRun = runs.find((row) => row.id !== runId);
expect(retryRun?.contextSnapshot as Record<string, unknown> | undefined).toMatchObject({
issueId,
taskId: issueId,
retryReason: "issue_continuation_needed",
retryOfRunId: runId,
source: "issue.continuation_recovery",
});
const recoveries = await db
.select()
.from(issues)
.where(
and(
eq(issues.companyId, companyId),
eq(issues.originKind, "stranded_issue_recovery"),
eq(issues.originId, issueId),
),
);
expect(recoveries).toHaveLength(0);
if (retryRun?.id) {
await waitForRunToSettle(heartbeat, retryRun.id);
}
},
);
it("still re-enqueues stranded assigned todo recovery when an old queued wake exists", async () => {
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
status: "todo",
runStatus: "failed",
});
await db.insert(agentWakeupRequests).values({
companyId,
agentId,
source: "assignment",
triggerDetail: "system",
reason: "issue_assigned",
payload: { issueId },
status: "queued",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.assignmentDispatched).toBe(0);
expect(result.dispatchRequeued).toBe(1);
expect(result.continuationRequeued).toBe(0);
expect(result.escalated).toBe(0);
expect(result.issueIds).toEqual([issueId]);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(2);
const retryRun = runs.find((row) => row.id !== runId);
expect((retryRun?.contextSnapshot as Record<string, unknown>)?.retryReason).toBe("assignment_recovery");
if (retryRun) {
await waitForRunToSettle(heartbeat, retryRun.id);
}
});
it("blocks assigned todo work after the one automatic dispatch recovery was already used", async () => {
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
status: "todo",
runStatus: "failed",
retryReason: "assignment_recovery",
runErrorCode: "process_lost",
runError: "Authorization: Bearer sk-test-recovery-secret",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.dispatchRequeued).toBe(0);
expect(result.escalated).toBe(1);
expect(result.issueIds).toEqual([issueId]);
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
expect(issue?.status).toBe("blocked");
const recovery = await expectStrandedRecoveryArtifacts({
companyId,
agentId,
issueId,
runId,
previousStatus: "todo",
retryReason: "assignment_recovery",
});
expect(recovery.description ?? "").not.toContain("sk-test-recovery-secret");
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
expect(comments).toHaveLength(1);
expect(comments[0]?.body).toContain("retried dispatch");
expect(comments[0]?.body).toContain("Latest retry failure details were withheld from the issue thread");
expect(comments[0]?.body).toContain(`Recovery issue: [${recovery.identifier}]`);
});
it("assigns open unassigned blockers back to their creator agent", async () => {
const companyId = randomUUID();
const creatorAgentId = randomUUID();
const blockedAssigneeAgentId = randomUUID();
const blockerIssueId = randomUUID();
const blockedIssueId = randomUUID();
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(companies).values({
id: companyId,
name: "Paperclip",
issuePrefix,
requireBoardApprovalForNewAgents: false,
});
await db.insert(agents).values([
{
id: creatorAgentId,
companyId,
name: "SecurityEngineer",
role: "engineer",
status: "idle",
adapterType: "codex_local",
adapterConfig: {},
runtimeConfig: {},
permissions: {},
},
{
id: blockedAssigneeAgentId,
companyId,
name: "CodexCoder",
role: "engineer",
status: "idle",
adapterType: "codex_local",
adapterConfig: {},
runtimeConfig: {},
permissions: {},
},
]);
await db.insert(issues).values([
{
id: blockerIssueId,
companyId,
title: "Fix blocker",
status: "todo",
priority: "high",
createdByAgentId: creatorAgentId,
issueNumber: 1,
identifier: `${issuePrefix}-1`,
},
{
id: blockedIssueId,
companyId,
title: "Blocked work",
status: "blocked",
priority: "high",
assigneeAgentId: blockedAssigneeAgentId,
issueNumber: 2,
identifier: `${issuePrefix}-2`,
},
]);
await db.insert(issueRelations).values({
companyId,
issueId: blockerIssueId,
relatedIssueId: blockedIssueId,
type: "blocks",
createdByAgentId: creatorAgentId,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.orphanBlockersAssigned).toBe(1);
expect(result.issueIds).toContain(blockerIssueId);
const blocker = await db
.select()
.from(issues)
.where(eq(issues.id, blockerIssueId))
.then((rows) => rows[0] ?? null);
expect(blocker?.assigneeAgentId).toBe(creatorAgentId);
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, blockerIssueId));
expect(comments[0]?.body).toContain("Assigned Orphan Blocker");
expect(comments[0]?.body).toContain(`[${issuePrefix}-2](/${issuePrefix}/issues/${issuePrefix}-2)`);
const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, creatorAgentId));
expect(wakeups).toEqual([
expect.objectContaining({
reason: "issue_assigned",
payload: expect.objectContaining({
issueId: blockerIssueId,
mutation: "unassigned_blocker_recovery",
}),
}),
]);
const runId = wakeups[0]?.runId;
if (runId) {
await waitForRunToSettle(heartbeat, runId);
}
});
it("re-enqueues continuation for stranded in-progress work with no active run", async () => {
const { agentId, issueId, runId } = await seedStrandedIssueFixture({
status: "in_progress",
runStatus: "failed",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.dispatchRequeued).toBe(0);
expect(result.continuationRequeued).toBe(1);
expect(result.escalated).toBe(0);
expect(result.issueIds).toEqual([issueId]);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(2);
const retryRun = runs.find((row) => row.id !== runId);
expect(retryRun?.id).toBeTruthy();
expect((retryRun?.contextSnapshot as Record<string, unknown>)?.retryReason).toBe("issue_continuation_needed");
if (retryRun) {
await waitForRunToSettle(heartbeat, retryRun.id);
}
});
it("does not continue seeded in-progress work that has no run linkage", async () => {
const companyId = randomUUID();
const agentId = randomUUID();
const issueId = randomUUID();
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(companies).values({
id: companyId,
name: "Paperclip",
issuePrefix,
requireBoardApprovalForNewAgents: false,
});
await db.insert(agents).values({
id: agentId,
companyId,
name: "CodexCoder",
role: "engineer",
status: "idle",
adapterType: "codex_local",
adapterConfig: {},
runtimeConfig: {},
permissions: {},
});
await db.insert(issues).values({
id: issueId,
companyId,
title: "Seeded in-flight work",
status: "in_progress",
priority: "medium",
assigneeAgentId: agentId,
checkoutRunId: null,
executionRunId: null,
issueNumber: 1,
identifier: `${issuePrefix}-1`,
startedAt: new Date("2026-03-19T00:00:00.000Z"),
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.dispatchRequeued).toBe(0);
expect(result.continuationRequeued).toBe(0);
expect(result.escalated).toBe(0);
expect(result.skipped).toBe(1);
const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(0);
const [issue] = await db.select().from(issues).where(eq(issues.id, issueId));
expect(issue?.status).toBe("in_progress");
expect(issue?.executionRunId).toBeNull();
});
it("classifies actionable plan-only recovery and enqueues one liveness continuation", async () => {
mockAdapterExecute.mockResolvedValueOnce({
exitCode: 0,
signal: null,
timedOut: false,
errorMessage: null,
summary: "I will inspect the repo next and then implement the fix.",
provider: "test",
model: "test-model",
});
const { agentId, issueId, runId } = await seedStrandedIssueFixture({
status: "in_progress",
runStatus: "failed",
});
const heartbeat = heartbeatService(db);
await heartbeat.reconcileStrandedAssignedIssues();
const livenessWake = await waitForValue(async () => {
const rows = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId));
return rows.find((row) => row.reason === "run_liveness_continuation") ?? null;
});
expect(livenessWake).toBeTruthy();
expect(livenessWake?.payload).toMatchObject({
issueId,
livenessState: "plan_only",
continuationAttempt: 1,
});
const sourceRunId = (livenessWake?.payload as Record<string, unknown> | null)?.sourceRunId;
expect(sourceRunId).toBeTruthy();
const sourceRun = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, String(sourceRunId)))
.then((rows) => rows[0] ?? null);
if (sourceRun?.id) {
await waitForRunToSettle(heartbeat, sourceRun.id, 5_000);
}
expect(sourceRun?.id).not.toBe(runId);
expect(sourceRun?.livenessState).toBe("plan_only");
});
it("treats a plan document update as progress and does not enqueue liveness continuation", async () => {
const { agentId, companyId, issueId, runId } = await seedStrandedIssueFixture({
status: "in_progress",
runStatus: "failed",
});
mockAdapterExecute.mockImplementationOnce(async (ctx: { runId: string }) => {
const documentId = randomUUID();
const revisionId = randomUUID();
await db.insert(documents).values({
id: documentId,
companyId,
title: "Plan",
format: "markdown",
latestBody: "# Plan\n\n- Inspect files\n- Implement fix",
latestRevisionId: revisionId,
latestRevisionNumber: 1,
createdByAgentId: agentId,
updatedByAgentId: agentId,
});
await db.insert(documentRevisions).values({
id: revisionId,
companyId,
documentId,
revisionNumber: 1,
title: "Plan",
format: "markdown",
body: "# Plan\n\n- Inspect files\n- Implement fix",
createdByAgentId: agentId,
createdByRunId: ctx.runId,
});
await db.insert(issueDocuments).values({
companyId,
issueId,
documentId,
key: "plan",
});
return {
exitCode: 0,
signal: null,
timedOut: false,
errorMessage: null,
summary: "Plan:\n- Inspect files\n- Implement fix",
provider: "test",
model: "test-model",
};
});
const heartbeat = heartbeatService(db);
await heartbeat.reconcileStrandedAssignedIssues();
const retryRun = await waitForValue(async () => {
const rows = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
return rows.find((row) => row.id !== runId && row.livenessState === "advanced") ?? null;
}, 5_000);
if (retryRun?.id) {
await waitForRunToSettle(heartbeat, retryRun.id, 5_000);
}
expect(retryRun?.livenessState).toBe("advanced");
const wakes = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId));
expect(wakes.some((row) => row.reason === "run_liveness_continuation")).toBe(false);
});
it("blocks stranded in-progress work after the continuation retry was already used", async () => {
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
status: "in_progress",
runStatus: "failed",
retryReason: "issue_continuation_needed",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.continuationRequeued).toBe(0);
expect(result.escalated).toBe(1);
expect(result.issueIds).toEqual([issueId]);
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
expect(issue?.status).toBe("blocked");
const recovery = await expectStrandedRecoveryArtifacts({
companyId,
agentId,
issueId,
runId,
previousStatus: "in_progress",
retryReason: "issue_continuation_needed",
});
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
expect(comments).toHaveLength(1);
expect(comments[0]?.body).toContain("retried continuation");
expect(comments[0]?.body).toContain("Latest retry failure details were withheld from the issue thread");
expect(comments[0]?.body).toContain(`Recovery issue: [${recovery.identifier}]`);
});
it("redacts error-code-only stranded recovery failures in issue copy", async () => {
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
status: "in_progress",
runStatus: "failed",
retryReason: "issue_continuation_needed",
runErrorCode: "adapter_exit_code",
runError: null,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.escalated).toBe(1);
const recovery = await expectStrandedRecoveryArtifacts({
companyId,
agentId,
issueId,
runId,
previousStatus: "in_progress",
retryReason: "issue_continuation_needed",
});
expect(recovery.description).toContain("Latest retry failure details were withheld from the issue thread");
expect(recovery.description).not.toContain("- Failure: none recorded");
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
expect(comments).toHaveLength(1);
expect(comments[0]?.body).toContain("Latest retry failure details were withheld from the issue thread");
expect(comments[0]?.body).not.toContain("- Failure: none recorded");
});
it("reuses the raced stranded recovery issue when duplicate active recovery creation conflicts", async () => {
const { companyId, issueId } = await seedStrandedIssueFixture({
status: "in_progress",
runStatus: "failed",
retryReason: "issue_continuation_needed",
});
const heartbeat = heartbeatService(db);
const results = await Promise.allSettled(
Array.from({ length: 8 }, () => heartbeat.reconcileStrandedAssignedIssues()),
);
expect(results.every((result) => result.status === "fulfilled")).toBe(true);
const recoveries = await db
.select()
.from(issues)
.where(and(
eq(issues.companyId, companyId),
eq(issues.originKind, "stranded_issue_recovery"),
eq(issues.originId, issueId),
));
expect(recoveries).toHaveLength(1);
await expect(sourceBlockerIssueIds(companyId, issueId)).resolves.toEqual([recoveries[0]?.id]);
});
it("blocks stranded recovery issues in place instead of creating nested recovery issues", async () => {
const sourceIssueId = randomUUID();
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
status: "in_progress",
runStatus: "failed",
});
await db
.update(issues)
.set({
title: "Recover stalled issue PAP-1",
originKind: "stranded_issue_recovery",
originId: sourceIssueId,
})
.where(eq(issues.id, issueId));
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(issues).values({
id: sourceIssueId,
companyId,
title: "Original stranded source",
status: "blocked",
priority: "medium",
issueNumber: 2,
identifier: `${issuePrefix}-2`,
});
await db.insert(issueRelations).values({
companyId,
issueId,
relatedIssueId: sourceIssueId,
type: "blocks",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.dispatchRequeued).toBe(0);
expect(result.continuationRequeued).toBe(0);
expect(result.escalated).toBe(1);
expect(result.issueIds).toEqual([issueId]);
const recoveryIssue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
expect(recoveryIssue?.status).toBe("blocked");
expect(recoveryIssue?.assigneeAgentId).toBe(agentId);
expect(recoveryIssue?.originKind).toBe("stranded_issue_recovery");
expect(recoveryIssue?.originId).toBe(sourceIssueId);
const nestedRecoveries = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"), eq(issues.originId, issueId)));
expect(nestedRecoveries).toHaveLength(0);
const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
expect(runs).toHaveLength(1);
expect(runs[0]?.id).toBe(runId);
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
expect(comments).toHaveLength(1);
expect(comments[0]?.body).toContain("stopped automatic stranded-work recovery");
expect(comments[0]?.body).toContain("Latest retry failure details were withheld from the issue thread");
expect(comments[0]?.body).toContain("recovery issues do not create nested `stranded_issue_recovery` issues");
await expect(sourceBlockerIssueIds(companyId, sourceIssueId)).resolves.toEqual([issueId]);
});
it("keeps repeated recovery failures on the same canonical recovery issue", async () => {
const sourceIssueId = randomUUID();
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
status: "in_progress",
runStatus: "failed",
});
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(issues).values({
id: sourceIssueId,
companyId,
title: "Original stranded source",
status: "blocked",
priority: "medium",
issueNumber: 2,
identifier: `${issuePrefix}-2`,
});
await db
.update(issues)
.set({
title: "Recover stalled issue PAP-1",
originKind: "stranded_issue_recovery",
originId: sourceIssueId,
})
.where(eq(issues.id, issueId));
await db.insert(issueRelations).values({
companyId,
issueId,
relatedIssueId: sourceIssueId,
type: "blocks",
});
const heartbeat = heartbeatService(db);
const firstResult = await heartbeat.reconcileStrandedAssignedIssues();
expect(firstResult.escalated).toBe(1);
expect(firstResult.issueIds).toEqual([issueId]);
const secondRunId = randomUUID();
await db.insert(heartbeatRuns).values({
id: secondRunId,
companyId,
agentId,
invocationSource: "assignment",
triggerDetail: "system",
status: "failed",
contextSnapshot: {
issueId,
taskId: issueId,
wakeReason: "issue_assigned",
source: "stranded_issue_recovery",
},
startedAt: new Date("2030-03-19T00:10:00.000Z"),
finishedAt: new Date("2030-03-19T00:15:00.000Z"),
createdAt: new Date("2030-03-19T00:10:00.000Z"),
updatedAt: new Date("2030-03-19T00:15:00.000Z"),
errorCode: "adapter_failed",
error: "adapter failed while retrying recovery issue",
});
await db
.update(issues)
.set({
status: "in_progress",
checkoutRunId: secondRunId,
executionRunId: null,
})
.where(eq(issues.id, issueId));
const secondResult = await heartbeat.reconcileStrandedAssignedIssues();
expect(secondResult.dispatchRequeued).toBe(0);
expect(secondResult.continuationRequeued).toBe(0);
expect(secondResult.escalated).toBe(1);
expect(secondResult.issueIds).toEqual([issueId]);
const recoveryIssuesForSource = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"), eq(issues.originId, sourceIssueId)));
expect(recoveryIssuesForSource.map((issue) => issue.id)).toEqual([issueId]);
const nestedRecoveries = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"), eq(issues.originId, issueId)));
expect(nestedRecoveries).toHaveLength(0);
await expect(sourceBlockerIssueIds(companyId, sourceIssueId)).resolves.toEqual([issueId]);
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
expect(comments).toHaveLength(2);
expect(comments[1]?.body).toContain("Latest retry failure details were withheld from the issue thread");
});
it("does not escalate paused-tree recovery when the automatic continuation retry was cancelled by the hold", async () => {
const { companyId, agentId, issueId } = await seedStrandedIssueFixture({
status: "in_progress",
runStatus: "cancelled",
retryReason: "issue_continuation_needed",
activePauseHold: true,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.dispatchRequeued).toBe(0);
expect(result.continuationRequeued).toBe(0);
expect(result.escalated).toBe(0);
expect(result.skipped).toBe(1);
expect(result.issueIds).toEqual([]);
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
expect(issue?.status).toBe("in_progress");
expect(issue?.checkoutRunId).toBeTruthy();
const recoveryIssues = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery")));
expect(recoveryIssues).toHaveLength(0);
const blockerRelations = await db
.select()
.from(issueRelations)
.where(
and(
eq(issueRelations.companyId, companyId),
eq(issueRelations.relatedIssueId, issueId),
eq(issueRelations.type, "blocks"),
),
);
expect(blockerRelations).toHaveLength(0);
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
expect(comments).toHaveLength(0);
const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId));
expect(wakeups).toHaveLength(1);
});
it("records productive continuation instead of recovery when the latest automatic continuation succeeded", async () => {
const { agentId, issueId, runId } = await seedStrandedIssueFixture({
status: "in_progress",
runStatus: "succeeded",
retryReason: "issue_continuation_needed",
livenessState: "advanced",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.continuationRequeued).toBe(0);
expect(result.productiveContinuationObserved).toBe(1);
expect(result.successfulContinuationObserved).toBe(0);
expect(result.escalated).toBe(0);
expect(result.issueIds).toEqual([]);
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
expect(issue?.status).toBe("in_progress");
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
expect(comments).toHaveLength(0);
const runs = await db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.agentId, agentId));
expect(runs.map((row) => row.id)).toEqual([runId]);
const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId));
expect(wakeups).toHaveLength(1);
});
it("does not reconcile user-assigned work through the agent stranded-work recovery path", async () => {
const { issueId, runId } = await seedStrandedIssueFixture({
status: "todo",
runStatus: "failed",
assignToUser: true,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.reconcileStrandedAssignedIssues();
expect(result.dispatchRequeued).toBe(0);
expect(result.continuationRequeued).toBe(0);
expect(result.escalated).toBe(0);
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
expect(issue?.status).toBe("todo");
const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, runId));
expect(runs).toHaveLength(1);
});
});