[codex] Recover productive terminal continuations (#4956)
## Thinking Path > - Paperclip orchestrates AI agents through issue-scoped heartbeat runs > - Recovery logic decides whether in-progress work still has a live path after a terminal run > - A productive terminal continuation can still leave an issue stranded when no active run or wake remains > - Treating that state as healthy leaves work stuck despite evidence that more action is needed > - This pull request re-enqueues recovery for productive terminal continuations that left no live path > - The benefit is fewer silently stranded in-progress issues after agents make partial progress ## What Changed - Reclassified successful-but-productive terminal continuations as recoverable when no live path remains. - Enqueue a follow-up recovery wake with the original run id and continuation metadata. - Added regression tests covering productive terminal continuation recovery and advanced liveness handoff. ## Verification - `pnpm exec vitest run server/src/__tests__/heartbeat-process-recovery.test.ts server/src/__tests__/run-continuations.test.ts` ## Risks - Medium risk: recovery may schedule one more follow-up where Paperclip previously considered the work observed. The existing uniqueness, budget, and escalation checks still constrain retry loops. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI Codex, GPT-5 coding agent, tool use and local command execution. Exact context window was not exposed in the runtime. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --------- Co-authored-by: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -516,6 +516,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||
status: "todo" | "in_progress";
|
||||
runStatus: "failed" | "timed_out" | "cancelled" | "succeeded";
|
||||
retryReason?: "assignment_recovery" | "issue_continuation_needed" | null;
|
||||
runSource?: string | null;
|
||||
assignToUser?: boolean;
|
||||
activePauseHold?: boolean;
|
||||
livenessState?: "completed" | "advanced" | "plan_only" | "empty_response" | "blocked" | "failed" | "needs_followup" | null;
|
||||
@@ -582,6 +583,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||
? "issue_assignment_recovery"
|
||||
: input.retryReason ?? "issue_assigned",
|
||||
...(input.retryReason ? { retryReason: input.retryReason } : {}),
|
||||
...(input.runSource ? { source: input.runSource } : {}),
|
||||
},
|
||||
startedAt: now,
|
||||
finishedAt: new Date("2026-03-19T00:05:00.000Z"),
|
||||
@@ -2180,21 +2182,20 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||
expect(wakeups).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("records productive continuation instead of recovery when the latest automatic continuation succeeded", async () => {
|
||||
it("re-enqueues recovery when the latest in-progress continuation made progress but left no live path", async () => {
|
||||
const { agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||
status: "in_progress",
|
||||
runStatus: "succeeded",
|
||||
retryReason: "issue_continuation_needed",
|
||||
livenessState: "advanced",
|
||||
});
|
||||
const heartbeat = heartbeatService(db);
|
||||
|
||||
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||
expect(result.continuationRequeued).toBe(0);
|
||||
expect(result.productiveContinuationObserved).toBe(1);
|
||||
expect(result.continuationRequeued).toBe(1);
|
||||
expect(result.productiveContinuationObserved).toBe(0);
|
||||
expect(result.successfulContinuationObserved).toBe(0);
|
||||
expect(result.escalated).toBe(0);
|
||||
expect(result.issueIds).toEqual([]);
|
||||
expect(result.issueIds).toEqual([issueId]);
|
||||
|
||||
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
|
||||
expect(issue?.status).toBe("in_progress");
|
||||
@@ -2206,10 +2207,136 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(eq(heartbeatRuns.agentId, agentId));
|
||||
expect(runs.map((row) => row.id)).toEqual([runId]);
|
||||
expect(runs).toHaveLength(2);
|
||||
const retryRun = runs.find((row) => row.id !== runId);
|
||||
expect(retryRun?.contextSnapshot as Record<string, unknown> | undefined).toMatchObject({
|
||||
issueId,
|
||||
taskId: issueId,
|
||||
retryReason: "issue_continuation_needed",
|
||||
retryOfRunId: runId,
|
||||
source: "issue.productive_terminal_continuation_recovery",
|
||||
});
|
||||
|
||||
const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId));
|
||||
expect(wakeups).toHaveLength(1);
|
||||
expect(wakeups).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("blocks stranded in-progress work after a productive continuation retry was already used", async () => {
|
||||
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||
status: "in_progress",
|
||||
runStatus: "succeeded",
|
||||
retryReason: "issue_continuation_needed",
|
||||
runSource: "issue.productive_terminal_continuation_recovery",
|
||||
livenessState: "advanced",
|
||||
});
|
||||
const heartbeat = heartbeatService(db);
|
||||
|
||||
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||
expect(result.continuationRequeued).toBe(0);
|
||||
expect(result.escalated).toBe(1);
|
||||
expect(result.issueIds).toEqual([issueId]);
|
||||
|
||||
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
|
||||
expect(issue?.status).toBe("blocked");
|
||||
|
||||
const recovery = await expectStrandedRecoveryArtifacts({
|
||||
companyId,
|
||||
agentId,
|
||||
issueId,
|
||||
runId,
|
||||
previousStatus: "in_progress",
|
||||
retryReason: "issue_continuation_needed",
|
||||
});
|
||||
|
||||
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
|
||||
expect(comments).toHaveLength(1);
|
||||
expect(comments[0]?.body).toContain("automatically retried continuation");
|
||||
expect(comments[0]?.body).toContain("still has no live execution path");
|
||||
expect(comments[0]?.body).toContain(`Recovery issue: [${recovery.identifier}]`);
|
||||
});
|
||||
|
||||
it("allows one productive-terminal recovery after regular continuation recovery made progress", async () => {
|
||||
const { agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||
status: "in_progress",
|
||||
runStatus: "succeeded",
|
||||
retryReason: "issue_continuation_needed",
|
||||
runSource: "issue.continuation_recovery",
|
||||
livenessState: "advanced",
|
||||
});
|
||||
const heartbeat = heartbeatService(db);
|
||||
|
||||
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||
expect(result.continuationRequeued).toBe(1);
|
||||
expect(result.escalated).toBe(0);
|
||||
expect(result.issueIds).toEqual([issueId]);
|
||||
|
||||
const runs = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(eq(heartbeatRuns.agentId, agentId));
|
||||
const retryRun = runs.find((row) => row.id !== runId);
|
||||
expect(retryRun?.contextSnapshot as Record<string, unknown> | undefined).toMatchObject({
|
||||
issueId,
|
||||
taskId: issueId,
|
||||
retryReason: "issue_continuation_needed",
|
||||
retryOfRunId: runId,
|
||||
source: "issue.productive_terminal_continuation_recovery",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not treat a productive terminal run as healthy when in-progress work has no live path", async () => {
|
||||
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||
status: "in_progress",
|
||||
runStatus: "succeeded",
|
||||
livenessState: "advanced",
|
||||
});
|
||||
const heartbeat = heartbeatService(db);
|
||||
|
||||
const sourceIssue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
|
||||
expect(sourceIssue).toMatchObject({
|
||||
status: "in_progress",
|
||||
assigneeAgentId: agentId,
|
||||
assigneeUserId: null,
|
||||
executionRunId: null,
|
||||
});
|
||||
|
||||
const activeRuns = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.companyId, companyId), inArray(heartbeatRuns.status, ["queued", "running"])));
|
||||
expect(activeRuns).toHaveLength(0);
|
||||
|
||||
const liveWakeups = await db
|
||||
.select()
|
||||
.from(agentWakeupRequests)
|
||||
.where(and(eq(agentWakeupRequests.companyId, companyId), inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"])));
|
||||
expect(liveWakeups).toHaveLength(0);
|
||||
|
||||
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||
expect(result.productiveContinuationObserved).toBe(0);
|
||||
expect(result.continuationRequeued + result.escalated).toBe(1);
|
||||
expect(result.issueIds).toEqual([issueId]);
|
||||
|
||||
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
|
||||
const recoveryIssues = await db
|
||||
.select()
|
||||
.from(issues)
|
||||
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery")));
|
||||
const followupRuns = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.companyId, companyId), eq(heartbeatRuns.agentId, agentId)));
|
||||
expect(comments).toHaveLength(0);
|
||||
expect(recoveryIssues).toHaveLength(0);
|
||||
expect(followupRuns).toHaveLength(2);
|
||||
const retryRun = followupRuns.find((row) => row.id !== runId);
|
||||
expect(retryRun?.contextSnapshot as Record<string, unknown> | undefined).toMatchObject({
|
||||
issueId,
|
||||
taskId: issueId,
|
||||
retryReason: "issue_continuation_needed",
|
||||
retryOfRunId: runId,
|
||||
source: "issue.productive_terminal_continuation_recovery",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not reconcile user-assigned work through the agent stranded-work recovery path", async () => {
|
||||
|
||||
@@ -106,6 +106,24 @@ describe("run liveness continuations", () => {
|
||||
expect(decision.nextAttempt).toBe(2);
|
||||
});
|
||||
|
||||
it("leaves advanced terminal runs to stranded issue recovery instead of bounded liveness continuation", () => {
|
||||
const decision = decideRunLivenessContinuation({
|
||||
run: run(),
|
||||
issue: issue(),
|
||||
agent: agent(),
|
||||
livenessState: "advanced",
|
||||
livenessReason: "Run produced concrete action evidence: created an issue comment",
|
||||
nextAction: "Resume the implementation from the remaining acceptance criteria.",
|
||||
budgetBlocked: false,
|
||||
idempotentWakeExists: false,
|
||||
});
|
||||
|
||||
expect(decision).toEqual({
|
||||
kind: "skip",
|
||||
reason: "liveness state is not actionable for continuation",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not enqueue a third continuation and returns an exhaustion comment", () => {
|
||||
const decision = decideRunLivenessContinuation({
|
||||
run: run({ continuationAttempt: 2 }),
|
||||
|
||||
@@ -74,6 +74,7 @@ type LatestIssueRun = Pick<
|
||||
typeof heartbeatRuns.$inferSelect,
|
||||
"id" | "agentId" | "status" | "error" | "errorCode" | "contextSnapshot" | "livenessState"
|
||||
> | null;
|
||||
type SuccessfulLatestIssueRun = NonNullable<LatestIssueRun> & { status: "succeeded" };
|
||||
|
||||
type WatchdogDecisionActor =
|
||||
| { type: "board"; userId?: string | null; runId?: string | null }
|
||||
@@ -188,7 +189,7 @@ function isUnsuccessfulTerminalIssueRun(latestRun: LatestIssueRun) {
|
||||
);
|
||||
}
|
||||
|
||||
function isSuccessfulInProgressContinuationRun(latestRun: LatestIssueRun) {
|
||||
function isSuccessfulInProgressContinuationRun(latestRun: LatestIssueRun): latestRun is SuccessfulLatestIssueRun {
|
||||
return latestRun?.status === "succeeded";
|
||||
}
|
||||
|
||||
@@ -200,6 +201,13 @@ function isProductiveContinuationRun(latestRun: LatestIssueRun) {
|
||||
latestRun.livenessState === "needs_followup");
|
||||
}
|
||||
|
||||
function isRepeatedProductiveContinuationRecovery(latestRun: SuccessfulLatestIssueRun) {
|
||||
const latestContext = parseObject(latestRun.contextSnapshot);
|
||||
return readNonEmptyString(latestContext.retryReason) === "issue_continuation_needed" &&
|
||||
readNonEmptyString(latestContext.source) === "issue.productive_terminal_continuation_recovery" &&
|
||||
isProductiveContinuationRun(latestRun);
|
||||
}
|
||||
|
||||
function parseLivenessIncidentKey(incidentKey: string | null | undefined) {
|
||||
if (!incidentKey) return null;
|
||||
return parseIssueGraphLivenessIncidentKey(incidentKey);
|
||||
@@ -1706,12 +1714,51 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
|
||||
continue;
|
||||
}
|
||||
if (isSuccessfulInProgressContinuationRun(latestRun)) {
|
||||
if (isProductiveContinuationRun(latestRun)) {
|
||||
result.productiveContinuationObserved += 1;
|
||||
} else {
|
||||
const successfulRun = latestRun;
|
||||
|
||||
if (!isProductiveContinuationRun(successfulRun)) {
|
||||
result.successfulContinuationObserved += 1;
|
||||
result.skipped += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (isRepeatedProductiveContinuationRecovery(successfulRun)) {
|
||||
const updated = await escalateStrandedAssignedIssue({
|
||||
issue,
|
||||
previousStatus: "in_progress",
|
||||
latestRun: successfulRun,
|
||||
comment:
|
||||
"Paperclip automatically retried continuation for this assigned `in_progress` issue and the retry " +
|
||||
"made progress, but it still has no live execution path. Moving it to `blocked` so it is visible for intervention.",
|
||||
});
|
||||
if (updated) {
|
||||
result.escalated += 1;
|
||||
result.issueIds.push(issue.id);
|
||||
} else {
|
||||
result.skipped += 1;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (await isInvocationBudgetBlocked(issue, agentId)) {
|
||||
result.skipped += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
const queued = await enqueueStrandedIssueRecovery({
|
||||
issueId: issue.id,
|
||||
agentId,
|
||||
reason: "issue_continuation_needed",
|
||||
retryReason: "issue_continuation_needed",
|
||||
source: "issue.productive_terminal_continuation_recovery",
|
||||
retryOfRunId: successfulRun.id,
|
||||
});
|
||||
if (queued) {
|
||||
result.continuationRequeued += 1;
|
||||
result.issueIds.push(issue.id);
|
||||
} else {
|
||||
result.skipped += 1;
|
||||
}
|
||||
result.skipped += 1;
|
||||
continue;
|
||||
}
|
||||
if (didAutomaticRecoveryFail(latestRun, "issue_continuation_needed")) {
|
||||
|
||||
Reference in New Issue
Block a user