diff --git a/server/src/__tests__/heartbeat-process-recovery.test.ts b/server/src/__tests__/heartbeat-process-recovery.test.ts index d66c2617..e572d062 100644 --- a/server/src/__tests__/heartbeat-process-recovery.test.ts +++ b/server/src/__tests__/heartbeat-process-recovery.test.ts @@ -516,6 +516,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { status: "todo" | "in_progress"; runStatus: "failed" | "timed_out" | "cancelled" | "succeeded"; retryReason?: "assignment_recovery" | "issue_continuation_needed" | null; + runSource?: string | null; assignToUser?: boolean; activePauseHold?: boolean; livenessState?: "completed" | "advanced" | "plan_only" | "empty_response" | "blocked" | "failed" | "needs_followup" | null; @@ -582,6 +583,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { ? "issue_assignment_recovery" : input.retryReason ?? "issue_assigned", ...(input.retryReason ? { retryReason: input.retryReason } : {}), + ...(input.runSource ? { source: input.runSource } : {}), }, startedAt: now, finishedAt: new Date("2026-03-19T00:05:00.000Z"), @@ -2180,21 +2182,20 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { expect(wakeups).toHaveLength(1); }); - it("records productive continuation instead of recovery when the latest automatic continuation succeeded", async () => { + it("re-enqueues recovery when the latest in-progress continuation made progress but left no live path", async () => { const { agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "succeeded", - retryReason: "issue_continuation_needed", livenessState: "advanced", }); const heartbeat = heartbeatService(db); const result = await heartbeat.reconcileStrandedAssignedIssues(); - expect(result.continuationRequeued).toBe(0); - expect(result.productiveContinuationObserved).toBe(1); + expect(result.continuationRequeued).toBe(1); + expect(result.productiveContinuationObserved).toBe(0); 
expect(result.successfulContinuationObserved).toBe(0); expect(result.escalated).toBe(0); - expect(result.issueIds).toEqual([]); + expect(result.issueIds).toEqual([issueId]); const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); expect(issue?.status).toBe("in_progress"); @@ -2206,10 +2207,136 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)); - expect(runs.map((row) => row.id)).toEqual([runId]); + expect(runs).toHaveLength(2); + const retryRun = runs.find((row) => row.id !== runId); + expect(retryRun?.contextSnapshot as Record<string, unknown> | undefined).toMatchObject({ + issueId, + taskId: issueId, + retryReason: "issue_continuation_needed", + retryOfRunId: runId, + source: "issue.productive_terminal_continuation_recovery", + }); const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId)); - expect(wakeups).toHaveLength(1); + expect(wakeups).toHaveLength(2); + }); + + it("blocks stranded in-progress work after a productive continuation retry was already used", async () => { + const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ + status: "in_progress", + runStatus: "succeeded", + retryReason: "issue_continuation_needed", + runSource: "issue.productive_terminal_continuation_recovery", + livenessState: "advanced", + }); + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.continuationRequeued).toBe(0); + expect(result.escalated).toBe(1); + expect(result.issueIds).toEqual([issueId]); + + const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? 
null); + expect(issue?.status).toBe("blocked"); + + const recovery = await expectStrandedRecoveryArtifacts({ + companyId, + agentId, + issueId, + runId, + previousStatus: "in_progress", + retryReason: "issue_continuation_needed", + }); + + const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); + expect(comments).toHaveLength(1); + expect(comments[0]?.body).toContain("automatically retried continuation"); + expect(comments[0]?.body).toContain("still has no live execution path"); + expect(comments[0]?.body).toContain(`Recovery issue: [${recovery.identifier}]`); + }); + + it("allows one productive-terminal recovery after regular continuation recovery made progress", async () => { + const { agentId, issueId, runId } = await seedStrandedIssueFixture({ + status: "in_progress", + runStatus: "succeeded", + retryReason: "issue_continuation_needed", + runSource: "issue.continuation_recovery", + livenessState: "advanced", + }); + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.continuationRequeued).toBe(1); + expect(result.escalated).toBe(0); + expect(result.issueIds).toEqual([issueId]); + + const runs = await db + .select() + .from(heartbeatRuns) + .where(eq(heartbeatRuns.agentId, agentId)); + const retryRun = runs.find((row) => row.id !== runId); + expect(retryRun?.contextSnapshot as Record<string, unknown> | undefined).toMatchObject({ + issueId, + taskId: issueId, + retryReason: "issue_continuation_needed", + retryOfRunId: runId, + source: "issue.productive_terminal_continuation_recovery", + }); + }); + + it("does not treat a productive terminal run as healthy when in-progress work has no live path", async () => { + const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ + status: "in_progress", + runStatus: "succeeded", + livenessState: "advanced", + }); + const heartbeat = heartbeatService(db); + + const sourceIssue = await 
db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null); + expect(sourceIssue).toMatchObject({ + status: "in_progress", + assigneeAgentId: agentId, + assigneeUserId: null, + executionRunId: null, + }); + + const activeRuns = await db + .select() + .from(heartbeatRuns) + .where(and(eq(heartbeatRuns.companyId, companyId), inArray(heartbeatRuns.status, ["queued", "running"]))); + expect(activeRuns).toHaveLength(0); + + const liveWakeups = await db + .select() + .from(agentWakeupRequests) + .where(and(eq(agentWakeupRequests.companyId, companyId), inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]))); + expect(liveWakeups).toHaveLength(0); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.productiveContinuationObserved).toBe(0); + expect(result.continuationRequeued + result.escalated).toBe(1); + expect(result.issueIds).toEqual([issueId]); + + const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); + const recoveryIssues = await db + .select() + .from(issues) + .where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery"))); + const followupRuns = await db + .select() + .from(heartbeatRuns) + .where(and(eq(heartbeatRuns.companyId, companyId), eq(heartbeatRuns.agentId, agentId))); + expect(comments).toHaveLength(0); + expect(recoveryIssues).toHaveLength(0); + expect(followupRuns).toHaveLength(2); + const retryRun = followupRuns.find((row) => row.id !== runId); + expect(retryRun?.contextSnapshot as Record<string, unknown> | undefined).toMatchObject({ + issueId, + taskId: issueId, + retryReason: "issue_continuation_needed", + retryOfRunId: runId, + source: "issue.productive_terminal_continuation_recovery", + }); }); it("does not reconcile user-assigned work through the agent stranded-work recovery path", async () => { diff --git a/server/src/__tests__/run-continuations.test.ts b/server/src/__tests__/run-continuations.test.ts 
index a9e1e7b0..82b13235 100644 --- a/server/src/__tests__/run-continuations.test.ts +++ b/server/src/__tests__/run-continuations.test.ts @@ -106,6 +106,24 @@ describe("run liveness continuations", () => { expect(decision.nextAttempt).toBe(2); }); + it("leaves advanced terminal runs to stranded issue recovery instead of bounded liveness continuation", () => { + const decision = decideRunLivenessContinuation({ + run: run(), + issue: issue(), + agent: agent(), + livenessState: "advanced", + livenessReason: "Run produced concrete action evidence: created an issue comment", + nextAction: "Resume the implementation from the remaining acceptance criteria.", + budgetBlocked: false, + idempotentWakeExists: false, + }); + + expect(decision).toEqual({ + kind: "skip", + reason: "liveness state is not actionable for continuation", + }); + }); + it("does not enqueue a third continuation and returns an exhaustion comment", () => { const decision = decideRunLivenessContinuation({ run: run({ continuationAttempt: 2 }), diff --git a/server/src/services/recovery/service.ts b/server/src/services/recovery/service.ts index ec721351..87192d7a 100644 --- a/server/src/services/recovery/service.ts +++ b/server/src/services/recovery/service.ts @@ -74,6 +74,7 @@ type LatestIssueRun = Pick< typeof heartbeatRuns.$inferSelect, "id" | "agentId" | "status" | "error" | "errorCode" | "contextSnapshot" | "livenessState" > | null; +type SuccessfulLatestIssueRun = NonNullable<LatestIssueRun> & { status: "succeeded" }; type WatchdogDecisionActor = | { type: "board"; userId?: string | null; runId?: string | null } @@ -188,7 +189,7 @@ function isUnsuccessfulTerminalIssueRun(latestRun: LatestIssueRun) { ); } -function isSuccessfulInProgressContinuationRun(latestRun: LatestIssueRun) { +function isSuccessfulInProgressContinuationRun(latestRun: LatestIssueRun): latestRun is SuccessfulLatestIssueRun { return latestRun?.status === "succeeded"; } @@ -200,6 +201,13 @@ function isProductiveContinuationRun(latestRun: 
LatestIssueRun) { latestRun.livenessState === "needs_followup"); } +function isRepeatedProductiveContinuationRecovery(latestRun: SuccessfulLatestIssueRun) { + const latestContext = parseObject(latestRun.contextSnapshot); + return readNonEmptyString(latestContext.retryReason) === "issue_continuation_needed" && + readNonEmptyString(latestContext.source) === "issue.productive_terminal_continuation_recovery" && + isProductiveContinuationRun(latestRun); +} + function parseLivenessIncidentKey(incidentKey: string | null | undefined) { if (!incidentKey) return null; return parseIssueGraphLivenessIncidentKey(incidentKey); @@ -1706,12 +1714,51 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) continue; } if (isSuccessfulInProgressContinuationRun(latestRun)) { - if (isProductiveContinuationRun(latestRun)) { - result.productiveContinuationObserved += 1; - } else { + const successfulRun = latestRun; + + if (!isProductiveContinuationRun(successfulRun)) { result.successfulContinuationObserved += 1; + result.skipped += 1; + continue; + } + + if (isRepeatedProductiveContinuationRecovery(successfulRun)) { + const updated = await escalateStrandedAssignedIssue({ + issue, + previousStatus: "in_progress", + latestRun: successfulRun, + comment: + "Paperclip automatically retried continuation for this assigned `in_progress` issue and the retry " + + "made progress, but it still has no live execution path. 
Moving it to `blocked` so it is visible for intervention.", + }); + if (updated) { + result.escalated += 1; + result.issueIds.push(issue.id); + } else { + result.skipped += 1; + } + continue; + } + + if (await isInvocationBudgetBlocked(issue, agentId)) { + result.skipped += 1; + continue; + } + + const queued = await enqueueStrandedIssueRecovery({ + issueId: issue.id, + agentId, + reason: "issue_continuation_needed", + retryReason: "issue_continuation_needed", + source: "issue.productive_terminal_continuation_recovery", + retryOfRunId: successfulRun.id, + }); + if (queued) { + result.continuationRequeued += 1; + result.issueIds.push(issue.id); + } else { + result.skipped += 1; } - result.skipped += 1; continue; } if (didAutomaticRecoveryFail(latestRun, "issue_continuation_needed")) {