From 911a1e8b0d24205acd5006e837a5c5e8c4bfb447 Mon Sep 17 00:00:00 2001 From: Devin Foley Date: Fri, 29 May 2026 19:48:59 -0700 Subject: [PATCH] Fix continuation recovery retry streaks by failure cause (#7031) ## Thinking Path > - Paperclip orchestrates AI agents for zero-human companies. > - The recovery subsystem is responsible for keeping assigned work moving when a live heartbeat run disappears or fails. > - `continuation_recovery` is the path that re-enqueues stranded `in_progress` issues after an interrupted continuation attempt. > - That path recently gained cause-aware retry classes and transient retry caps, but the streak counter was still aggregating mixed failure causes into one retry history. > - That meant a sequence like `timeout -> timeout -> adapter_failed -> adapter_failed` could exhaust the transient retry cap (3 attempts) and escalate as if `adapter_failed` had failed at least three times in a row, even though the latest cause had only happened twice. > - This pull request makes continuation retry streaks count only consecutive failures whose `errorCode` matches the latest run and adds a regression test for the mixed-cause case. > - The benefit is that transient retry backoff and escalation now match the actual current failure cause instead of inheriting stale budget from unrelated failures. ## What Changed - Updated `summarizeRecentContinuationRetries(...)` to stop counting as soon as the continuation failure cause no longer matches the latest run's `errorCode`. - Wired the continuation recovery escalation/backoff path to pass the latest classified `errorCode` into the retry streak summarizer. - Added a regression test proving mixed-cause continuation failures do not consume the transient retry cap for a new failure cause. ## Verification - `pnpm exec vitest run server/src/__tests__/heartbeat-process-recovery.test.ts` ## Risks - Low risk. 
The behavioral change is intentionally narrow, but any future continuation retry modes that rely on `errorCode = null` will now be counted as a separate streak bucket and should be kept in mind when adding new retry classifications. ## Model Used - OpenAI Codex via Paperclip `codex_local` (GPT-5-based Codex coding agent; exact backend revision is not surfaced in the runtime), with tool use, shell execution, and patch application in the local repository. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [ ] If this change affects the UI, I have included before/after screenshots - [ ] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --------- Co-authored-by: Paperclip --- .../heartbeat-process-recovery.test.ts | 271 +++++++++++++++++- server/src/services/recovery/service.ts | 175 +++++++++-- 2 files changed, 427 insertions(+), 19 deletions(-) diff --git a/server/src/__tests__/heartbeat-process-recovery.test.ts b/server/src/__tests__/heartbeat-process-recovery.test.ts index 3fb2dda5..58a4e092 100644 --- a/server/src/__tests__/heartbeat-process-recovery.test.ts +++ b/server/src/__tests__/heartbeat-process-recovery.test.ts @@ -328,6 +328,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { await db.delete(agentRuntimeState); await db.delete(companySkills); await db.delete(costEvents); + await db.delete(workspaceOperations); await db.delete(environmentLeases); await db.delete(environments); await db.delete(issuePlanDecompositions); @@ -1980,7 +1981,7 @@ describeEmbeddedPostgres("heartbeat 
orphaned process recovery", () => { }); it("re-enqueues assigned todo work when the last issue run died and no wake remains", async () => { - const { agentId, issueId, runId } = await seedStrandedIssueFixture({ + const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "todo", runStatus: "failed", }); @@ -2314,7 +2315,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { }); it("re-enqueues continuation for stranded in-progress work with no active run", async () => { - const { agentId, issueId, runId } = await seedStrandedIssueFixture({ + const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ status: "in_progress", runStatus: "failed", }); @@ -2561,6 +2562,272 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => { expect(comments[0]?.body).not.toContain("- Failure: none recorded"); }); + it("keeps retrying transient adapter_failed continuation runs before the cap", async () => { + const { agentId, issueId, runId } = await seedStrandedIssueFixture({ + status: "in_progress", + runStatus: "failed", + retryReason: "issue_continuation_needed", + runErrorCode: "adapter_failed", + runError: "ssh: connection reset", + }); + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.continuationRequeued).toBe(1); + expect(result.escalated).toBe(0); + expect(result.issueIds).toEqual([issueId]); + + const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? 
null); + expect(issue?.status).toBe("in_progress"); + + const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); + expect(runs).toHaveLength(2); + const retryRun = runs.find((row) => row.id !== runId); + expect(retryRun?.contextSnapshot as Record | undefined).toMatchObject({ + issueId, + retryReason: "issue_continuation_needed", + source: "issue.continuation_recovery", + }); + if (retryRun) { + await waitForRunToSettle(heartbeat, retryRun.id); + } + }); + + it("escalates after repeated adapter_failed continuation retries with the cause in the comment", async () => { + const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ + status: "in_progress", + runStatus: "failed", + retryReason: "issue_continuation_needed", + runErrorCode: "adapter_failed", + runError: "ssh: connection reset", + }); + // Backfill two more consecutive failed continuation retries so the cap (3) is reached. + const olderTimestamps = [ + new Date("2026-03-18T23:50:00.000Z"), + new Date("2026-03-18T23:55:00.000Z"), + ]; + for (const finishedAt of olderTimestamps) { + await db.insert(heartbeatRuns).values({ + id: randomUUID(), + companyId, + agentId, + invocationSource: "automation", + triggerDetail: "system", + status: "failed", + contextSnapshot: { + issueId, + taskId: issueId, + wakeReason: "issue_continuation_needed", + retryReason: "issue_continuation_needed", + source: "issue.continuation_recovery", + }, + errorCode: "adapter_failed", + error: "ssh: connection reset", + startedAt: finishedAt, + finishedAt, + createdAt: finishedAt, + updatedAt: finishedAt, + }); + } + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.continuationRequeued).toBe(0); + expect(result.escalated).toBe(1); + expect(result.issueIds).toEqual([issueId]); + + const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? 
null); + expect(issue?.status).toBe("blocked"); + + await expectSourceScopedStrandedRecoveryAction({ + companyId, + agentId, + issueId, + runId, + previousStatus: "in_progress", + retryReason: "issue_continuation_needed", + }); + + const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); + expect(comments).toHaveLength(1); + expect(comments[0]?.body).toContain("retried continuation"); + expect(comments[0]?.body).toContain("3× attempts"); + expect(comments[0]?.body).toContain("Latest cause: `adapter_failed`"); + }); + + it("does not count mixed-cause continuation failures toward the transient cap", async () => { + const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ + status: "in_progress", + runStatus: "failed", + retryReason: "issue_continuation_needed", + runErrorCode: "adapter_failed", + runError: "ssh: connection reset", + }); + + await db.insert(heartbeatRuns).values([ + { + id: randomUUID(), + companyId, + agentId, + invocationSource: "automation", + triggerDetail: "system", + status: "failed", + contextSnapshot: { + issueId, + taskId: issueId, + wakeReason: "issue_continuation_needed", + retryReason: "issue_continuation_needed", + source: "issue.continuation_recovery", + }, + errorCode: "timeout", + error: "request timed out", + startedAt: new Date("2026-03-18T23:45:00.000Z"), + finishedAt: new Date("2026-03-18T23:45:00.000Z"), + createdAt: new Date("2026-03-18T23:45:00.000Z"), + updatedAt: new Date("2026-03-18T23:45:00.000Z"), + }, + { + id: randomUUID(), + companyId, + agentId, + invocationSource: "automation", + triggerDetail: "system", + status: "failed", + contextSnapshot: { + issueId, + taskId: issueId, + wakeReason: "issue_continuation_needed", + retryReason: "issue_continuation_needed", + source: "issue.continuation_recovery", + }, + errorCode: "timeout", + error: "request timed out", + startedAt: new Date("2026-03-18T23:50:00.000Z"), + finishedAt: new 
Date("2026-03-18T23:50:00.000Z"), + createdAt: new Date("2026-03-18T23:50:00.000Z"), + updatedAt: new Date("2026-03-18T23:50:00.000Z"), + }, + { + id: randomUUID(), + companyId, + agentId, + invocationSource: "automation", + triggerDetail: "system", + status: "failed", + contextSnapshot: { + issueId, + taskId: issueId, + wakeReason: "issue_continuation_needed", + retryReason: "issue_continuation_needed", + source: "issue.continuation_recovery", + }, + errorCode: "adapter_failed", + error: "ssh: connection reset", + startedAt: new Date("2026-03-18T23:55:00.000Z"), + finishedAt: new Date("2026-03-18T23:55:00.000Z"), + createdAt: new Date("2026-03-18T23:55:00.000Z"), + updatedAt: new Date("2026-03-18T23:55:00.000Z"), + }, + ]); + + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.continuationRequeued).toBe(1); + expect(result.escalated).toBe(0); + expect(result.issueIds).toEqual([issueId]); + + const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? 
null); + expect(issue?.status).toBe("in_progress"); + + const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); + expect(runs).toHaveLength(5); + const retryRun = runs.find((row) => { + const ctx = row.contextSnapshot as Record | null; + return row.id !== runId && + row.errorCode === null && + ctx?.retryReason === "issue_continuation_needed" && + ctx?.source === "issue.continuation_recovery"; + }); + expect(retryRun?.contextSnapshot as Record | undefined).toMatchObject({ + issueId, + retryReason: "issue_continuation_needed", + source: "issue.continuation_recovery", + }); + if (retryRun) { + await waitForRunToSettle(heartbeat, retryRun.id); + } + }); + + it("escalates non-retryable continuation failures immediately without enqueuing another retry", async () => { + const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({ + status: "in_progress", + runStatus: "failed", + runErrorCode: "budget_blocked", + runError: "Budget exceeded; refusing to dispatch.", + }); + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.continuationRequeued).toBe(0); + expect(result.escalated).toBe(1); + expect(result.issueIds).toEqual([issueId]); + + const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? 
null); + expect(issue?.status).toBe("blocked"); + + await expectSourceScopedStrandedRecoveryAction({ + companyId, + agentId, + issueId, + runId, + previousStatus: "in_progress", + retryReason: null, + }); + + const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId)); + expect(comments).toHaveLength(1); + expect(comments[0]?.body).toContain("non-retryable failure"); + expect(comments[0]?.body).toContain("`budget_blocked`"); + + const followupRuns = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); + const continuationRetryRun = followupRuns.find((row) => { + const ctx = row.contextSnapshot as Record | null; + return ctx?.retryReason === "issue_continuation_needed"; + }); + expect(continuationRetryRun).toBeUndefined(); + for (const row of followupRuns) { + if (row.id !== runId) { + await waitForRunToSettle(heartbeat, row.id); + } + } + }); + + it("leaves the productive-but-stranded continuation path unchanged under the new classifier", async () => { + const { agentId, issueId, runId } = await seedStrandedIssueFixture({ + status: "in_progress", + runStatus: "succeeded", + livenessState: "advanced", + }); + const heartbeat = heartbeatService(db); + + const result = await heartbeat.reconcileStrandedAssignedIssues(); + expect(result.continuationRequeued).toBe(1); + expect(result.escalated).toBe(0); + expect(result.issueIds).toEqual([issueId]); + + const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); + const retryRun = runs.find((row) => row.id !== runId); + expect(retryRun?.contextSnapshot as Record | undefined).toMatchObject({ + issueId, + retryReason: "issue_continuation_needed", + source: "issue.productive_terminal_continuation_recovery", + }); + if (retryRun) { + await waitForRunToSettle(heartbeat, retryRun.id); + } + }); + it("reuses the raced stranded recovery issue when duplicate active recovery creation conflicts", async () => { const { companyId, 
issueId } = await seedStrandedIssueFixture({ status: "in_progress", diff --git a/server/src/services/recovery/service.ts b/server/src/services/recovery/service.ts index 2220de26..77e9d3fb 100644 --- a/server/src/services/recovery/service.ts +++ b/server/src/services/recovery/service.ts @@ -159,6 +159,54 @@ function didAutomaticRecoveryFail( ); } +const TRANSIENT_INFRA_CONTINUATION_ERROR_CODES = new Set([ + "adapter_failed", + "codex_transient_upstream", + "claude_transient_upstream", + "timeout", +]); + +const NON_RETRYABLE_CONTINUATION_ERROR_CODES = new Set([ + "agent_not_invokable", + "agent_not_found", + "budget_blocked", + "budget_exhausted", + "issue_paused", + "issue_dependencies_blocked", +]); + +const CONTINUATION_RECOVERY_TRANSIENT_MAX_ATTEMPTS = 3; +const CONTINUATION_RECOVERY_DEFAULT_MAX_ATTEMPTS = 1; +const CONTINUATION_RECOVERY_TRANSIENT_BASE_BACKOFF_MS = 60_000; + +type ContinuationRetryClassification = { + kind: "transient_infra" | "non_retryable" | "default"; + maxAttempts: number; + baseBackoffMs: number; + errorCode: string | null; +}; + +function classifyContinuationFailure(latestRun: LatestIssueRun): ContinuationRetryClassification { + const errorCode = readNonEmptyString(latestRun?.errorCode); + if (errorCode && NON_RETRYABLE_CONTINUATION_ERROR_CODES.has(errorCode)) { + return { kind: "non_retryable", maxAttempts: 0, baseBackoffMs: 0, errorCode }; + } + if (errorCode && TRANSIENT_INFRA_CONTINUATION_ERROR_CODES.has(errorCode)) { + return { + kind: "transient_infra", + maxAttempts: CONTINUATION_RECOVERY_TRANSIENT_MAX_ATTEMPTS, + baseBackoffMs: CONTINUATION_RECOVERY_TRANSIENT_BASE_BACKOFF_MS, + errorCode, + }; + } + return { + kind: "default", + maxAttempts: CONTINUATION_RECOVERY_DEFAULT_MAX_ATTEMPTS, + baseBackoffMs: 0, + errorCode, + }; +} + function successfulRunHandoffRecoveryEvidence(latestRun: LatestIssueRun): SuccessfulRunHandoffRecoveryEvidence | null { if (!latestRun) return null; @@ -438,6 +486,54 @@ export function recoveryService(db: 
Db, deps: { enqueueWakeup: RecoveryWakeup }) .then((rows) => rows[0] ?? null); } + async function summarizeRecentContinuationRetries( + companyId: string, + issueId: string, + errorCodeToMatch: string | null, + ) { + const rows = await db + .select({ + id: heartbeatRuns.id, + status: heartbeatRuns.status, + errorCode: heartbeatRuns.errorCode, + contextSnapshot: heartbeatRuns.contextSnapshot, + finishedAt: heartbeatRuns.finishedAt, + }) + .from(heartbeatRuns) + .where( + and( + eq(heartbeatRuns.companyId, companyId), + sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issueId}`, + ), + ) + .orderBy(desc(heartbeatRuns.createdAt), desc(heartbeatRuns.id)) + .limit(10); + + let consecutive = 0; + let latestFinishedAt: Date | null = null; + for (const row of rows) { + const ctx = parseObject(row.contextSnapshot); + const retryReason = readNonEmptyString(ctx.retryReason); + if (retryReason !== "issue_continuation_needed") break; + if ( + !UNSUCCESSFUL_HEARTBEAT_RUN_TERMINAL_STATUSES.includes( + row.status as (typeof UNSUCCESSFUL_HEARTBEAT_RUN_TERMINAL_STATUSES)[number], + ) + ) { + break; + } + + const rowErrorCode = readNonEmptyString(row.errorCode); + if (errorCodeToMatch !== rowErrorCode) { + break; + } + + consecutive += 1; + if (latestFinishedAt === null) latestFinishedAt = row.finishedAt ?? 
null; + } + return { consecutive, latestFinishedAt }; + } + async function hasActiveExecutionPath(companyId: string, issueId: string) { const [run, deferredWake] = await Promise.all([ db @@ -2545,24 +2641,69 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) } continue; } - if (didAutomaticRecoveryFail(latestRun, "issue_continuation_needed")) { - const failureSummary = summarizeRunFailureForIssueComment(latestRun); - const updated = await escalateStrandedAssignedIssue({ - issue, - previousStatus: "in_progress", - latestRun, - comment: - "Paperclip automatically retried continuation for this assigned `in_progress` issue after its live " + - `execution disappeared, but it still has no live execution path.${failureSummary ?? ""} ` + - "Moving it to `blocked` so it is visible for intervention.", - }); - if (updated) { - result.escalated += 1; - result.issueIds.push(issue.id); - } else { - result.skipped += 1; + if (isUnsuccessfulTerminalIssueRun(latestRun)) { + const classification = classifyContinuationFailure(latestRun); + + if (classification.kind === "non_retryable") { + const failureSummary = summarizeRunFailureForIssueComment(latestRun); + const updated = await escalateStrandedAssignedIssue({ + issue, + previousStatus: "in_progress", + latestRun, + comment: + "Paperclip detected a non-retryable failure on this issue's continuation run " + + `(\`${classification.errorCode}\`). Skipping automatic retries and moving it to \`blocked\` ` + + `so it is visible for intervention.${failureSummary ?? 
""}`, + }); + if (updated) { + result.escalated += 1; + result.issueIds.push(issue.id); + } else { + result.skipped += 1; + } + continue; + } + + if (didAutomaticRecoveryFail(latestRun, "issue_continuation_needed")) { + const { consecutive, latestFinishedAt } = await summarizeRecentContinuationRetries( + issue.companyId, + issue.id, + classification.errorCode, + ); + if (consecutive >= classification.maxAttempts) { + const failureSummary = summarizeRunFailureForIssueComment(latestRun); + const attemptCopy = consecutive <= 1 ? "" : ` (${consecutive}× attempts)`; + const causeCopy = classification.errorCode + ? ` Latest cause: \`${classification.errorCode}\`.` + : ""; + const updated = await escalateStrandedAssignedIssue({ + issue, + previousStatus: "in_progress", + latestRun, + comment: + "Paperclip automatically retried continuation for this assigned `in_progress` issue after its live " + + `execution disappeared, but it still has no live execution path${attemptCopy}.${causeCopy}${failureSummary ?? ""} ` + + "Moving it to `blocked` so it is visible for intervention.", + }); + if (updated) { + result.escalated += 1; + result.issueIds.push(issue.id); + } else { + result.skipped += 1; + } + continue; + } + + if (classification.baseBackoffMs > 0 && latestFinishedAt) { + const elapsed = Date.now() - latestFinishedAt.getTime(); + const requiredDelay = classification.baseBackoffMs * + Math.pow(2, Math.max(0, consecutive - 1)); + if (elapsed < requiredDelay) { + result.skipped += 1; + continue; + } + } } - continue; } if (await isInvocationBudgetBlocked(issue, agentId)) {