From 68c37660f0fad7c92f58af6b7e6b6180c188f66f Mon Sep 17 00:00:00 2001 From: Dotta <34892728+cryppadotta@users.noreply.github.com> Date: Mon, 27 Apr 2026 20:02:44 -0500 Subject: [PATCH] Dispatch assigned todo work during recovery sweeps (#4614) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Thinking Path > - Paperclip orchestrates AI agents for autonomous companies. > - Agent assignments must reliably turn into heartbeat work without board operators manually nudging stuck tasks. > - The stranded-assignment recovery sweep already handles failed or lost runs. > - But assigned `todo` issues with no prior run could sit idle because there was nothing to retry or recover. > - This pull request dispatches those never-started assigned todos as normal assignment wakes. > - The benefit is that recovery fixes missed initial dispatches without creating unnecessary recovery issues. ## What Changed - Added an initial assigned-todo dispatch path to the recovery service when an assigned `todo` issue has no heartbeat run yet. - Reused invocation budget hard-stop checks before dispatching or requeueing recovery work. - Counted `assignmentDispatched` in startup/scheduled recovery logs. - Added heartbeat recovery regressions for first dispatch, duplicate queued wake prevention, budget-blocked skips, and paused-agent skips. ## Verification - `pnpm exec vitest run server/src/__tests__/heartbeat-process-recovery.test.ts` ## Risks - Low to medium risk: this changes liveness recovery behavior for assigned `todo` issues, but it stays on the existing assignment wake path and skips paused or budget-blocked agents. - No migrations. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. 
## Model Used - OpenAI Codex coding agent based on GPT-5, tool-enabled local repository and shell access, Paperclip heartbeat context. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --- .../heartbeat-process-recovery.test.ts | 253 ++++++++++++++++++ server/src/index.ts | 2 + server/src/services/recovery/service.ts | 77 +++++- 3 files changed, 331 insertions(+), 1 deletion(-) diff --git a/server/src/__tests__/heartbeat-process-recovery.test.ts b/server/src/__tests__/heartbeat-process-recovery.test.ts index 0723eb4e..211f119a 100644 --- a/server/src/__tests__/heartbeat-process-recovery.test.ts +++ b/server/src/__tests__/heartbeat-process-recovery.test.ts @@ -7,8 +7,10 @@ import { agents, agentRuntimeState, agentWakeupRequests, + budgetPolicies, companySkills, companies, + costEvents, createDb, documentRevisions, documents, @@ -306,6 +308,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { await db.delete(activityLog); await db.delete(agentRuntimeState); await db.delete(companySkills); + await db.delete(costEvents); await db.delete(issueComments); await db.delete(issueDocuments); await db.delete(documentRevisions); @@ -336,6 +339,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { } } await db.delete(agentWakeupRequests); + await db.delete(budgetPolicies); for (let attempt = 0; attempt < 5; attempt += 1) { await 
db.delete(agentRuntimeState); try { @@ -586,6 +590,48 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { return { companyId, agentId, runId, wakeupRequestId, issueId, rootIssueId }; } + async function seedAssignedTodoNoRunFixture(input?: { + agentStatus?: "paused" | "idle" | "running"; + }) { + const companyId = randomUUID(); + const agentId = randomUUID(); + const issueId = randomUUID(); + const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; + + await db.insert(companies).values({ + id: companyId, + name: "Paperclip", + issuePrefix, + requireBoardApprovalForNewAgents: false, + }); + + await db.insert(agents).values({ + id: agentId, + companyId, + name: "CodexCoder", + role: "engineer", + status: input?.agentStatus ?? "idle", + adapterType: "codex_local", + adapterConfig: {}, + runtimeConfig: {}, + permissions: {}, + }); + + await db.insert(issues).values({ + id: issueId, + companyId, + title: "Assigned todo work that never received a heartbeat", + status: "todo", + priority: "medium", + assigneeAgentId: agentId, + assigneeUserId: null, + issueNumber: 1, + identifier: `${issuePrefix}-1`, + }); + + return { companyId, agentId, issueId }; + } + async function expectStrandedRecoveryArtifacts(input: { companyId: string; agentId: string; @@ -1173,6 +1219,176 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { }); }); + it("dispatches assigned todo work with no prior run as a normal assignment wake", async () => { + const { companyId, agentId, issueId } = await seedAssignedTodoNoRunFixture(); + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.assignmentDispatched).toBe(1); + expect(result.dispatchRequeued).toBe(0); + expect(result.continuationRequeued).toBe(0); + expect(result.escalated).toBe(0); + expect(result.issueIds).toEqual([issueId]); + + const wakeups = await 
db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId)); + expect(wakeups).toHaveLength(1); + expect(wakeups[0]).toMatchObject({ + companyId, + agentId, + source: "assignment", + triggerDetail: "system", + reason: "issue_assigned", + payload: expect.objectContaining({ + issueId, + mutation: "assigned_todo_liveness_dispatch", + }), + }); + + const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); + expect(runs).toHaveLength(1); + expect(runs[0]?.retryOfRunId).toBeNull(); + expect(runs[0]?.contextSnapshot).toMatchObject({ + issueId, + taskId: issueId, + wakeReason: "issue_assigned", + source: "issue.assigned_todo_liveness_dispatch", + }); + expect((runs[0]?.contextSnapshot as Record<string, unknown>)?.retryReason).toBeUndefined(); + + const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); + expect(issue?.status).toBe("todo"); + + const recoveryIssues = await db + .select() + .from(issues) + .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"))); + expect(recoveryIssues).toHaveLength(0); + await expect(sourceBlockerIssueIds(companyId, issueId)).resolves.toEqual([]); + + const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); + expect(comments).toHaveLength(0); + + if (runs[0]?.id) { + await waitForRunToSettle(heartbeat, runs[0].id); + } + }); + + it("does not duplicate initial assigned todo dispatch when a queued wake already exists", async () => { + const { companyId, agentId, issueId } = await seedAssignedTodoNoRunFixture(); + await db.insert(agentWakeupRequests).values({ + companyId, + agentId, + source: "assignment", + triggerDetail: "system", + reason: "issue_assigned", + payload: { issueId, mutation: "assigned_todo_liveness_dispatch" }, + status: "queued", + }); + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + 
expect(result.assignmentDispatched).toBe(0); + expect(result.dispatchRequeued).toBe(0); + expect(result.continuationRequeued).toBe(0); + expect(result.escalated).toBe(0); + expect(result.skipped).toBe(1); + expect(result.issueIds).toEqual([]); + + const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId)); + expect(wakeups).toHaveLength(1); + const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); + expect(runs).toHaveLength(0); + }); + + it("skips budget-blocked assigned todo work with no prior run and continues the sweep", async () => { + const blocked = await seedAssignedTodoNoRunFixture(); + const unblocked = await seedAssignedTodoNoRunFixture(); + await db.insert(budgetPolicies).values({ + companyId: blocked.companyId, + scopeType: "agent", + scopeId: blocked.agentId, + metric: "billed_cents", + windowKind: "calendar_month_utc", + amount: 1, + hardStopEnabled: true, + isActive: true, + }); + await db.insert(costEvents).values({ + companyId: blocked.companyId, + agentId: blocked.agentId, + issueId: blocked.issueId, + provider: "test", + biller: "test", + billingType: "tokens", + model: "test-model", + costCents: 1, + occurredAt: new Date(), + }); + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.assignmentDispatched).toBe(1); + expect(result.dispatchRequeued).toBe(0); + expect(result.continuationRequeued).toBe(0); + expect(result.escalated).toBe(0); + expect(result.skipped).toBe(1); + expect(result.issueIds).toEqual([unblocked.issueId]); + + const blockedWakeups = await db + .select() + .from(agentWakeupRequests) + .where(eq(agentWakeupRequests.agentId, blocked.agentId)); + expect(blockedWakeups).toHaveLength(0); + const blockedRuns = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, blocked.agentId)); + expect(blockedRuns).toHaveLength(0); + + const blockedIssue = await db + 
.select() + .from(issues) + .where(eq(issues.id, blocked.issueId)) + .then((rows) => rows[0] ?? null); + expect(blockedIssue?.status).toBe("todo"); + + const unblockedWakeups = await db + .select() + .from(agentWakeupRequests) + .where(eq(agentWakeupRequests.agentId, unblocked.agentId)); + expect(unblockedWakeups).toHaveLength(1); + expect(unblockedWakeups[0]).toMatchObject({ + reason: "issue_assigned", + payload: expect.objectContaining({ + issueId: unblocked.issueId, + mutation: "assigned_todo_liveness_dispatch", + }), + }); + const unblockedRuns = await db + .select() + .from(heartbeatRuns) + .where(eq(heartbeatRuns.agentId, unblocked.agentId)); + expect(unblockedRuns).toHaveLength(1); + if (unblockedRuns[0]?.id) { + await waitForRunToSettle(heartbeat, unblockedRuns[0].id); + } + }); + + it("does not dispatch assigned todo work with no prior run when the agent is paused", async () => { + const { agentId, issueId } = await seedAssignedTodoNoRunFixture({ agentStatus: "paused" }); + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.assignmentDispatched).toBe(0); + expect(result.dispatchRequeued).toBe(0); + expect(result.continuationRequeued).toBe(0); + expect(result.escalated).toBe(0); + expect(result.skipped).toBe(1); + expect(result.issueIds).toEqual([]); + + const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? 
null); + expect(issue?.status).toBe("todo"); + const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); + expect(runs).toHaveLength(0); + }); + it("re-enqueues assigned todo work when the last issue run died and no wake remains", async () => { const { agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "todo", @@ -1181,6 +1397,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.assignmentDispatched).toBe(0); expect(result.dispatchRequeued).toBe(1); expect(result.continuationRequeued).toBe(0); expect(result.escalated).toBe(0); @@ -1200,6 +1417,42 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { } }); + it("still re-enqueues stranded assigned todo recovery when an old queued wake exists", async () => { + const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ + status: "todo", + runStatus: "failed", + }); + await db.insert(agentWakeupRequests).values({ + companyId, + agentId, + source: "assignment", + triggerDetail: "system", + reason: "issue_assigned", + payload: { issueId }, + status: "queued", + }); + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.assignmentDispatched).toBe(0); + expect(result.dispatchRequeued).toBe(1); + expect(result.continuationRequeued).toBe(0); + expect(result.escalated).toBe(0); + expect(result.issueIds).toEqual([issueId]); + + const runs = await db + .select() + .from(heartbeatRuns) + .where(eq(heartbeatRuns.agentId, agentId)); + expect(runs).toHaveLength(2); + + const retryRun = runs.find((row) => row.id !== runId); + expect((retryRun?.contextSnapshot as Record<string, unknown>)?.retryReason).toBe("assignment_recovery"); + if (retryRun) { + await waitForRunToSettle(heartbeat, retryRun.id); + } + }); + it("blocks assigned todo work 
after the one automatic dispatch recovery was already used", async () => { const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "todo", diff --git a/server/src/index.ts b/server/src/index.ts index 35e2ca5f..a3b7ddce 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -682,6 +682,7 @@ export async function startServer(): Promise { const reconciled = await heartbeat.reconcileStrandedAssignedIssues(); if ( promotion.promoted > 0 || + reconciled.assignmentDispatched > 0 || reconciled.dispatchRequeued > 0 || reconciled.continuationRequeued > 0 || reconciled.escalated > 0 @@ -740,6 +741,7 @@ export async function startServer(): Promise { const reconciled = await heartbeat.reconcileStrandedAssignedIssues(); if ( promotion.promoted > 0 || + reconciled.assignmentDispatched > 0 || reconciled.dispatchRequeued > 0 || reconciled.continuationRequeued > 0 || reconciled.escalated > 0 diff --git a/server/src/services/recovery/service.ts b/server/src/services/recovery/service.ts index 0f435d3b..40d12555 100644 --- a/server/src/services/recovery/service.ts +++ b/server/src/services/recovery/service.ts @@ -343,6 +343,21 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) return Boolean(run || deferredWake); } + async function hasQueuedIssueWake(companyId: string, issueId: string) { + return db + .select({ id: agentWakeupRequests.id }) + .from(agentWakeupRequests) + .where( + and( + eq(agentWakeupRequests.companyId, companyId), + eq(agentWakeupRequests.status, "queued"), + sql`${agentWakeupRequests.payload} ->> 'issueId' = ${issueId}`, + ), + ) + .limit(1) + .then((rows) => Boolean(rows[0])); + } + async function enqueueStrandedIssueRecovery(input: { issueId: string; agentId: string; @@ -386,6 +401,34 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) return queued; } + async function enqueueInitialAssignedTodoDispatch(issue: typeof issues.$inferSelect, agentId: string) { + 
return deps.enqueueWakeup(agentId, { + source: "assignment", + triggerDetail: "system", + reason: "issue_assigned", + payload: { + issueId: issue.id, + mutation: "assigned_todo_liveness_dispatch", + }, + requestedByActorType: "system", + requestedByActorId: null, + contextSnapshot: { + issueId: issue.id, + taskId: issue.id, + wakeReason: "issue_assigned", + source: "issue.assigned_todo_liveness_dispatch", + }, + }); + } + + async function isInvocationBudgetBlocked(issue: typeof issues.$inferSelect, agentId: string) { + const budgetBlock = await budgets.getInvocationBlock(issue.companyId, agentId, { + issueId: issue.id, + projectId: issue.projectId, + }); + return Boolean(budgetBlock); + } + async function reconcileUnassignedBlockingIssues() { const candidates = await db .select({ @@ -1526,6 +1569,7 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) ); const result = { + assignmentDispatched: 0, dispatchRequeued: 0, continuationRequeued: 0, orphanBlockersAssigned: 0, @@ -1574,7 +1618,28 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) } if (issue.status === "todo") { - if (!latestRun || latestRun.status === "succeeded") { + if (!latestRun) { + if (await hasQueuedIssueWake(issue.companyId, issue.id)) { + result.skipped += 1; + continue; + } + + if (await isInvocationBudgetBlocked(issue, agentId)) { + result.skipped += 1; + continue; + } + + const queued = await enqueueInitialAssignedTodoDispatch(issue, agentId); + if (queued) { + result.assignmentDispatched += 1; + result.issueIds.push(issue.id); + } else { + result.skipped += 1; + } + continue; + } + + if (latestRun.status === "succeeded") { result.skipped += 1; continue; } @@ -1599,6 +1664,11 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) continue; } + if (await isInvocationBudgetBlocked(issue, agentId)) { + result.skipped += 1; + continue; + } + const queued = await enqueueStrandedIssueRecovery({ issueId: 
issue.id, agentId, @@ -1640,6 +1710,11 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) continue; } + if (await isInvocationBudgetBlocked(issue, agentId)) { + result.skipped += 1; + continue; + } + const queued = await enqueueStrandedIssueRecovery({ issueId: issue.id, agentId,