[codex] Roll up May 17 branch changes (#6210)

## Thinking Path

> - Paperclip is the control plane for autonomous AI companies, so agent
work needs visible ownership, recovery, and operator controls.
> - This local branch had accumulated several related control-plane
reliability and operator-experience fixes across recovery actions,
watchdog folding, model-profile defaults, mentions, markdown editing,
plugin launchers, and small UI polish.
> - The branch needed to be converted into a PR against the current
`origin/master` without losing dirty work or including lockfile/workflow
churn.
> - The safest standalone shape is a single rollup PR because the
recovery/server/UI files overlap heavily across the local commits and
splitting would create avoidable conflicts.
> - This pull request replays the local branch onto latest
`origin/master`, preserves the uncommitted work as logical commits, and
adds a Zod 4 validator compatibility fix found during verification.
> - The benefit is that the May 17 local branch can be reviewed and
merged as one coherent, conflict-free branch under the 100-file Greptile
limit.

## What Changed

- Rebased the local May 17 branch work onto current `origin/master` in a
dedicated worktree.
- Preserved and committed previously dirty changes for recovery retry
handling, plugin/sidebar launcher polish, and `.herenow` ignores.
- Added recovery-action behavior for returning source issues to `todo`
when retrying source-scoped recovery.
- Included the existing local recovery/liveness/watchdog fold, Codex
cheap-profile, markdown/mention, duplicate-agent, and UI polish commits
from the branch.
- Normalized shared validator `z.record(...)` schemas to explicit
string-key records for Zod 4 compatibility.
- Confirmed the PR has no `pnpm-lock.yaml` or `.github/workflows/*`
changes and stays below the 100-file Greptile limit.

## Verification

- `pnpm install --frozen-lockfile --ignore-scripts`
- `npm run install` in
`node_modules/.pnpm/sqlite3@5.1.7/node_modules/sqlite3` to build the
local native sqlite3 binding after installing with scripts disabled
- `pnpm exec vitest run packages/shared/src/validators/issue.test.ts
packages/shared/src/project-mentions.test.ts
packages/adapter-utils/src/server-utils.test.ts
server/src/__tests__/heartbeat-model-profile.test.ts
server/src/__tests__/issue-recovery-actions.test.ts
server/src/__tests__/issue-agent-mutation-ownership-routes.test.ts
server/src/__tests__/heartbeat-active-run-output-watchdog.test.ts
server/src/__tests__/plugin-local-folders.test.ts
ui/src/components/IssueRecoveryActionCard.test.tsx
ui/src/components/Sidebar.test.tsx
ui/src/components/SidebarAccountMenu.test.tsx
ui/src/components/IssueProperties.test.tsx
ui/src/components/MarkdownEditor.test.tsx
ui/src/components/MarkdownBody.test.tsx
ui/src/lib/duplicate-agent-payload.test.ts
ui/src/pages/Routines.test.tsx`
- First pass: 13 files passed with 201 passing tests; 3 server files
failed before sqlite3 native binding was built.
- After rebuilding sqlite3:
`server/src/__tests__/heartbeat-model-profile.test.ts`,
`server/src/__tests__/issue-recovery-actions.test.ts`, and
`server/src/__tests__/heartbeat-active-run-output-watchdog.test.ts`
passed/loaded; embedded Postgres tests were skipped by the local host
guard.
- `pnpm --filter @paperclipai/shared typecheck`
- `pnpm --filter @paperclipai/adapter-utils typecheck`
- `pnpm --filter @paperclipai/server typecheck`
- `pnpm --filter @paperclipai/ui typecheck`

## Risks

- Medium risk: this is a broad rollup PR across recovery semantics,
server tests, shared validators, and UI surfaces.
- Some embedded Postgres tests skipped locally due the host guard, so CI
should provide the stronger database-backed signal.
- UI changes were covered by component tests, but no browser screenshot
was captured in this PR creation pass.
- This branch may overlap with existing recovery/liveness PR work; merge
this PR independently or restack/close overlapping branches rather than
merging duplicate implementations together.

> For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and
discuss it in `#dev` before opening the PR. Feature PRs that overlap
with planned core work may need to be redirected — check the roadmap
first. See `CONTRIBUTING.md`.

## Model Used

- OpenAI Codex, GPT-5-based coding agent, tool-enabled local repository
and GitHub workflow, medium reasoning effort.

## Checklist

- [x] I have included a thinking path that traces from project context
to this change
- [x] I have specified the model used (with version and capability
details)
- [x] I have checked ROADMAP.md and confirmed this PR does not duplicate
planned core work
- [x] I have run tests locally and they pass
- [x] I have added or updated tests where applicable
- [ ] If this change affects the UI, I have included before/after
screenshots
- [x] I have updated relevant documentation to reflect my changes
- [x] I have considered and documented any risks above
- [x] I will address all Greptile and reviewer comments before
requesting merge

---------

Co-authored-by: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Dotta
2026-05-17 17:15:06 -05:00
committed by GitHub
parent 705c1b8d81
commit d734bd43d1
83 changed files with 3675 additions and 180 deletions
@@ -84,6 +84,11 @@ vi.mock("../services/index.js", () => ({
getActiveForIssue: vi.fn(async () => null),
listActiveForIssues: vi.fn(async () => new Map()),
}),
issueThreadInteractionService: () => ({
listForIssue: vi.fn(async () => []),
expireRequestConfirmationsSupersededByComment: vi.fn(async () => []),
expireStaleRequestConfirmationsForIssueDocument: vi.fn(async () => []),
}),
documentService: () => ({}),
routineService: () => ({}),
workProductService: () => ({}),
@@ -0,0 +1,279 @@
import { execFile } from "node:child_process";
import { randomUUID } from "node:crypto";
import { mkdtemp, rm, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { promisify } from "node:util";
import { eq, ne } from "drizzle-orm";
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest";
import {
agentTaskSessions,
agents,
companies,
createDb,
executionWorkspaces,
heartbeatRuns,
issues,
projects,
projectWorkspaces,
} from "@paperclipai/db";
import {
getEmbeddedPostgresTestSupport,
startEmbeddedPostgresTestDatabase,
} from "./helpers/embedded-postgres.js";
import { heartbeatService } from "../services/heartbeat.ts";
import { instanceSettingsService } from "../services/instance-settings.ts";
const execFileAsync = promisify(execFile);
const adapterExecute = vi.hoisted(() => vi.fn(async () => ({
exitCode: 0,
signal: null,
timedOut: false,
sessionParams: { sessionId: "fresh-session" },
sessionDisplayId: "fresh-session",
summary: "Accepted plan workspace refresh test run.",
provider: "test",
model: "test-model",
})));
vi.mock("../adapters/index.js", () => ({
getServerAdapter: () => ({
type: "codex_local",
execute: adapterExecute,
supportsLocalAgentJwt: false,
}),
listAdapterModelProfiles: async () => [],
runningProcesses: new Map(),
}));
const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport();
const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip;
if (!embeddedPostgresSupport.supported) {
console.warn(
`Skipping embedded Postgres accepted-plan workspace refresh tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`,
);
}
async function createGitRepo() {
const repoRoot = await mkdtemp(path.join(os.tmpdir(), "paperclip-accepted-plan-repo-"));
await execFileAsync("git", ["init"], { cwd: repoRoot });
await execFileAsync("git", ["config", "user.email", "paperclip-test@example.com"], { cwd: repoRoot });
await execFileAsync("git", ["config", "user.name", "Paperclip Test"], { cwd: repoRoot });
await writeFile(path.join(repoRoot, "README.md"), "accepted plan workspace refresh\n");
await execFileAsync("git", ["add", "README.md"], { cwd: repoRoot });
await execFileAsync("git", ["commit", "-m", "initial"], { cwd: repoRoot });
return repoRoot;
}
describeEmbeddedPostgres("accepted plan workspace refresh", () => {
let db!: ReturnType<typeof createDb>;
let tempDb: Awaited<ReturnType<typeof startEmbeddedPostgresTestDatabase>> | null = null;
const tempRoots: string[] = [];
beforeAll(async () => {
tempDb = await startEmbeddedPostgresTestDatabase("paperclip-accepted-plan-workspace-");
db = createDb(tempDb.connectionString);
}, 20_000);
afterEach(async () => {
adapterExecute.mockClear();
let idlePolls = 0;
for (let attempt = 0; attempt < 100; attempt += 1) {
const runs = await db
.select({ status: heartbeatRuns.status })
.from(heartbeatRuns);
const hasActiveRun = runs.some((run) => run.status === "queued" || run.status === "running");
if (!hasActiveRun) {
idlePolls += 1;
if (idlePolls >= 5) break;
} else {
idlePolls = 0;
}
await new Promise((resolve) => setTimeout(resolve, 50));
}
while (tempRoots.length > 0) {
const root = tempRoots.pop();
if (root) await rm(root, { recursive: true, force: true }).catch(() => undefined);
}
});
afterAll(async () => {
await db.$client.end();
await tempDb?.cleanup();
});
it("realizes an isolated workspace and drops stale shared task-session params before executing", async () => {
const companyId = randomUUID();
const projectId = randomUUID();
const projectWorkspaceId = randomUUID();
const sharedExecutionWorkspaceId = randomUUID();
const issueId = randomUUID();
const agentId = randomUUID();
const repoRoot = await createGitRepo();
tempRoots.push(repoRoot);
await instanceSettingsService(db).updateExperimental({
enableIsolatedWorkspaces: true,
});
await db.insert(companies).values({
id: companyId,
name: "Acme",
issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`,
status: "active",
createdAt: new Date(),
updatedAt: new Date(),
});
await db.insert(projects).values({
id: projectId,
companyId,
name: "Accepted Plan Workspace Refresh",
status: "active",
createdAt: new Date(),
updatedAt: new Date(),
});
await db.insert(projectWorkspaces).values({
id: projectWorkspaceId,
companyId,
projectId,
name: "Primary",
cwd: repoRoot,
isPrimary: true,
createdAt: new Date(),
updatedAt: new Date(),
});
await db.insert(agents).values({
id: agentId,
companyId,
name: "CodexCoder",
role: "engineer",
status: "idle",
adapterType: "codex_local",
adapterConfig: {},
runtimeConfig: {},
permissions: {},
createdAt: new Date(),
updatedAt: new Date(),
});
await db.insert(executionWorkspaces).values({
id: sharedExecutionWorkspaceId,
companyId,
projectId,
projectWorkspaceId,
mode: "shared_workspace",
strategyType: "project_primary",
name: "Shared planning workspace",
status: "active",
cwd: repoRoot,
providerType: "local_fs",
providerRef: repoRoot,
createdAt: new Date(),
updatedAt: new Date(),
});
await db.insert(issues).values({
id: issueId,
companyId,
projectId,
projectWorkspaceId,
title: "Implement accepted plan",
status: "in_progress",
workMode: "planning",
priority: "medium",
assigneeAgentId: agentId,
identifier: "PAP-9122",
executionWorkspaceId: sharedExecutionWorkspaceId,
executionWorkspaceSettings: {
mode: "isolated_workspace",
},
createdAt: new Date(),
updatedAt: new Date(),
});
await db.insert(agentTaskSessions).values({
companyId,
agentId,
adapterType: "codex_local",
taskKey: issueId,
sessionParamsJson: {
sessionId: "stale-shared-session",
cwd: repoRoot,
workspaceId: projectWorkspaceId,
},
sessionDisplayId: "stale-shared-session",
});
adapterExecute.mockImplementationOnce(async () => {
await db.update(issues).set({ status: "done", updatedAt: new Date() }).where(eq(issues.id, issueId));
return {
exitCode: 0,
signal: null,
timedOut: false,
sessionParams: { sessionId: "fresh-session" },
sessionDisplayId: "fresh-session",
summary: "Accepted plan workspace refresh test run.",
provider: "test",
model: "test-model",
};
});
const heartbeat = heartbeatService(db);
const run = await heartbeat.wakeup(agentId, {
source: "automation",
triggerDetail: "system",
reason: "issue_commented",
contextSnapshot: {
issueId,
taskId: issueId,
wakeReason: "issue_commented",
interactionKind: "request_confirmation",
interactionStatus: "accepted",
forceFreshSession: true,
workspaceRefreshReason: "accepted_plan_confirmation",
},
});
expect(run).not.toBeNull();
await vi.waitFor(async () => {
const latest = await heartbeat.getRun(run!.id);
expect(latest?.status).toBe("succeeded");
}, { timeout: 10_000 });
expect(adapterExecute).toHaveBeenCalledTimes(1);
const adapterInput = adapterExecute.mock.calls[0]?.[0] as {
runtime: { sessionId: string | null; sessionParams: Record<string, unknown> | null };
context: Record<string, unknown>;
};
expect(adapterInput.runtime.sessionId).toBeNull();
expect(adapterInput.runtime.sessionParams).toBeNull();
expect(adapterInput.context.paperclipWorkspace).toEqual(expect.objectContaining({
mode: "isolated_workspace",
strategy: "git_worktree",
}));
expect((adapterInput.context.paperclipWorkspace as { cwd: string }).cwd).not.toBe(repoRoot);
const refreshedIssue = await db
.select({
executionWorkspaceId: issues.executionWorkspaceId,
executionWorkspaceSettings: issues.executionWorkspaceSettings,
})
.from(issues)
.where(eq(issues.id, issueId))
.then((rows) => rows[0]);
expect(refreshedIssue?.executionWorkspaceId).toBeTruthy();
expect(refreshedIssue?.executionWorkspaceId).not.toBe(sharedExecutionWorkspaceId);
expect(refreshedIssue?.executionWorkspaceSettings).toMatchObject({
mode: "isolated_workspace",
});
const isolatedRows = await db
.select()
.from(executionWorkspaces)
.where(ne(executionWorkspaces.id, sharedExecutionWorkspaceId));
expect(isolatedRows).toHaveLength(1);
expect(isolatedRows[0]).toMatchObject({
mode: "isolated_workspace",
strategyType: "git_worktree",
sourceIssueId: issueId,
});
expect(isolatedRows[0]?.cwd).not.toBe(repoRoot);
}, 20_000);
});
@@ -2,11 +2,15 @@ import { randomUUID } from "node:crypto";
import { and, eq, sql } from "drizzle-orm";
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest";
import {
activityLog,
agents,
companies,
createDb,
heartbeatRunEvents,
heartbeatRunWatchdogDecisions,
heartbeatRuns,
issueComments,
issueRecoveryActions,
issueRelations,
issues,
} from "@paperclipai/db";
@@ -94,7 +98,15 @@ describeEmbeddedPostgres("active-run output watchdog", () => {
await tempDb?.cleanup();
});
async function seedRunningRun(opts: { now: Date; ageMs: number; withOutput?: boolean; logChunk?: string }) {
async function seedRunningRun(opts: {
now: Date;
ageMs: number;
withOutput?: boolean;
logChunk?: string;
sourceStatus?: "in_progress" | "done" | "cancelled";
sourceOriginKind?: string;
sameRunTerminalEvidence?: "activity" | "comment";
}) {
const companyId = randomUUID();
const managerId = randomUUID();
const coderId = randomUUID();
@@ -103,6 +115,8 @@ describeEmbeddedPostgres("active-run output watchdog", () => {
const issuePrefix = `W${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
const startedAt = new Date(opts.now.getTime() - opts.ageMs);
const lastOutputAt = opts.withOutput ? new Date(opts.now.getTime() - 5 * 60 * 1000) : null;
const sourceStatus = opts.sourceStatus ?? "in_progress";
const terminalEvidenceAt = new Date(startedAt.getTime() + 10 * 60 * 1000);
await db.insert(companies).values({
id: companyId,
@@ -139,11 +153,14 @@ describeEmbeddedPostgres("active-run output watchdog", () => {
id: issueId,
companyId,
title: "Long running implementation",
status: "in_progress",
status: sourceStatus,
priority: "medium",
assigneeAgentId: coderId,
issueNumber: 1,
identifier: `${issuePrefix}-1`,
originKind: opts.sourceOriginKind ?? "manual",
completedAt: sourceStatus === "done" ? terminalEvidenceAt : null,
cancelledAt: sourceStatus === "cancelled" ? terminalEvidenceAt : null,
updatedAt: startedAt,
createdAt: startedAt,
});
@@ -181,6 +198,35 @@ describeEmbeddedPostgres("active-run output watchdog", () => {
.where(eq(heartbeatRuns.id, runId));
}
await db.update(issues).set({ executionRunId: runId }).where(eq(issues.id, issueId));
if (opts.sameRunTerminalEvidence === "activity") {
await db.insert(activityLog).values({
companyId,
actorType: "agent",
actorId: coderId,
agentId: coderId,
runId,
action: "issue.updated",
entityType: "issue",
entityId: issueId,
details: {
identifier: `${issuePrefix}-1`,
status: sourceStatus,
_previous: { status: "in_progress" },
},
createdAt: terminalEvidenceAt,
});
} else if (opts.sameRunTerminalEvidence === "comment") {
await db.insert(issueComments).values({
companyId,
issueId,
authorAgentId: coderId,
authorType: "agent",
createdByRunId: runId,
body: "Completed and verified.",
createdAt: terminalEvidenceAt,
updatedAt: terminalEvidenceAt,
});
}
return { companyId, managerId, coderId, issueId, runId, issuePrefix };
}
@@ -271,6 +317,211 @@ describeEmbeddedPostgres("active-run output watchdog", () => {
expect(source?.status).toBe("blocked");
});
it("folds terminal source issues with same-run durable evidence instead of creating watchdog work", async () => {
const now = new Date("2026-04-22T20:00:00.000Z");
const { companyId, coderId, issueId, runId } = await seedRunningRun({
now,
ageMs: ACTIVE_RUN_OUTPUT_CRITICAL_THRESHOLD_MS + 60_000,
sourceStatus: "done",
sameRunTerminalEvidence: "activity",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.scanSilentActiveRuns({ now, companyId });
expect(result).toMatchObject({ created: 0, folded: 1, skipped: 0 });
const evaluations = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stale_active_run_evaluation")));
expect(evaluations).toHaveLength(0);
const [run] = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, runId));
expect(run?.status).toBe("succeeded");
expect(run?.errorCode).toBeNull();
expect(run?.finishedAt?.toISOString()).toBe(now.toISOString());
expect(run?.resultJson).toMatchObject({
sourceResolvedWatchdogFold: {
sourceIssueId: issueId,
sourceIssueStatus: "done",
sameRunEvidenceKind: "activity",
evaluationIssueId: null,
evaluationIssueIdentifier: null,
cleanup: { outcome: "no_process_metadata" },
},
});
const [source] = await db.select().from(issues).where(eq(issues.id, issueId));
expect(source?.executionRunId).toBeNull();
const [agent] = await db.select().from(agents).where(eq(agents.id, coderId));
expect(agent?.status).toBe("idle");
const [decision] = await db
.select()
.from(heartbeatRunWatchdogDecisions)
.where(eq(heartbeatRunWatchdogDecisions.runId, runId));
expect(decision?.decision).toBe("dismissed_false_positive");
const [event] = await db
.select()
.from(heartbeatRunEvents)
.where(eq(heartbeatRunEvents.runId, runId));
expect(event?.message).toContain("Source-resolved watchdog fold");
});
it("still escalates terminal source issues without same-run terminal evidence", async () => {
const now = new Date("2026-04-22T20:00:00.000Z");
const { companyId, runId } = await seedRunningRun({
now,
ageMs: ACTIVE_RUN_OUTPUT_CRITICAL_THRESHOLD_MS + 60_000,
sourceStatus: "done",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.scanSilentActiveRuns({ now, companyId });
expect(result).toMatchObject({ created: 1, folded: 0 });
const [run] = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, runId));
expect(run?.status).toBe("running");
const [evaluation] = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stale_active_run_evaluation")));
expect(evaluation?.originId).toBe(runId);
expect(evaluation?.parentId).toBeNull();
});
it("still escalates when a same-run comment is followed by another actor marking the source done", async () => {
const now = new Date("2026-04-22T20:00:00.000Z");
const { companyId, issueId, runId, issuePrefix } = await seedRunningRun({
now,
ageMs: ACTIVE_RUN_OUTPUT_CRITICAL_THRESHOLD_MS + 60_000,
sourceStatus: "in_progress",
sameRunTerminalEvidence: "comment",
});
const completedAt = new Date(now.getTime() - 5 * 60_000);
await db
.update(issues)
.set({ status: "done", completedAt, updatedAt: completedAt })
.where(eq(issues.id, issueId));
await db.insert(activityLog).values({
companyId,
actorType: "user",
actorId: "board-user",
agentId: null,
runId: null,
action: "issue.updated",
entityType: "issue",
entityId: issueId,
details: {
identifier: `${issuePrefix}-1`,
status: "done",
_previous: { status: "in_progress" },
},
createdAt: completedAt,
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.scanSilentActiveRuns({ now, companyId });
expect(result).toMatchObject({ created: 1, folded: 0 });
const [run] = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, runId));
expect(run?.status).toBe("running");
const [evaluation] = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stale_active_run_evaluation")));
expect(evaluation?.originId).toBe(runId);
expect(evaluation?.parentId).toBeNull();
});
it("folds existing evaluation and active watchdog recovery action idempotently", async () => {
const now = new Date("2026-04-22T20:00:00.000Z");
const { companyId, managerId, issueId, runId, issuePrefix } = await seedRunningRun({
now,
ageMs: ACTIVE_RUN_OUTPUT_CRITICAL_THRESHOLD_MS + 60_000,
sourceStatus: "done",
sameRunTerminalEvidence: "activity",
});
const evaluationIssueId = randomUUID();
await db.insert(issues).values({
id: evaluationIssueId,
companyId,
title: "Existing stale evaluation",
status: "todo",
priority: "high",
assigneeAgentId: managerId,
issueNumber: 2,
identifier: `${issuePrefix}-2`,
originKind: "stale_active_run_evaluation",
originId: runId,
originRunId: runId,
originFingerprint: `stale_active_run:${companyId}:${runId}`,
});
await db.insert(issueRelations).values({
companyId,
issueId: evaluationIssueId,
relatedIssueId: issueId,
type: "blocks",
});
await db.insert(issueRecoveryActions).values({
companyId,
sourceIssueId: issueId,
recoveryIssueId: evaluationIssueId,
kind: "active_run_watchdog",
status: "active",
ownerType: "agent",
ownerAgentId: managerId,
cause: "active_run_watchdog",
fingerprint: `active-run-watchdog:${companyId}:${runId}:${issueId}`,
evidence: { runId },
nextAction: "Review stale active run",
});
const heartbeat = heartbeatService(db);
const first = await heartbeat.scanSilentActiveRuns({ now, companyId });
const second = await heartbeat.scanSilentActiveRuns({ now, companyId });
expect(first).toMatchObject({ created: 0, folded: 1 });
expect(second).toMatchObject({ scanned: 0, created: 0, folded: 0 });
const [evaluation] = await db.select().from(issues).where(eq(issues.id, evaluationIssueId));
expect(evaluation?.status).toBe("done");
const [run] = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, runId));
expect(run?.resultJson).toMatchObject({
sourceResolvedWatchdogFold: {
sourceIssueId: issueId,
sourceIssueStatus: "done",
evaluationIssueId,
evaluationIssueIdentifier: `${issuePrefix}-2`,
},
});
const [action] = await db.select().from(issueRecoveryActions).where(eq(issueRecoveryActions.sourceIssueId, issueId));
expect(action?.status).toBe("resolved");
expect(action?.outcome).toBe("false_positive");
const decisions = await db
.select()
.from(heartbeatRunWatchdogDecisions)
.where(eq(heartbeatRunWatchdogDecisions.runId, runId));
expect(decisions).toHaveLength(1);
});
it("refuses recovery-on-recovery stale-run recursion", async () => {
const now = new Date("2026-04-22T20:00:00.000Z");
const { companyId } = await seedRunningRun({
now,
ageMs: ACTIVE_RUN_OUTPUT_CRITICAL_THRESHOLD_MS + 60_000,
sourceOriginKind: "stale_active_run_evaluation",
});
const heartbeat = heartbeatService(db);
const result = await heartbeat.scanSilentActiveRuns({ now, companyId });
expect(result).toMatchObject({ created: 0, skipped: 1 });
const evaluations = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stale_active_run_evaluation")));
expect(evaluations).toHaveLength(1);
});
it("skips snoozed runs and healthy noisy runs", async () => {
const now = new Date("2026-04-22T20:00:00.000Z");
const stale = await seedRunningRun({
@@ -332,6 +332,82 @@ describeEmbeddedPostgres("heartbeat issue graph liveness escalation", () => {
});
});
it("treats open recovery issues as active waiting paths for non-assigned-backlog states", async () => {
await enableAutoRecovery();
const { companyId, managerId, blockedIssueId, blockerIssueId } = await seedBlockedChain();
const existingEscalationId = randomUUID();
await db.insert(issues).values({
id: existingEscalationId,
companyId,
title: "Existing liveness unblock work",
status: "todo",
priority: "high",
parentId: blockerIssueId,
assigneeAgentId: managerId,
issueNumber: 5,
identifier: `${`P${companyId.replace(/-/g, "").slice(0, 4)}`}-5`,
originKind: "harness_liveness_escalation",
originId: [
"harness_liveness",
companyId,
blockedIssueId,
"in_review_without_action_path",
blockerIssueId,
].join(":"),
});
const result = await heartbeatService(db).reconcileIssueGraphLiveness();
expect(result.findings).toBe(0);
expect(result.escalationsCreated).toBe(0);
expect(result.existingEscalations).toBe(0);
const escalations = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "harness_liveness_escalation")));
expect(escalations).toHaveLength(1);
});
it("keeps active invalid_review_participant recoveries from being retired", async () => {
await enableAutoRecovery();
const { companyId, managerId, blockedIssueId, blockerIssueId } = await seedBlockedChain();
const existingEscalationId = randomUUID();
await db.insert(issues).values({
id: existingEscalationId,
companyId,
title: "Existing invalid review participant unblock work",
status: "todo",
priority: "high",
parentId: blockedIssueId,
assigneeAgentId: managerId,
issueNumber: 5,
identifier: `${`P${companyId.replace(/-/g, "").slice(0, 4)}`}-5`,
originKind: "harness_liveness_escalation",
originId: [
"harness_liveness",
companyId,
blockedIssueId,
"invalid_review_participant",
blockerIssueId,
].join(":"),
});
const result = await heartbeatService(db).reconcileIssueGraphLiveness();
expect(result.findings).toBe(0);
expect(result.escalationsCreated).toBe(0);
expect(result.existingEscalations).toBe(0);
const escalations = await db
.select()
.from(issues)
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "harness_liveness_escalation")));
expect(escalations).toHaveLength(1);
});
it("creates one manager escalation, preserves blockers, and records owner selection", async () => {
await enableAutoRecovery();
const { companyId, managerId, blockedIssueId, blockerIssueId } = await seedBlockedChain();
@@ -1,5 +1,8 @@
import { describe, expect, it } from "vitest";
import type { AdapterModelProfileDefinition } from "../adapters/index.js";
import {
listAdapterModelProfiles,
type AdapterModelProfileDefinition,
} from "../adapters/index.js";
import {
mergeModelProfileAdapterConfig,
normalizeModelProfileWakeContext,
@@ -17,6 +20,27 @@ const cheapProfile: AdapterModelProfileDefinition = {
};
describe("heartbeat model profile application", () => {
it("uses the Codex local adapter cheap default when the agent has no runtime override", async () => {
const modelProfile = resolveModelProfileApplication({
adapterModelProfiles: await listAdapterModelProfiles("codex_local"),
agentRuntimeConfig: {},
issueModelProfile: "cheap",
contextSnapshot: {},
});
expect(modelProfile).toMatchObject({
requested: "cheap",
requestedBy: "issue_override",
applied: "cheap",
configSource: "adapter_default",
fallbackReason: null,
adapterConfig: {
model: "gpt-5.3-codex-spark",
modelReasoningEffort: "high",
},
});
});
it("applies cheap profile patches before explicit issue adapter config overrides", () => {
const modelProfile = resolveModelProfileApplication({
adapterModelProfiles: [cheapProfile],
@@ -21,4 +21,21 @@ describe("compactRunLogChunk", () => {
expect(compacted).toContain("[paperclip truncated run log chunk:");
expect(compacted.endsWith("tail")).toBe(true);
});
it("redacts Paperclip credential shapes before persisting run-log chunks", () => {
const chunk = [
"Authorization: Bearer live-bearer-token-value",
`export PAPERCLIP_API_KEY='paperclip-shell-secret'`,
`payload {"PAPERCLIP_API_KEY":"paperclip-json-secret"}`,
"--paperclip-api-key=paperclip-flag-secret",
].join("\n");
const compacted = compactRunLogChunk(chunk);
expect(compacted).toContain("***REDACTED***");
expect(compacted).not.toContain("live-bearer-token-value");
expect(compacted).not.toContain("paperclip-shell-secret");
expect(compacted).not.toContain("paperclip-json-secret");
expect(compacted).not.toContain("paperclip-flag-secret");
});
});
@@ -322,6 +322,18 @@ describe("shouldResetTaskSessionForWake", () => {
).toBe(true);
});
it("resets session context for accepted planning confirmations that refresh workspace selection", () => {
expect(
shouldResetTaskSessionForWake({
wakeReason: "issue_commented",
interactionKind: "request_confirmation",
interactionStatus: "accepted",
forceFreshSession: true,
workspaceRefreshReason: "accepted_plan_confirmation",
}),
).toBe(true);
});
it("does not reset session context on mention wake comment", () => {
expect(
shouldResetTaskSessionForWake({
@@ -106,6 +106,11 @@ function registerModuleMocks() {
syncDocument: async () => undefined,
syncIssue: async () => undefined,
}),
issueThreadInteractionService: () => ({
listForIssue: vi.fn(async () => []),
expireRequestConfirmationsSupersededByComment: vi.fn(async () => []),
expireStaleRequestConfirmationsForIssueDocument: vi.fn(async () => []),
}),
issueService: () => mockIssueService,
logActivity: mockLogActivity,
projectService: () => ({}),
@@ -8,6 +8,7 @@ const companyId = "22222222-2222-4222-8222-222222222222";
const ownerAgentId = "33333333-3333-4333-8333-333333333333";
const peerAgentId = "44444444-4444-4444-8444-444444444444";
const ownerRunId = "55555555-5555-4555-8555-555555555555";
const recoveryActionId = "77777777-7777-4777-8777-777777777777";
const mockIssueService = vi.hoisted(() => ({
addComment: vi.fn(),
@@ -62,6 +63,14 @@ const mockIssueThreadInteractionService = vi.hoisted(() => ({
}));
const mockIssueRecoveryActionService = vi.hoisted(() => ({
getActiveForIssue: vi.fn(async () => null),
resolveActiveForIssue: vi.fn(async () => null),
}));
const mockHeartbeatService = vi.hoisted(() => ({
wakeup: vi.fn(async () => undefined),
reportRunActivity: vi.fn(async () => undefined),
getRun: vi.fn(async () => null),
getActiveRunForAgent: vi.fn(async () => null),
cancelRun: vi.fn(async () => null),
}));
function registerRouteMocks() {
@@ -109,13 +118,7 @@ function registerRouteMocks() {
saveIssueVote: vi.fn(async () => ({ vote: null, consentEnabledNow: false, sharingEnabled: false })),
}),
goalService: () => ({}),
heartbeatService: () => ({
wakeup: vi.fn(async () => undefined),
reportRunActivity: vi.fn(async () => undefined),
getRun: vi.fn(async () => null),
getActiveRunForAgent: vi.fn(async () => null),
cancelRun: vi.fn(async () => null),
}),
heartbeatService: () => mockHeartbeatService,
instanceSettingsService: () => ({
get: vi.fn(async () => ({
id: "instance-settings-1",
@@ -189,13 +192,16 @@ async function createApp(actor: Record<string, unknown>) {
vi.importActual<typeof import("../middleware/index.js")>("../middleware/index.js"),
vi.importActual<typeof import("../routes/issues.js")>("../routes/issues.js"),
]);
const fakeDb = {
transaction: async (callback: (tx: Record<string, never>) => Promise<unknown>) => callback({}),
};
const app = express();
app.use(express.json());
app.use((req, _res, next) => {
(req as any).actor = actor;
next();
});
app.use("/api", issueRoutes({} as any, mockStorageService as any));
app.use("/api", issueRoutes(fakeDb as any, mockStorageService as any));
app.use(errorHandler);
return app;
}
@@ -265,6 +271,45 @@ describe("agent issue mutation checkout ownership", () => {
mockIssueService.listWakeableBlockedDependents.mockReset();
mockIssueRecoveryActionService.getActiveForIssue.mockReset();
mockIssueRecoveryActionService.getActiveForIssue.mockResolvedValue(null);
mockIssueRecoveryActionService.resolveActiveForIssue.mockReset();
mockIssueRecoveryActionService.resolveActiveForIssue.mockResolvedValue({
id: recoveryActionId,
companyId,
sourceIssueId: issueId,
recoveryIssueId: null,
kind: "issue_graph_liveness",
status: "resolved",
ownerType: "agent",
ownerAgentId,
ownerUserId: null,
previousOwnerAgentId: null,
returnOwnerAgentId: null,
cause: "issue_graph_liveness",
fingerprint: "graph-liveness:test",
evidence: {},
nextAction: "Restore a live execution path.",
wakePolicy: null,
monitorPolicy: null,
attemptCount: 1,
maxAttempts: null,
timeoutAt: null,
lastAttemptAt: new Date("2026-05-13T18:00:00.000Z"),
outcome: "restored",
resolutionNote: "Resolved by recovery owner",
resolvedAt: new Date("2026-05-13T18:05:00.000Z"),
createdAt: new Date("2026-05-13T17:55:00.000Z"),
updatedAt: new Date("2026-05-13T18:05:00.000Z"),
});
mockHeartbeatService.wakeup.mockReset();
mockHeartbeatService.wakeup.mockResolvedValue(undefined);
mockHeartbeatService.reportRunActivity.mockReset();
mockHeartbeatService.reportRunActivity.mockResolvedValue(undefined);
mockHeartbeatService.getRun.mockReset();
mockHeartbeatService.getRun.mockResolvedValue(null);
mockHeartbeatService.getActiveRunForAgent.mockReset();
mockHeartbeatService.getActiveRunForAgent.mockResolvedValue(null);
mockHeartbeatService.cancelRun.mockReset();
mockHeartbeatService.cancelRun.mockResolvedValue(null);
mockIssueService.remove.mockReset();
mockIssueService.removeAttachment.mockReset();
mockIssueService.update.mockReset();
@@ -415,6 +460,47 @@ describe("agent issue mutation checkout ownership", () => {
);
});
it("preserves committed issue updates, comments, documents, and work product writes when recovery revalidation fails", async () => {
const app = await createApp(ownerActor());
mockIssueRecoveryActionService.getActiveForIssue.mockRejectedValueOnce(new Error("revalidation read failed"));
await request(app)
.patch(`/api/issues/${issueId}`)
.send({ title: "Updated after commit" })
.expect(200);
mockIssueRecoveryActionService.getActiveForIssue.mockRejectedValueOnce(new Error("revalidation read failed"));
await request(app)
.post(`/api/issues/${issueId}/comments`)
.send({ body: "progress update" })
.expect(201);
mockIssueRecoveryActionService.getActiveForIssue.mockRejectedValueOnce(new Error("revalidation read failed"));
await request(app)
.put(`/api/issues/${issueId}/documents/plan`)
.send({ format: "markdown", body: "# updated" })
.expect(200);
mockIssueRecoveryActionService.getActiveForIssue.mockRejectedValueOnce(new Error("revalidation read failed"));
await request(app)
.patch("/api/work-products/product-1")
.send({ title: "Updated product" })
.expect(200);
expect(mockIssueService.update).toHaveBeenCalledWith(
issueId,
expect.objectContaining({ title: "Updated after commit" }),
);
expect(mockIssueService.addComment).toHaveBeenCalledWith(
issueId,
"progress update",
expect.any(Object),
expect.any(Object),
);
expect(mockDocumentService.upsertIssueDocument).toHaveBeenCalled();
expect(mockWorkProductService.update).toHaveBeenCalledWith("product-1", { title: "Updated product" });
});
it("preserves board mutations on active checkouts", async () => {
const app = await createApp(boardActor());
@@ -477,4 +563,103 @@ describe("agent issue mutation checkout ownership", () => {
title: "Claimable update",
});
});
it("rejects peer-agent status updates that would clear a recovery action they do not own", async () => {
mockIssueService.getById.mockResolvedValue(
makeIssue({ status: "blocked", assigneeAgentId: null, assigneeUserId: "board-user" }),
);
mockIssueRecoveryActionService.getActiveForIssue.mockResolvedValue({
id: recoveryActionId,
ownerAgentId,
});
const res = await request(await createApp(peerActor())).patch(`/api/issues/${issueId}`).send({ status: "todo" });
expect(res.status, JSON.stringify(res.body)).toBe(403);
expect(res.body.error).toBe("Agent cannot resolve another owner's recovery action");
expect(mockIssueService.update).not.toHaveBeenCalled();
});
it("rejects peer-agent recovery resolution on a board-owned source issue", async () => {
mockIssueService.getById.mockResolvedValue(
makeIssue({ status: "blocked", assigneeAgentId: null, assigneeUserId: "board-user" }),
);
mockIssueRecoveryActionService.getActiveForIssue.mockResolvedValue({
id: recoveryActionId,
ownerAgentId,
});
const res = await request(await createApp(peerActor()))
.post(`/api/issues/${issueId}/recovery-actions/resolve`)
.send({
actionId: recoveryActionId,
outcome: "restored",
sourceIssueStatus: "done",
});
expect(res.status, JSON.stringify(res.body)).toBe(403);
expect(res.body.error).toBe("Agent cannot resolve another owner's recovery action");
expect(mockIssueRecoveryActionService.resolveActiveForIssue).not.toHaveBeenCalled();
});
it("allows the named recovery owner to resolve a board-owned source issue", async () => {
mockIssueService.getById.mockResolvedValue(
makeIssue({ status: "blocked", assigneeAgentId: null, assigneeUserId: "board-user" }),
);
mockIssueService.update.mockImplementation(async (_id: string, patch: Record<string, unknown>) => ({
...makeIssue({ status: "blocked", assigneeAgentId: null, assigneeUserId: "board-user" }),
...patch,
}));
mockIssueRecoveryActionService.getActiveForIssue.mockResolvedValue({
id: recoveryActionId,
ownerAgentId,
});
const res = await request(await createApp(ownerActor()))
.post(`/api/issues/${issueId}/recovery-actions/resolve`)
.send({
actionId: recoveryActionId,
outcome: "restored",
sourceIssueStatus: "done",
});
expect(res.status, JSON.stringify(res.body)).toBe(200);
expect(mockIssueService.update).toHaveBeenCalled();
expect(mockIssueRecoveryActionService.resolveActiveForIssue).toHaveBeenCalled();
});
it("wakes the assigned agent when recovery resolution restores a source issue to todo", async () => {
mockIssueService.getById.mockResolvedValue(
makeIssue({ status: "blocked", assigneeAgentId: ownerAgentId }),
);
mockIssueService.update.mockImplementation(async (_id: string, patch: Record<string, unknown>) => ({
...makeIssue({ status: "blocked", assigneeAgentId: ownerAgentId }),
...patch,
}));
mockIssueRecoveryActionService.getActiveForIssue.mockResolvedValue({
id: recoveryActionId,
ownerAgentId,
});
const res = await request(await createApp(ownerActor()))
.post(`/api/issues/${issueId}/recovery-actions/resolve`)
.send({
actionId: recoveryActionId,
outcome: "restored",
sourceIssueStatus: "todo",
});
expect(res.status, JSON.stringify(res.body)).toBe(200);
expect(mockHeartbeatService.wakeup).toHaveBeenCalledWith(
ownerAgentId,
expect.objectContaining({
reason: "issue_recovery_action_restored",
payload: expect.objectContaining({
issueId,
recoveryActionId,
mutation: "recovery_action_resolution",
}),
}),
);
});
});
@@ -76,6 +76,11 @@ vi.mock("../services/index.js", () => ({
syncDocument: async () => undefined,
syncIssue: async () => undefined,
}),
issueThreadInteractionService: () => ({
listForIssue: vi.fn(async () => []),
expireRequestConfirmationsSupersededByComment: vi.fn(async () => []),
expireStaleRequestConfirmationsForIssueDocument: vi.fn(async () => []),
}),
issueService: () => mockIssueService,
logActivity: mockLogActivity,
projectService: () => ({
@@ -81,6 +81,11 @@ function registerRouteMocks() {
syncDocument: async () => undefined,
syncIssue: async () => undefined,
}),
issueThreadInteractionService: () => ({
listForIssue: vi.fn(async () => []),
expireRequestConfirmationsSupersededByComment: vi.fn(async () => []),
expireStaleRequestConfirmationsForIssueDocument: vi.fn(async () => []),
}),
issueRecoveryActionService: () => ({
getActiveForIssue: vi.fn(async () => null),
listActiveForIssues: vi.fn(async () => new Map()),
@@ -116,6 +116,11 @@ function registerServiceMocks() {
syncDocument: async () => undefined,
syncIssue: async () => undefined,
}),
issueThreadInteractionService: () => ({
listForIssue: vi.fn(async () => []),
expireRequestConfirmationsSupersededByComment: vi.fn(async () => []),
expireStaleRequestConfirmationsForIssueDocument: vi.fn(async () => []),
}),
issueRecoveryActionService: () => ({
getActiveForIssue: vi.fn(async () => null),
listActiveForIssues: vi.fn(async () => new Map()),
@@ -65,6 +65,11 @@ vi.mock("../services/index.js", () => ({
getActiveForIssue: vi.fn(async () => null),
listActiveForIssues: vi.fn(async () => new Map()),
}),
issueThreadInteractionService: () => ({
listForIssue: vi.fn(async () => []),
expireRequestConfirmationsSupersededByComment: vi.fn(async () => []),
expireStaleRequestConfirmationsForIssueDocument: vi.fn(async () => []),
}),
issueService: () => mockIssueService,
logActivity: vi.fn(async () => undefined),
projectService: () => ({
@@ -5,9 +5,13 @@ import { and, eq } from "drizzle-orm";
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest";
import {
agents,
agentWakeupRequests,
activityLog,
companies,
createDb,
environmentLeases,
environments,
heartbeatRuns,
issueComments,
issueRecoveryActions,
issueRelations,
@@ -130,7 +134,11 @@ describeEmbeddedPostgres("issue recovery actions", () => {
afterEach(async () => {
await db.delete(issueRecoveryActions);
await db.delete(issueComments);
await db.delete(environmentLeases);
await db.delete(activityLog);
await db.delete(heartbeatRuns);
await db.delete(agentWakeupRequests);
await db.delete(environments);
await db.delete(issues);
await db.delete(agents);
await db.delete(companies);
@@ -191,6 +199,24 @@ describeEmbeddedPostgres("issue recovery actions", () => {
return { companyId, managerId, coderId, sourceIssueId, prefix, sourceIssue: sourceIssue! };
}
async function seedHeartbeatRun(input: {
companyId: string;
agentId: string;
runId: string;
issueId?: string;
status?: string;
}) {
await db.insert(heartbeatRuns).values({
id: input.runId,
companyId: input.companyId,
agentId: input.agentId,
invocationSource: "manual",
status: input.status ?? "running",
startedAt: new Date("2026-05-13T18:00:00.000Z"),
contextSnapshot: input.issueId ? { issueId: input.issueId } : undefined,
});
}
function createApp(actor: any = { type: "board", source: "local_implicit" }) {
const app = express();
app.use(express.json());
@@ -545,6 +571,390 @@ describeEmbeddedPostgres("issue recovery actions", () => {
);
});
it("resolves an active recovery action by returning the source issue to todo", async () => {
const { companyId, managerId, sourceIssueId } = await seedCompany();
await db.update(issues).set({ status: "blocked" }).where(eq(issues.id, sourceIssueId));
const recoveryActionSvc = issueRecoveryActionService(db);
const action = await recoveryActionSvc.upsertSourceScoped({
companyId,
sourceIssueId,
kind: "issue_graph_liveness",
ownerType: "agent",
ownerAgentId: managerId,
cause: "issue_graph_liveness",
fingerprint: "graph-liveness:try-again",
evidence: { latestIssueStatus: "blocked" },
nextAction: "Restore a live execution path.",
wakePolicy: { type: "manual" },
});
const app = createApp();
const resolved = await request(app)
.post(`/api/issues/${sourceIssueId}/recovery-actions/resolve`)
.send({
actionId: action.id,
outcome: "restored",
sourceIssueStatus: "todo",
resolutionNote: "Try the source issue again.",
})
.expect(200);
expect(resolved.body.issue).toMatchObject({
id: sourceIssueId,
status: "todo",
activeRecoveryAction: null,
});
expect(resolved.body.recoveryAction).toMatchObject({
id: action.id,
status: "resolved",
outcome: "restored",
resolutionNote: "Try the source issue again.",
});
expect(await recoveryActionSvc.getActiveForIssue(companyId, sourceIssueId)).toBeNull();
});
it("marks a recovery action stale when a blocked source issue is manually moved to todo", async () => {
const { companyId, managerId, sourceIssueId } = await seedCompany();
await db
.update(issues)
.set({ status: "blocked", assigneeAgentId: null, assigneeUserId: "board-user" })
.where(eq(issues.id, sourceIssueId));
const recoveryActionSvc = issueRecoveryActionService(db);
const action = await recoveryActionSvc.upsertSourceScoped({
companyId,
sourceIssueId,
kind: "issue_graph_liveness",
ownerType: "agent",
ownerAgentId: managerId,
cause: "issue_graph_liveness",
fingerprint: "graph-liveness:manual-restore",
evidence: { latestIssueStatus: "blocked" },
nextAction: "Restore a live execution path.",
wakePolicy: { type: "manual" },
});
const app = createApp();
const patched = await request(app)
.patch(`/api/issues/${sourceIssueId}`)
.send({ status: "todo" })
.expect(200);
expect(patched.body).toMatchObject({
id: sourceIssueId,
status: "todo",
activeRecoveryAction: null,
});
const [actionRow] = await db
.select()
.from(issueRecoveryActions)
.where(eq(issueRecoveryActions.id, action.id));
expect(actionRow).toMatchObject({
status: "cancelled",
outcome: "cancelled",
resolutionNote: "Recovery action became stale because the source issue was manually moved from blocked to todo.",
});
expect(actionRow?.resolvedAt).toBeTruthy();
expect(await recoveryActionSvc.getActiveForIssue(companyId, sourceIssueId)).toBeNull();
const detail = await request(app).get(`/api/issues/${sourceIssueId}`).expect(200);
expect(detail.body.activeRecoveryAction).toBeNull();
const activityRows = await db
.select()
.from(activityLog)
.where(eq(activityLog.entityId, sourceIssueId));
expect(activityRows.map((row) => row.action)).toEqual(
expect.arrayContaining(["issue.updated", "issue.recovery_action_resolved"]),
);
expect(activityRows.find((row) => row.action === "issue.recovery_action_resolved")?.details).toMatchObject({
source: "source_revalidation",
trigger: "issue_update",
});
});
it("folds stale recovery during read projection after the source issue reaches done", async () => {
const { companyId, managerId, sourceIssueId } = await seedCompany();
const recoveryActionSvc = issueRecoveryActionService(db);
const action = await recoveryActionSvc.upsertSourceScoped({
companyId,
sourceIssueId,
kind: "issue_graph_liveness",
ownerType: "agent",
ownerAgentId: managerId,
cause: "issue_graph_liveness",
fingerprint: "graph-liveness:done-projection",
evidence: { latestIssueStatus: "in_progress" },
nextAction: "Restore a live execution path.",
wakePolicy: { type: "manual" },
});
await db.update(issues).set({ status: "done" }).where(eq(issues.id, sourceIssueId));
const app = createApp();
const detail = await request(app).get(`/api/issues/${sourceIssueId}`).expect(200);
expect(detail.body).toMatchObject({
id: sourceIssueId,
status: "done",
activeRecoveryAction: null,
});
const [actionRow] = await db
.select()
.from(issueRecoveryActions)
.where(eq(issueRecoveryActions.id, action.id));
expect(actionRow).toMatchObject({
status: "cancelled",
outcome: "cancelled",
resolutionNote: "Recovery action became stale because the source issue reached done.",
});
expect(actionRow?.resolvedAt).toBeTruthy();
const activityRows = await db
.select()
.from(activityLog)
.where(eq(activityLog.entityId, sourceIssueId));
expect(activityRows.find((row) => row.action === "issue.recovery_action_resolved")?.details).toMatchObject({
source: "source_revalidation",
trigger: "read_projection",
recoveryActionId: action.id,
});
});
it("keeps active recovery visible when a plain comment does not create a live path", async () => {
const { companyId, managerId, sourceIssueId } = await seedCompany();
await db
.update(issues)
.set({ assigneeAgentId: null, assigneeUserId: "board-user" })
.where(eq(issues.id, sourceIssueId));
const recoveryActionSvc = issueRecoveryActionService(db);
const action = await recoveryActionSvc.upsertSourceScoped({
companyId,
sourceIssueId,
kind: "issue_graph_liveness",
ownerType: "agent",
ownerAgentId: managerId,
cause: "issue_graph_liveness",
fingerprint: "graph-liveness:plain-comment",
evidence: { latestIssueStatus: "in_progress" },
nextAction: "Restore a live execution path.",
wakePolicy: { type: "manual" },
});
const app = createApp();
await request(app)
.post(`/api/issues/${sourceIssueId}/comments`)
.send({ body: "I am looking at this, but not changing the disposition." })
.expect(201);
expect(await recoveryActionSvc.getActiveForIssue(companyId, sourceIssueId)).toMatchObject({
id: action.id,
status: "active",
});
const detail = await request(app).get(`/api/issues/${sourceIssueId}`).expect(200);
expect(detail.body.activeRecoveryAction).toMatchObject({ id: action.id });
});
it("folds stale recovery when a structured resume comment restores todo dispatch", async () => {
const { companyId, managerId, sourceIssueId } = await seedCompany();
await db
.update(issues)
.set({ status: "blocked", assigneeAgentId: null, assigneeUserId: "board-user" })
.where(eq(issues.id, sourceIssueId));
const recoveryActionSvc = issueRecoveryActionService(db);
const action = await recoveryActionSvc.upsertSourceScoped({
companyId,
sourceIssueId,
kind: "issue_graph_liveness",
ownerType: "agent",
ownerAgentId: managerId,
cause: "issue_graph_liveness",
fingerprint: "graph-liveness:resume-comment",
evidence: { latestIssueStatus: "blocked" },
nextAction: "Restore a live execution path.",
wakePolicy: { type: "manual" },
});
const app = createApp();
await request(app)
.post(`/api/issues/${sourceIssueId}/comments`)
.send({ body: "Resume this now.", resume: true })
.expect(201);
const [sourceIssue] = await db.select().from(issues).where(eq(issues.id, sourceIssueId));
expect(sourceIssue?.status).toBe("todo");
const [actionRow] = await db
.select()
.from(issueRecoveryActions)
.where(eq(issueRecoveryActions.id, action.id));
expect(actionRow).toMatchObject({
status: "cancelled",
outcome: "cancelled",
resolutionNote: "Recovery action became stale because the source issue was manually moved from blocked to todo.",
});
expect(await recoveryActionSvc.getActiveForIssue(companyId, sourceIssueId)).toBeNull();
const activityRows = await db
.select()
.from(activityLog)
.where(eq(activityLog.entityId, sourceIssueId));
expect(activityRows.find((row) => row.action === "issue.recovery_action_resolved")?.details).toMatchObject({
source: "source_revalidation",
trigger: "comment",
recoveryActionId: action.id,
});
});
it("rejects peer-agent source issue updates that would hide another owner's recovery action", async () => {
const { companyId, managerId, coderId, sourceIssueId } = await seedCompany();
await db
.update(issues)
.set({ status: "blocked", assigneeAgentId: null, assigneeUserId: "board-user" })
.where(eq(issues.id, sourceIssueId));
const recoveryActionSvc = issueRecoveryActionService(db);
const action = await recoveryActionSvc.upsertSourceScoped({
companyId,
sourceIssueId,
kind: "issue_graph_liveness",
ownerType: "agent",
ownerAgentId: managerId,
cause: "issue_graph_liveness",
fingerprint: "graph-liveness:peer-status-update",
evidence: { latestIssueStatus: "blocked" },
nextAction: "Restore a live execution path.",
wakePolicy: { type: "manual" },
});
const app = createApp({
type: "agent",
agentId: coderId,
companyId,
runId: randomUUID(),
source: "agent_jwt",
});
await request(app)
.patch(`/api/issues/${sourceIssueId}`)
.send({ status: "todo" })
.expect(403);
const [sourceIssue] = await db.select().from(issues).where(eq(issues.id, sourceIssueId));
expect(sourceIssue?.status).toBe("blocked");
const [actionRow] = await db
.select()
.from(issueRecoveryActions)
.where(eq(issueRecoveryActions.id, action.id));
expect(actionRow).toMatchObject({
status: "active",
outcome: null,
resolvedAt: null,
});
});
it("rejects peer-agent recovery action resolution on a board-owned source issue", async () => {
const { companyId, managerId, coderId, sourceIssueId } = await seedCompany();
await db
.update(issues)
.set({ status: "blocked", assigneeAgentId: null, assigneeUserId: "board-user" })
.where(eq(issues.id, sourceIssueId));
const recoveryActionSvc = issueRecoveryActionService(db);
const action = await recoveryActionSvc.upsertSourceScoped({
companyId,
sourceIssueId,
kind: "issue_graph_liveness",
ownerType: "agent",
ownerAgentId: managerId,
cause: "issue_graph_liveness",
fingerprint: "graph-liveness:peer-resolution",
evidence: { latestIssueStatus: "blocked" },
nextAction: "Restore a live execution path.",
wakePolicy: { type: "manual" },
});
const app = createApp({
type: "agent",
agentId: coderId,
companyId,
runId: randomUUID(),
source: "agent_jwt",
});
await request(app)
.post(`/api/issues/${sourceIssueId}/recovery-actions/resolve`)
.send({
actionId: action.id,
outcome: "restored",
sourceIssueStatus: "done",
resolutionNote: "Peer agent should not be able to clear this recovery.",
})
.expect(403);
const [sourceIssue] = await db.select().from(issues).where(eq(issues.id, sourceIssueId));
expect(sourceIssue?.status).toBe("blocked");
const [actionRow] = await db
.select()
.from(issueRecoveryActions)
.where(eq(issueRecoveryActions.id, action.id));
expect(actionRow).toMatchObject({
status: "active",
outcome: null,
resolvedAt: null,
});
});
it("allows the named recovery owner to resolve a board-owned source recovery action", async () => {
const { companyId, managerId, sourceIssueId } = await seedCompany();
await db
.update(issues)
.set({ status: "blocked", assigneeAgentId: null, assigneeUserId: "board-user" })
.where(eq(issues.id, sourceIssueId));
const recoveryActionSvc = issueRecoveryActionService(db);
const action = await recoveryActionSvc.upsertSourceScoped({
companyId,
sourceIssueId,
kind: "issue_graph_liveness",
ownerType: "agent",
ownerAgentId: managerId,
cause: "issue_graph_liveness",
fingerprint: "graph-liveness:owner-resolution",
evidence: { latestIssueStatus: "blocked" },
nextAction: "Restore a live execution path.",
wakePolicy: { type: "manual" },
});
const runId = randomUUID();
const app = createApp({
type: "agent",
agentId: managerId,
companyId,
runId,
source: "agent_jwt",
});
await seedHeartbeatRun({
companyId,
agentId: managerId,
runId,
issueId: sourceIssueId,
});
const resolved = await request(app)
.post(`/api/issues/${sourceIssueId}/recovery-actions/resolve`)
.send({
actionId: action.id,
outcome: "restored",
sourceIssueStatus: "done",
resolutionNote: "Recovery owner verified the work was intentionally completed.",
})
.expect(200);
expect(resolved.body.issue).toMatchObject({
id: sourceIssueId,
status: "done",
activeRecoveryAction: null,
});
expect(resolved.body.recoveryAction).toMatchObject({
id: action.id,
status: "resolved",
outcome: "restored",
});
});
it("rejects blocked recovery resolution when the source issue has no first-class blockers", async () => {
const { companyId, managerId, sourceIssueId } = await seedCompany();
const recoveryActionSvc = issueRecoveryActionService(db);
@@ -58,6 +58,11 @@ function registerModuleMocks() {
syncDocument: async () => undefined,
syncIssue: async () => undefined,
}),
issueThreadInteractionService: () => ({
listForIssue: vi.fn(async () => []),
expireRequestConfirmationsSupersededByComment: vi.fn(async () => []),
expireStaleRequestConfirmationsForIssueDocument: vi.fn(async () => []),
}),
issueRecoveryActionService: () => ({
getActiveForIssue: vi.fn(async () => null),
listActiveForIssues: vi.fn(async () => new Map()),
@@ -106,6 +106,7 @@ function createIssue(overrides: Record<string, unknown> = {}) {
id: "aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa",
companyId: "company-1",
status: "in_progress",
workMode: "standard",
priority: "medium",
projectId: null,
goalId: null,
@@ -481,6 +482,57 @@ describe.sequential("issue thread interaction routes", () => {
);
});
it("forces a fresh workspace-aware session when accepting a planning confirmation", async () => {
mockIssueService.getById.mockResolvedValueOnce(createIssue({ workMode: "planning" }));
mockInteractionService.acceptInteraction.mockResolvedValueOnce({
interaction: {
id: "interaction-plan",
companyId: "company-1",
issueId: "aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa",
kind: "request_confirmation",
status: "accepted",
continuationPolicy: "wake_assignee_on_accept",
idempotencyKey: "confirmation:issue:plan:revision",
sourceCommentId: null,
sourceRunId: "run-plan",
payload: {
version: 1,
prompt: "Approve this plan?",
},
result: {
version: 1,
outcome: "accepted",
},
createdAt: "2026-04-20T12:00:00.000Z",
updatedAt: "2026-04-20T12:05:00.000Z",
resolvedAt: "2026-04-20T12:05:00.000Z",
},
createdIssues: [],
});
const app = await createApp();
const res = await request(app)
.post("/api/issues/aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa/interactions/interaction-plan/accept")
.send({});
expect(res.status).toBe(200);
expect(mockHeartbeatService.wakeup).toHaveBeenCalledTimes(1);
expect(mockHeartbeatService.wakeup).toHaveBeenCalledWith(
ASSIGNEE_AGENT_ID,
expect.objectContaining({
reason: "issue_commented",
contextSnapshot: expect.objectContaining({
issueId: "aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa",
interactionId: "interaction-plan",
interactionKind: "request_confirmation",
interactionStatus: "accepted",
forceFreshSession: true,
workspaceRefreshReason: "accepted_plan_confirmation",
}),
}),
);
});
it("wakes the returned agent when accepting an agent-authored confirmation from a board review assignee", async () => {
mockIssueService.getById.mockResolvedValueOnce(createIssue({
status: "in_review",
@@ -119,6 +119,11 @@ function registerRouteMocks() {
getActiveForIssue: vi.fn(async () => null),
listActiveForIssues: vi.fn(async () => new Map()),
}),
issueThreadInteractionService: () => ({
listForIssue: vi.fn(async () => []),
expireRequestConfirmationsSupersededByComment: vi.fn(async () => []),
expireStaleRequestConfirmationsForIssueDocument: vi.fn(async () => []),
}),
issueService: () => mockIssueService,
logActivity: mockLogActivity,
projectService: () => ({}),
@@ -115,6 +115,11 @@ vi.mock("../services/index.js", () => ({
getActiveForIssue: vi.fn(async () => null),
listActiveForIssues: vi.fn(async () => new Map()),
}),
issueThreadInteractionService: () => ({
listForIssue: vi.fn(async () => []),
expireRequestConfirmationsSupersededByComment: vi.fn(async () => []),
expireStaleRequestConfirmationsForIssueDocument: vi.fn(async () => []),
}),
issueReferenceService: () => mockIssueReferenceService,
issueService: () => mockIssueService,
logActivity: mockLogActivity,
@@ -2408,6 +2408,52 @@ describeEmbeddedPostgres("issueService blockers and dependency wake readiness",
});
});
it("unblocks a source issue when a liveness escalation recovery issue is marked done", async () => {
const companyId = randomUUID();
await db.insert(companies).values({
id: companyId,
name: "Paperclip",
issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`,
requireBoardApprovalForNewAgents: false,
});
const sourceIssueId = randomUUID();
const recoveryIssueId = randomUUID();
await db.insert(issues).values([
{
id: sourceIssueId,
companyId,
title: "Source issue",
status: "blocked",
priority: "medium",
},
{
id: recoveryIssueId,
companyId,
title: "Liveness escalation issue",
status: "in_progress",
priority: "high",
originKind: "harness_liveness_escalation",
originId: `harness_liveness:${companyId}:${sourceIssueId}:invalid_review_participant:none`,
},
]);
await svc.update(sourceIssueId, {
blockedByIssueIds: [recoveryIssueId],
});
await expect(svc.getRelationSummaries(sourceIssueId)).resolves.toMatchObject({
blockedBy: [expect.objectContaining({ id: recoveryIssueId })],
});
await svc.update(recoveryIssueId, {
status: "done",
});
await expect(svc.getRelationSummaries(sourceIssueId)).resolves.toMatchObject({
blockedBy: [],
});
});
it("rejects execution when unresolved blockers remain", async () => {
const companyId = randomUUID();
const assigneeAgentId = randomUUID();
@@ -219,6 +219,14 @@ describe("plugin local folders", () => {
expect(leftovers.filter((name) => name.includes(".paperclip-"))).toEqual([]);
});
it("creates missing nested parent directories for atomic writes", async () => {
const root = await makeRoot();
await writePluginLocalFolderTextAtomic(root, "cases/active/smoke/README.md", "hello");
await expect(readPluginLocalFolderText(root, "cases/active/smoke/README.md")).resolves.toBe("hello");
});
it("returns the real folder key after deleting a file", async () => {
const root = await makeRoot();
await fs.writeFile(path.join(root, "stale.md"), "delete me", "utf8");
+4
View File
@@ -70,7 +70,9 @@ describe("redaction", () => {
const input = [
"Authorization: Bearer live-bearer-token-value",
`payload {"apiKey":"json-secret-value"}`,
`paperclip {"PAPERCLIP_API_KEY":"paperclip-json-secret"}`,
`escaped {\\"apiKey\\":\\"escaped-json-secret\\"}`,
`export PAPERCLIP_API_KEY='paperclip-shell-secret'`,
`GITHUB_TOKEN=${githubToken}`,
`session=${jwt}`,
].join("\n");
@@ -80,7 +82,9 @@ describe("redaction", () => {
expect(result).toContain(REDACTED_EVENT_VALUE);
expect(result).not.toContain("live-bearer-token-value");
expect(result).not.toContain("json-secret-value");
expect(result).not.toContain("paperclip-json-secret");
expect(result).not.toContain("escaped-json-secret");
expect(result).not.toContain("paperclip-shell-secret");
expect(result).not.toContain(githubToken);
expect(result).not.toContain(jwt);
});
+39 -8
View File
@@ -1,19 +1,49 @@
import { redactCommandText } from "@paperclipai/adapter-utils";
const SECRET_PAYLOAD_KEY_RE =
/(api[-_]?key|access[-_]?token|auth(?:_?token)?|authorization|bearer|secret|passwd|password|credential|jwt|private[-_]?key|cookie|connectionstring)/i;
const SECRET_FIELD_NAME_PATTERN =
String.raw`[A-Za-z0-9_-]*(?:api[-_]?key|access[-_]?token|auth(?:_?token)?|token|authorization|bearer|secret|passwd|password|credential|jwt|private[-_]?key|cookie|connectionstring)[A-Za-z0-9_-]*`;
const SECRET_PAYLOAD_KEY_RE = new RegExp(SECRET_FIELD_NAME_PATTERN, "i");
const COMMAND_PAYLOAD_KEY_RE =
/(^command$|^cmd$|command[-_]?line|resolved[-_]?command|PAPERCLIP_RESOLVED_COMMAND)/i;
const COMMAND_ARGS_PAYLOAD_KEY_RE = /^(commandArgs|command_?args|argv)$/i;
const JWT_VALUE_RE = /^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+(?:\.[A-Za-z0-9_-]+)?$/;
const CLI_SECRET_FLAG_RE =
/^-{1,2}(?:api[-_]?key|(?:access[-_]?|auth[-_]?)?token|token|authorization|bearer|secret|passwd|password|credential|jwt|private[-_]?key|cookie|connectionstring)$/i;
const JSON_SECRET_FIELD_TEXT_RE =
/((?:"|')?(?:api[-_]?key|access[-_]?token|auth(?:_?token)?|authorization|bearer|secret|passwd|password|credential|jwt|private[-_]?key|cookie|connectionstring)(?:"|')?\s*:\s*(?:"|'))[^"'`\r\n]+((?:"|'))/gi;
const ESCAPED_JSON_SECRET_FIELD_TEXT_RE =
/((?:\\")?(?:api[-_]?key|access[-_]?token|auth(?:_?token)?|authorization|bearer|secret|passwd|password|credential|jwt|private[-_]?key|cookie|connectionstring)(?:\\")?\s*:\s*(?:\\"))[^\\\r\n]+((?:\\"))/gi;
const CLI_SECRET_FLAG_RE = new RegExp(String.raw`^-{1,2}${SECRET_FIELD_NAME_PATTERN}$`, "i");
const JSON_SECRET_FIELD_TEXT_RE = new RegExp(
String.raw`((?:"|')?${SECRET_FIELD_NAME_PATTERN}(?:"|')?\s*:\s*(?:"|'))[^"'` + "`" + String.raw`\r\n]+((?:"|'))`,
"gi",
);
const ESCAPED_JSON_SECRET_FIELD_TEXT_RE = new RegExp(
String.raw`((?:\\")?${SECRET_FIELD_NAME_PATTERN}(?:\\")?\s*:\s*(?:\\"))[^\\\r\n]+((?:\\"))`,
"gi",
);
const SECRET_TEXT_HINTS = [
"api",
"key",
"token",
"auth",
"bearer",
"secret",
"pass",
"credential",
"jwt",
"private",
"cookie",
"connectionstring",
"sk-",
"ghp_",
"gho_",
"ghu_",
"ghs_",
"ghr_",
] as const;
export const REDACTED_EVENT_VALUE = "***REDACTED***";
function maybeContainsSecretText(input: string) {
const lower = input.toLowerCase();
return SECRET_TEXT_HINTS.some((hint) => lower.includes(hint)) || input.includes(".");
}
function isPlainObject(value: unknown): value is Record<string, unknown> {
if (typeof value !== "object" || value === null || Array.isArray(value)) return false;
const proto = Object.getPrototypeOf(value);
@@ -94,6 +124,7 @@ export function redactEventPayload(payload: Record<string, unknown> | null): Rec
}
export function redactSensitiveText(input: string): string {
if (!maybeContainsSecretText(input)) return input;
return redactCommandText(
input
.replace(JSON_SECRET_FIELD_TEXT_RE, `$1${REDACTED_EVENT_VALUE}$2`)
+412 -7
View File
@@ -117,6 +117,13 @@ const updateIssueRouteSchema = updateIssueSchema.extend({
type ParsedExecutionState = NonNullable<ReturnType<typeof parseIssueExecutionState>>;
type NormalizedExecutionPolicy = NonNullable<ReturnType<typeof normalizeIssueExecutionPolicy>>;
type IssueRouteSnapshot = typeof issueRows.$inferSelect;
type RecoveryRevalidationTrigger =
| "issue_update"
| "comment"
| "document"
| "work_product"
| "read_projection";
type CompanySearchService = {
search(companyId: string, query: CompanySearchQuery): Promise<CompanySearchResponse>;
};
@@ -636,6 +643,8 @@ function queueResolvedInteractionContinuationWakeup(input: {
};
actor: { actorType: "user" | "agent"; actorId: string };
source: string;
forceFreshSession?: boolean;
workspaceRefreshReason?: string | null;
}) {
if (
input.interaction.continuationPolicy !== "wake_assignee"
@@ -648,6 +657,8 @@ function queueResolvedInteractionContinuationWakeup(input: {
if (input.interaction.status === "expired") return;
if (!input.issue.assigneeAgentId || isClosedIssueStatus(input.issue.status)) return;
const forceFreshSession = input.forceFreshSession === true;
const workspaceRefreshReason = readNonEmptyString(input.workspaceRefreshReason);
void input.heartbeat.wakeup(input.issue.assigneeAgentId, {
source: "automation",
triggerDetail: "system",
@@ -673,6 +684,8 @@ function queueResolvedInteractionContinuationWakeup(input: {
sourceRunId: input.interaction.sourceRunId ?? null,
wakeReason: "issue_commented",
source: input.source,
...(forceFreshSession ? { forceFreshSession: true } : {}),
...(workspaceRefreshReason ? { workspaceRefreshReason } : {}),
},
}).catch((err) => logger.warn({
err,
@@ -843,6 +856,7 @@ export function issueRoutes(
const workProductsSvc = workProductService(db);
const documentsSvc = documentService(db);
const issueReferencesSvc = issueReferenceService(db);
const issueThreadInteractionsSvc = issueThreadInteractionService(db);
const routinesSvc = routineService(db, {
pluginWorkerManager: opts.pluginWorkerManager,
});
@@ -857,6 +871,182 @@ export function issueRoutes(
};
const feedbackExportService = opts?.feedbackExportService;
const environmentsSvc = environmentService(db);
async function classifySourceRecoveryRevalidation(input: {
issue: IssueRouteSnapshot;
trigger: RecoveryRevalidationTrigger;
statusChanged?: boolean;
assigneeChanged?: boolean;
blockersChanged?: boolean;
executionPolicyChanged?: boolean;
monitorChanged?: boolean;
documentChanged?: boolean;
workProductChanged?: boolean;
resumeRequested?: boolean;
reopened?: boolean;
blockedToTodoRecovery?: boolean;
}): Promise<string | null> {
const { issue } = input;
if (issue.status === "done" || issue.status === "cancelled") {
return `Recovery action became stale because the source issue reached ${issue.status}.`;
}
if (input.blockedToTodoRecovery === true) {
return "Recovery action became stale because the source issue was manually moved from blocked to todo.";
}
if (input.trigger === "read_projection") return null;
if (
input.trigger === "comment" &&
input.resumeRequested !== true &&
input.reopened !== true &&
input.statusChanged !== true
) {
return null;
}
const durableSourceChange =
input.statusChanged === true ||
input.assigneeChanged === true ||
input.blockersChanged === true ||
input.executionPolicyChanged === true ||
input.monitorChanged === true ||
input.documentChanged === true ||
input.workProductChanged === true ||
input.resumeRequested === true ||
input.reopened === true;
if (!durableSourceChange) return null;
if (issue.status === "blocked") {
const readiness = await svc.getDependencyReadiness(issue.id);
if (readiness.unresolvedBlockerCount > 0) {
return "Recovery action became stale because the source issue now has unresolved first-class blockers.";
}
return null;
}
if (issue.assigneeUserId && issue.status !== "done" && issue.status !== "cancelled") {
return "Recovery action became stale because the source issue now has a human owner.";
}
if ((issue.status === "todo" || issue.status === "in_progress") && issue.assigneeAgentId) {
return `Recovery action became stale because the source issue is ${issue.status} with an agent owner.`;
}
if (issue.status === "in_review") {
const executionState = parseIssueExecutionState(issue.executionState);
const participant = executionState?.status === "pending" ? executionState.currentParticipant : null;
if (
(participant?.type === "agent" && readNonEmptyString(participant.agentId)) ||
(participant?.type === "user" && readNonEmptyString(participant.userId))
) {
return "Recovery action became stale because the source issue now has a typed review participant.";
}
const interactions = await issueThreadInteractionsSvc.listForIssue(issue.id);
if (interactions.some((interaction) => interaction.status === "pending")) {
return "Recovery action became stale because the source issue now has a pending issue interaction.";
}
const approvals = await issueApprovalsSvc.listApprovalsForIssue(issue.id);
if (approvals.some((approval) => approval.status === "pending" || approval.status === "revision_requested")) {
return "Recovery action became stale because the source issue now has a pending approval.";
}
}
const monitor = summarizeIssueMonitor(issue, normalizeIssueExecutionPolicy(issue.executionPolicy ?? null));
if (monitor.nextCheckAt && Date.parse(monitor.nextCheckAt) > Date.now()) {
return "Recovery action became stale because the source issue now has a scheduled monitor.";
}
return null;
}
async function revalidateActiveSourceRecovery(input: {
issue: IssueRouteSnapshot;
trigger: RecoveryRevalidationTrigger;
actor?: ReturnType<typeof getActorInfo> | null;
activeRecoveryAction?: Awaited<ReturnType<typeof recoveryActionsSvc.getActiveForIssue>> | null;
statusChanged?: boolean;
assigneeChanged?: boolean;
blockersChanged?: boolean;
executionPolicyChanged?: boolean;
monitorChanged?: boolean;
documentChanged?: boolean;
workProductChanged?: boolean;
resumeRequested?: boolean;
reopened?: boolean;
blockedToTodoRecovery?: boolean;
}) {
const activeRecoveryAction =
input.activeRecoveryAction === undefined
? await recoveryActionsSvc.getActiveForIssue(input.issue.companyId, input.issue.id)
: input.activeRecoveryAction;
if (!activeRecoveryAction) return null;
const resolutionNote = await classifySourceRecoveryRevalidation(input);
if (!resolutionNote) return activeRecoveryAction;
const resolved = await recoveryActionsSvc.resolveActiveForIssue({
companyId: input.issue.companyId,
sourceIssueId: input.issue.id,
actionId: activeRecoveryAction.id,
status: "cancelled",
outcome: "cancelled",
resolutionNote,
});
if (!resolved) return activeRecoveryAction;
const actor = input.actor;
await logActivity(db, {
companyId: input.issue.companyId,
actorType: actor?.actorType ?? "system",
actorId: actor?.actorId ?? "system",
agentId: actor?.agentId ?? null,
runId: actor?.runId ?? null,
action: "issue.recovery_action_resolved",
entityType: "issue",
entityId: input.issue.id,
details: {
identifier: input.issue.identifier,
recoveryActionId: resolved.id,
recoveryActionStatus: resolved.status,
outcome: resolved.outcome,
sourceIssueStatus: input.issue.status,
resolutionNote: resolved.resolutionNote,
source: "source_revalidation",
trigger: input.trigger,
},
});
return null;
}
async function revalidateActiveSourceRecoveryForRead(input: Parameters<typeof revalidateActiveSourceRecovery>[0]) {
try {
return await revalidateActiveSourceRecovery(input);
} catch (err) {
logger.warn(
{ err, issueId: input.issue.id, trigger: input.trigger },
"failed to revalidate recovery action during read projection",
);
return input.activeRecoveryAction ?? null;
}
}
async function revalidateActiveSourceRecoveryAfterCommittedWrite(
input: Parameters<typeof revalidateActiveSourceRecovery>[0],
) {
try {
return await revalidateActiveSourceRecovery(input);
} catch (err) {
logger.warn(
{ err, issueId: input.issue.id, trigger: input.trigger },
"failed to revalidate recovery action after committed issue write",
);
return input.activeRecoveryAction ?? null;
}
}
function withContentPath<T extends { id: string }>(attachment: T) {
return {
...attachment,
@@ -1240,6 +1430,51 @@ export function issueRoutes(
return false;
}
async function assertRecoveryActionAuthority(
req: Request,
res: Response,
issue: { id: string; companyId: string; assigneeAgentId: string | null },
activeRecoveryAction: Awaited<ReturnType<typeof recoveryActionsSvc.getActiveForIssue>>,
input: { source: "issue_update" | "recovery_action_resolution" },
) {
if (req.actor.type !== "agent") return true;
if (!activeRecoveryAction) return true;
const actorAgentId = req.actor.agentId;
if (!actorAgentId) {
res.status(403).json({ error: "Agent authentication required" });
return false;
}
if (issue.assigneeAgentId === actorAgentId) return true;
if (
issue.assigneeAgentId &&
await hasActiveCheckoutManagementOverride(actorAgentId, issue.companyId, issue.assigneeAgentId)
) {
return true;
}
if (activeRecoveryAction.ownerAgentId === actorAgentId) return true;
if (
activeRecoveryAction.ownerAgentId &&
await hasActiveCheckoutManagementOverride(actorAgentId, issue.companyId, activeRecoveryAction.ownerAgentId)
) {
return true;
}
res.status(403).json({
error: "Agent cannot resolve another owner's recovery action",
details: {
issueId: issue.id,
recoveryActionId: activeRecoveryAction.id,
actorAgentId,
assigneeAgentId: issue.assigneeAgentId,
recoveryOwnerAgentId: activeRecoveryAction.ownerAgentId,
source: input.source,
securityPrinciples: ["Least Privilege", "Complete Mediation", "Secure Defaults"],
},
});
return false;
}
async function resolveActiveIssueRun(issue: {
id: string;
assigneeAgentId: string | null;
@@ -1512,6 +1747,19 @@ export function issueRoutes(
listSuccessfulRunHandoffStates(db, companyId, issueIds),
recoveryActionsSvc.listActiveForIssues(companyId, issueIds),
]);
const actor = getActorInfo(req);
await Promise.all(result.map(async (issue) => {
const activeRecoveryAction = recoveryActionByIssue.get(issue.id) ?? null;
if (!activeRecoveryAction) return;
const revalidated = await revalidateActiveSourceRecoveryForRead({
issue,
trigger: "read_projection",
actor,
activeRecoveryAction,
});
if (revalidated) recoveryActionByIssue.set(issue.id, revalidated);
else recoveryActionByIssue.delete(issue.id);
}));
res.json(result.map((issue) => ({
...issue,
successfulRunHandoff: handoffStates.get(issue.id) ?? null,
@@ -1668,6 +1916,12 @@ export function issueRoutes(
relations,
recoveryActionsByRelationIssue,
);
const revalidatedActiveRecoveryAction = await revalidateActiveSourceRecoveryForRead({
issue,
trigger: "read_projection",
actor: getActorInfo(req),
activeRecoveryAction,
});
res.json({
issue: {
@@ -1680,7 +1934,7 @@ export function issueRoutes(
...(blockerAttention ? { blockerAttention } : {}),
productivityReview,
scheduledRetry,
activeRecoveryAction,
activeRecoveryAction: revalidatedActiveRecoveryAction,
priority: issue.priority,
projectId: issue.projectId,
goalId: goal?.id ?? issue.goalId,
@@ -1786,6 +2040,12 @@ export function issueRoutes(
relations,
recoveryActionsByRelationIssue,
);
const revalidatedActiveRecoveryAction = await revalidateActiveSourceRecoveryForRead({
issue,
trigger: "read_projection",
actor: getActorInfo(req),
activeRecoveryAction,
});
const mentionedProjects = mentionedProjectIds.length > 0
? await projectsSvc.listByIds(issue.companyId, mentionedProjectIds)
: [];
@@ -1801,7 +2061,7 @@ export function issueRoutes(
productivityReview,
successfulRunHandoff: successfulRunHandoffStates.get(issue.id) ?? null,
scheduledRetry,
activeRecoveryAction,
activeRecoveryAction: revalidatedActiveRecoveryAction,
blockedBy: relationsWithRecoveryActions.blockedBy,
blocks: relationsWithRecoveryActions.blocks,
relatedWork: referenceSummary,
@@ -1823,7 +2083,11 @@ export function issueRoutes(
return;
}
assertCompanyAccess(req, issue.companyId);
const active = await recoveryActionsSvc.getActiveForIssue(issue.companyId, issue.id);
const active = await revalidateActiveSourceRecoveryForRead({
issue,
trigger: "read_projection",
actor: getActorInfo(req),
});
res.json({
active,
actions: active ? [active] : [],
@@ -1839,6 +2103,18 @@ export function issueRoutes(
}
assertCompanyAccess(req, existing.companyId);
if (!(await assertAgentIssueMutationAllowed(req, res, existing))) return;
const activeRecoveryAction = await recoveryActionsSvc.getActiveForIssue(existing.companyId, existing.id);
if (
!(await assertRecoveryActionAuthority(
req,
res,
existing,
activeRecoveryAction,
{ source: "recovery_action_resolution" },
))
) {
return;
}
const { actionId, outcome, sourceIssueStatus, resolutionNote } = req.body;
if (outcome === "false_positive" || outcome === "cancelled") {
@@ -1948,6 +2224,36 @@ export function issueRoutes(
},
});
if (
sourceIssueStatus === "todo" &&
existing.status !== result.issue.status &&
result.issue.assigneeAgentId
) {
void heartbeat.wakeup(result.issue.assigneeAgentId, {
source: "automation",
triggerDetail: "system",
reason: "issue_recovery_action_restored",
payload: {
issueId: result.issue.id,
recoveryActionId: result.recoveryAction.id,
mutation: "recovery_action_resolution",
},
requestedByActorType: actor.actorType,
requestedByActorId: actor.actorId,
contextSnapshot: {
issueId: result.issue.id,
taskId: result.issue.id,
wakeReason: "issue_recovery_action_restored",
source: "issue.recovery_action_resolution",
recoveryActionId: result.recoveryAction.id,
},
}).catch((err) =>
logger.warn(
{ err, issueId: result.issue.id, agentId: result.issue.assigneeAgentId },
"failed to wake agent after recovery action restored issue",
));
}
res.json({
issue: {
...result.issue,
@@ -2087,6 +2393,13 @@ export function issueRoutes(
});
}
await revalidateActiveSourceRecoveryAfterCommittedWrite({
issue,
trigger: "document",
actor,
documentChanged: true,
});
res.status(result.created ? 201 : 200).json(doc);
});
@@ -2274,6 +2587,13 @@ export function issueRoutes(
source: "issue.document_restored",
});
await revalidateActiveSourceRecoveryAfterCommittedWrite({
issue,
trigger: "document",
actor,
documentChanged: true,
});
res.json(result.document);
},
);
@@ -2344,6 +2664,12 @@ export function issueRoutes(
actor,
source: "issue.document_deleted",
});
await revalidateActiveSourceRecoveryAfterCommittedWrite({
issue,
trigger: "document",
actor,
documentChanged: true,
});
res.json({ ok: true });
});
@@ -2376,6 +2702,12 @@ export function issueRoutes(
entityId: issue.id,
details: { workProductId: product.id, type: product.type, provider: product.provider },
});
await revalidateActiveSourceRecoveryAfterCommittedWrite({
issue,
trigger: "work_product",
actor,
workProductChanged: true,
});
res.status(201).json(product);
});
@@ -2410,6 +2742,12 @@ export function issueRoutes(
entityId: existing.issueId,
details: { workProductId: product.id, changedKeys: Object.keys(req.body).sort() },
});
await revalidateActiveSourceRecoveryAfterCommittedWrite({
issue,
trigger: "work_product",
actor,
workProductChanged: true,
});
res.json(product);
});
@@ -2444,6 +2782,12 @@ export function issueRoutes(
entityId: existing.issueId,
details: { workProductId: removed.id, type: removed.type },
});
await revalidateActiveSourceRecoveryAfterCommittedWrite({
issue,
trigger: "work_product",
actor,
workProductChanged: true,
});
res.json(removed);
});
@@ -2931,6 +3275,28 @@ export function issueRoutes(
const requestedAssigneeAgentId =
normalizedAssigneeAgentId === undefined ? existing.assigneeAgentId : normalizedAssigneeAgentId;
const explicitMoveToTodoRequested = reopenRequested || resumeRequested === true;
const recoveryRelevantSourceMutationRequested =
req.body.status !== undefined ||
normalizedAssigneeAgentId !== undefined ||
req.body.assigneeUserId !== undefined ||
Array.isArray(req.body.blockedByIssueIds) ||
req.body.executionPolicy !== undefined ||
explicitMoveToTodoRequested;
const activeRecoveryActionBeforeUpdate = recoveryRelevantSourceMutationRequested
? await recoveryActionsSvc.getActiveForIssue(existing.companyId, existing.id)
: null;
if (
recoveryRelevantSourceMutationRequested &&
!(await assertRecoveryActionAuthority(
req,
res,
existing,
activeRecoveryActionBeforeUpdate,
{ source: "issue_update" },
))
) {
return;
}
const effectiveMoveToTodoRequested =
explicitMoveToTodoRequested ||
(!!commentBody &&
@@ -3207,6 +3573,7 @@ export function issueRoutes(
let issueResponse: typeof issue & {
blockedBy?: unknown;
blocks?: unknown;
activeRecoveryAction?: unknown;
relatedWork?: Awaited<ReturnType<typeof issueReferencesSvc.listIssueReferenceSummary>>;
referencedIssueIdentifiers?: string[];
} = issue;
@@ -3258,6 +3625,32 @@ export function issueRoutes(
previous.status !== undefined &&
issue.status === "todo";
const reopenFromStatus = reopened ? existing.status : null;
const statusChangedFromBlockedToTodo =
existing.status === "blocked" &&
issue.status === "todo" &&
(req.body.status !== undefined || reopened);
const revalidatedRecoveryAction = await revalidateActiveSourceRecoveryAfterCommittedWrite({
issue,
trigger: "issue_update",
actor,
activeRecoveryAction: activeRecoveryActionBeforeUpdate ?? undefined,
statusChanged: existing.status !== issue.status,
assigneeChanged:
existing.assigneeAgentId !== issue.assigneeAgentId ||
existing.assigneeUserId !== issue.assigneeUserId,
blockersChanged: Array.isArray(req.body.blockedByIssueIds),
executionPolicyChanged: req.body.executionPolicy !== undefined,
monitorChanged,
resumeRequested: resumeRequested === true,
reopened,
blockedToTodoRecovery: statusChangedFromBlockedToTodo,
});
if (activeRecoveryActionBeforeUpdate && !revalidatedRecoveryAction) {
issueResponse = {
...issueResponse,
activeRecoveryAction: null,
};
}
await logActivity(db, {
companyId: issue.companyId,
actorType: actor.actorType,
@@ -3531,10 +3924,6 @@ export function issueRoutes(
existing.status === "backlog" &&
issue.status !== "backlog" &&
req.body.status !== undefined;
const statusChangedFromBlockedToTodo =
existing.status === "blocked" &&
issue.status === "todo" &&
(req.body.status !== undefined || reopened);
const statusChangedFromClosedToTodo =
isClosedIssueStatus(existing.status) &&
issue.status === "todo" &&
@@ -4126,12 +4515,18 @@ export function issueRoutes(
});
}
const acceptedPlanConfirmation =
interaction.kind === "request_confirmation" &&
interaction.status === "accepted" &&
issue.workMode === "planning";
queueResolvedInteractionContinuationWakeup({
heartbeat,
issue: continuationWakeIssue,
interaction,
actor,
source: "issue.interaction.accept",
forceFreshSession: acceptedPlanConfirmation,
workspaceRefreshReason: acceptedPlanConfirmation ? "accepted_plan_confirmation" : null,
});
res.json(interaction);
@@ -4630,6 +5025,16 @@ export function issueRoutes(
source: "issue.comment",
});
await revalidateActiveSourceRecoveryAfterCommittedWrite({
issue: currentIssue,
trigger: "comment",
actor,
statusChanged: reopened,
resumeRequested: resumeRequested === true,
reopened,
blockedToTodoRecovery: reopened && reopenFromStatus === "blocked" && currentIssue.status === "todo",
});
// Merge all wakeups from this comment into one enqueue per agent to avoid duplicate runs.
void (async () => {
const wakeups = new Map<string, Parameters<typeof heartbeat.wakeup>[1]>();
+1 -1
View File
@@ -1000,7 +1000,7 @@ function redactInlineBase64ImageData(chunk: string) {
}
export function compactRunLogChunk(chunk: string, maxChars = MAX_PERSISTED_LOG_CHUNK_CHARS) {
const normalized = redactInlineBase64ImageData(chunk);
const normalized = redactSensitiveText(redactInlineBase64ImageData(chunk));
if (normalized.length <= maxChars) return normalized;
const headChars = Math.max(0, Math.floor(maxChars * 0.6));
+23 -1
View File
@@ -73,7 +73,10 @@ import {
issueTreeControlService,
type ActiveIssueTreePauseHoldGate,
} from "./issue-tree-control.js";
import { parseIssueGraphLivenessIncidentKey } from "./recovery/origins.js";
import {
parseIssueGraphLivenessIncidentKey,
RECOVERY_ORIGIN_KINDS,
} from "./recovery/origins.js";
import { classifyIssueGraphLiveness, type IssueLivenessFinding } from "./recovery/issue-graph-liveness.js";
const ALL_ISSUE_STATUSES = ["backlog", "todo", "in_progress", "in_review", "blocked", "done", "cancelled"];
@@ -4515,6 +4518,25 @@ export function issueService(db: Db) {
}
}
const [enriched] = await withIssueLabels(tx, [updated]);
if (
(issueData.status === "done" || issueData.status === "cancelled") &&
existing.status !== issueData.status &&
existing.originKind === RECOVERY_ORIGIN_KINDS.issueGraphLivenessEscalation
) {
const parsedIncident = parseIssueGraphLivenessIncidentKey(existing.originId);
if (parsedIncident?.issueId && parsedIncident.companyId === existing.companyId) {
await tx
.delete(issueRelations)
.where(
and(
eq(issueRelations.companyId, existing.companyId),
eq(issueRelations.issueId, existing.id),
eq(issueRelations.relatedIssueId, parsedIncident.issueId),
eq(issueRelations.type, "blocks"),
),
);
}
}
return enriched;
};
+6 -2
View File
@@ -486,8 +486,12 @@ export async function writePluginLocalFolderTextAtomic(
contents: string,
) {
const rootRealPath = await fs.realpath(rootPath);
const resolved = await resolvePluginLocalFolderPath(rootPath, relativePath);
await fs.mkdir(path.dirname(resolved.absolutePath), { recursive: true });
const normalized = normalizeRelativePath(relativePath);
const parentRelativePath = path.dirname(normalized);
if (parentRelativePath !== ".") {
await ensureDirectoryInsideRoot(rootRealPath, parentRelativePath);
}
const resolved = await resolvePluginLocalFolderPath(rootRealPath, normalized);
await assertPathInsideRoot(rootRealPath, path.dirname(resolved.absolutePath));
const tempPath = path.join(
path.dirname(resolved.absolutePath),
+384 -4
View File
@@ -1,4 +1,4 @@
import { and, asc, desc, eq, gt, inArray, isNull, notInArray, sql } from "drizzle-orm";
import { and, asc, desc, eq, gt, gte, inArray, isNull, notInArray, sql } from "drizzle-orm";
import type { Db } from "@paperclipai/db";
import {
DEFAULT_ISSUE_GRAPH_LIVENESS_AUTO_RECOVERY_LOOKBACK_HOURS,
@@ -11,11 +11,12 @@ import {
agents,
agentWakeupRequests,
approvals,
activityLog,
companies,
issueComments,
heartbeatRunEvents,
heartbeatRunWatchdogDecisions,
heartbeatRuns,
issueComments,
issueApprovals,
issueRecoveryActions,
issueRelations,
@@ -26,6 +27,7 @@ import { parseObject, asBoolean, asNumber } from "../../adapters/utils.js";
import { runningProcesses } from "../../adapters/index.js";
import { forbidden, notFound } from "../../errors.js";
import { logger } from "../../middleware/logger.js";
import { isPidAlive, isProcessGroupAlive, terminateLocalService } from "../local-service-supervisor.js";
import { redactCurrentUserText } from "../../log-redaction.js";
import { redactSensitiveText } from "../../redaction.js";
import { logActivity } from "../activity-log.js";
@@ -68,6 +70,15 @@ const ACTIVE_RUN_OUTPUT_EVIDENCE_TAIL_BYTES = 8 * 1024;
const STRANDED_ISSUE_RECOVERY_ORIGIN_KIND = RECOVERY_ORIGIN_KINDS.strandedIssueRecovery;
const STALE_ACTIVE_RUN_EVALUATION_ORIGIN_KIND = RECOVERY_ORIGIN_KINDS.staleActiveRunEvaluation;
const DEFERRED_WAKE_CONTEXT_KEY = "_paperclipWakeContext";
const SESSIONED_LOCAL_ADAPTERS = new Set([
"claude_local",
"codex_local",
"cursor",
"gemini_local",
"hermes_local",
"opencode_local",
"pi_local",
]);
type RecoveryWakeupOptions = {
source?: "timer" | "assignment" | "on_demand" | "automation";
@@ -673,6 +684,16 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
return `stale_active_run:${companyId}:${runId}`;
}
function isTerminalIssueStatus(status: string | null | undefined) {
return status === "done" || status === "cancelled";
}
function isRecoveryOriginIssue(issue: typeof issues.$inferSelect) {
return Object.values(RECOVERY_ORIGIN_KINDS).includes(
issue.originKind as typeof RECOVERY_ORIGIN_KINDS[keyof typeof RECOVERY_ORIGIN_KINDS],
);
}
function silenceStartedAtForRun(run: Pick<typeof heartbeatRuns.$inferSelect, "lastOutputAt" | "processStartedAt" | "startedAt" | "createdAt">) {
return run.lastOutputAt ?? run.processStartedAt ?? run.startedAt ?? run.createdAt ?? null;
}
@@ -798,6 +819,309 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
return issue ?? null;
}
async function latestSameRunSourceTerminalEvidence(input: {
run: typeof heartbeatRuns.$inferSelect;
sourceIssue: typeof issues.$inferSelect;
evidenceAfter: Date | null;
}) {
if (!isTerminalIssueStatus(input.sourceIssue.status)) return null;
const after = input.evidenceAfter ?? input.run.startedAt ?? input.run.createdAt ?? null;
const activityPredicates = [
eq(activityLog.companyId, input.run.companyId),
eq(activityLog.runId, input.run.id),
eq(activityLog.action, "issue.updated"),
eq(activityLog.entityType, "issue"),
eq(activityLog.entityId, input.sourceIssue.id),
sql`${activityLog.details} ->> 'status' = ${input.sourceIssue.status}`,
];
if (after) {
activityPredicates.push(gte(activityLog.createdAt, after));
}
const activity = await db
.select({
id: activityLog.id,
createdAt: activityLog.createdAt,
action: activityLog.action,
})
.from(activityLog)
.where(and(...activityPredicates))
.orderBy(desc(activityLog.createdAt))
.limit(1)
.then((rows) => rows[0] ?? null);
if (activity) {
return {
kind: "activity" as const,
id: activity.id,
createdAt: activity.createdAt,
action: activity.action,
};
}
return null;
}
async function nextRunEventSeq(runId: string) {
const [row] = await db
.select({ maxSeq: sql<number | null>`max(${heartbeatRunEvents.seq})` })
.from(heartbeatRunEvents)
.where(eq(heartbeatRunEvents.runId, runId));
return Number(row?.maxSeq ?? 0) + 1;
}
async function appendRecoveryRunEvent(
run: typeof heartbeatRuns.$inferSelect,
event: {
level: "info" | "warn" | "error";
message: string;
payload?: Record<string, unknown>;
},
) {
await db.insert(heartbeatRunEvents).values({
companyId: run.companyId,
runId: run.id,
agentId: run.agentId,
seq: await nextRunEventSeq(run.id),
eventType: "lifecycle",
stream: "system",
level: event.level,
message: event.message,
payload: event.payload ?? null,
});
}
async function cleanupSourceResolvedRunProcess(input: {
run: typeof heartbeatRuns.$inferSelect;
runningAgent: typeof agents.$inferSelect;
}) {
if (!SESSIONED_LOCAL_ADAPTERS.has(input.runningAgent.adapterType)) {
return {
attempted: false,
outcome: "skipped_non_local_adapter",
adapterType: input.runningAgent.adapterType,
};
}
const running = runningProcesses.get(input.run.id);
const pid = running?.child.pid ?? input.run.processPid ?? null;
const processGroupId = running?.processGroupId ?? input.run.processGroupId ?? null;
if (typeof pid !== "number" && typeof processGroupId !== "number") {
return {
attempted: false,
outcome: "no_process_metadata",
adapterType: input.runningAgent.adapterType,
};
}
const wasAlive =
(typeof pid === "number" && isPidAlive(pid)) ||
(typeof processGroupId === "number" && isProcessGroupAlive(processGroupId));
if (!wasAlive) {
runningProcesses.delete(input.run.id);
return {
attempted: false,
outcome: "not_running",
adapterType: input.runningAgent.adapterType,
pid,
processGroupId,
};
}
try {
await terminateLocalService(
{
pid: typeof pid === "number" && Number.isInteger(pid) && pid > 0
? pid
: (processGroupId ?? 0),
processGroupId: typeof processGroupId === "number" && Number.isInteger(processGroupId) && processGroupId > 0
? processGroupId
: null,
},
running ? { forceAfterMs: Math.max(1, running.graceSec) * 1000 } : undefined,
);
runningProcesses.delete(input.run.id);
const stillAlive =
(typeof pid === "number" && isPidAlive(pid)) ||
(typeof processGroupId === "number" && isProcessGroupAlive(processGroupId));
return {
attempted: true,
outcome: stillAlive ? "termination_sent_still_running" : "terminated",
adapterType: input.runningAgent.adapterType,
pid,
processGroupId,
};
} catch (error) {
return {
attempted: true,
outcome: "failed",
adapterType: input.runningAgent.adapterType,
pid,
processGroupId,
error: error instanceof Error ? error.message : String(error),
};
}
}
async function finalizeAgentAfterSourceResolvedRun(run: typeof heartbeatRuns.$inferSelect, status: "succeeded" | "cancelled") {
const [runningCountRow] = await db
.select({ count: sql<number>`count(*)::int` })
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, run.agentId), eq(heartbeatRuns.status, "running")));
const runningCount = Number(runningCountRow?.count ?? 0);
const nextStatus = runningCount > 0 ? "running" : status === "succeeded" || status === "cancelled" ? "idle" : "error";
await db
.update(agents)
.set({
status: nextStatus,
lastHeartbeatAt: new Date(),
updatedAt: new Date(),
})
.where(and(eq(agents.id, run.agentId), notInArray(agents.status, ["paused", "terminated"])));
}
async function foldSourceResolvedStaleRun(input: {
run: typeof heartbeatRuns.$inferSelect;
runningAgent: typeof agents.$inferSelect;
sourceIssue: typeof issues.$inferSelect;
evidence: Awaited<ReturnType<typeof latestSameRunSourceTerminalEvidence>>;
existingEvaluation: Awaited<ReturnType<typeof findOpenStaleRunEvaluation>>;
silenceStartedAt: Date | null;
silenceAgeMs: number | null;
now: Date;
}) {
if (!input.evidence) return { kind: "skipped" as const };
const cleanup = await cleanupSourceResolvedRunProcess({ run: input.run, runningAgent: input.runningAgent });
const finalRunStatus = input.sourceIssue.status === "cancelled" ? "cancelled" : "succeeded";
const resultJson = {
...parseObject(input.run.resultJson),
sourceResolvedWatchdogFold: {
sourceIssueId: input.sourceIssue.id,
sourceIssueIdentifier: input.sourceIssue.identifier,
sourceIssueStatus: input.sourceIssue.status,
sameRunEvidenceKind: input.evidence.kind,
sameRunEvidenceId: input.evidence.id,
sameRunEvidenceAt: input.evidence.createdAt.toISOString(),
silenceStartedAt: input.silenceStartedAt?.toISOString() ?? null,
silenceAgeMs: input.silenceAgeMs,
evaluationIssueId: input.existingEvaluation?.id ?? null,
evaluationIssueIdentifier: input.existingEvaluation?.identifier ?? null,
cleanup,
},
};
const finalizedRun = await db.transaction(async (tx) => {
const [updatedRun] = await tx
.update(heartbeatRuns)
.set({
status: finalRunStatus,
finishedAt: input.now,
error: null,
errorCode: null,
resultJson,
updatedAt: input.now,
})
.where(and(eq(heartbeatRuns.id, input.run.id), eq(heartbeatRuns.companyId, input.run.companyId), eq(heartbeatRuns.status, "running")))
.returning();
if (!updatedRun) return null;
if (input.run.wakeupRequestId) {
await tx
.update(agentWakeupRequests)
.set({
status: finalRunStatus === "succeeded" ? "completed" : "cancelled",
finishedAt: input.now,
error: null,
updatedAt: input.now,
})
.where(and(eq(agentWakeupRequests.id, input.run.wakeupRequestId), eq(agentWakeupRequests.companyId, input.run.companyId)));
}
await tx
.update(issues)
.set({
executionRunId: null,
executionAgentNameKey: null,
executionLockedAt: null,
updatedAt: input.now,
})
.where(
and(
eq(issues.id, input.sourceIssue.id),
eq(issues.companyId, input.run.companyId),
eq(issues.executionRunId, input.run.id),
),
);
return updatedRun;
});
if (!finalizedRun) return { kind: "skipped" as const };
if (input.existingEvaluation && !isTerminalIssueStatus(input.existingEvaluation.status)) {
await issuesSvc.update(input.existingEvaluation.id, { status: "done" });
await issuesSvc.addComment(input.existingEvaluation.id, [
"Source-resolved watchdog fold.",
"",
`- Source issue: ${input.sourceIssue.identifier ?? input.sourceIssue.id}`,
`- Run: \`${input.run.id}\``,
`- Same-run evidence: \`${input.evidence.kind}:${input.evidence.id}\` at ${input.evidence.createdAt.toISOString()}`,
"- Outcome: false positive; the source issue already reached a terminal disposition from this run.",
].join("\n"), { runId: input.run.id });
}
const activeRecoveryAction = await recoveryActionsSvc.getActiveForIssue(input.run.companyId, input.sourceIssue.id);
if (activeRecoveryAction?.kind === "active_run_watchdog") {
await recoveryActionsSvc.resolveActiveForIssue({
companyId: input.run.companyId,
sourceIssueId: input.sourceIssue.id,
actionId: activeRecoveryAction.id,
status: "resolved",
outcome: "false_positive",
resolutionNote: "Source issue reached a terminal disposition through durable same-run activity; watchdog folded as source-resolved.",
});
}
const [decision] = await db
.insert(heartbeatRunWatchdogDecisions)
.values({
companyId: input.run.companyId,
runId: input.run.id,
evaluationIssueId: input.existingEvaluation?.id ?? null,
decision: "dismissed_false_positive",
reason: "Source issue already reached a terminal disposition through durable same-run activity.",
createdByRunId: input.run.id,
})
.returning();
await appendRecoveryRunEvent(finalizedRun, {
level: cleanup.outcome === "failed" ? "warn" : "info",
message: "Source-resolved watchdog fold finalized stale active run",
payload: resultJson.sourceResolvedWatchdogFold,
});
await logActivity(db, {
companyId: input.run.companyId,
actorType: "system",
actorId: "system",
agentId: input.run.agentId,
runId: input.run.id,
action: "heartbeat.output_stale_source_resolved",
entityType: "heartbeat_run",
entityId: input.run.id,
details: {
source: "recovery.scan_silent_active_runs",
sourceIssueId: input.sourceIssue.id,
sourceIssueIdentifier: input.sourceIssue.identifier,
sourceIssueStatus: input.sourceIssue.status,
evaluationIssueId: input.existingEvaluation?.id ?? null,
watchdogDecisionId: decision.id,
sameRunEvidenceKind: input.evidence.kind,
sameRunEvidenceId: input.evidence.id,
sameRunEvidenceAt: input.evidence.createdAt.toISOString(),
cleanup,
},
});
await finalizeAgentAfterSourceResolvedRun(finalizedRun, finalRunStatus);
return { kind: "folded" as const, evaluationIssueId: input.existingEvaluation?.id ?? null };
}
async function resolveStaleRunOwnerAgentId(input: {
run: typeof heartbeatRuns.$inferSelect;
runningAgent: typeof agents.$inferSelect;
@@ -1030,6 +1354,47 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
const runningAgent = await getAgent(input.run.agentId);
if (!runningAgent || runningAgent.companyId !== input.run.companyId) return { kind: "skipped" as const };
const sourceIssue = await resolveStaleRunSourceIssue(input.run);
const existing = await findOpenStaleRunEvaluation(input.run.companyId, input.run.id);
if (sourceIssue && isRecoveryOriginIssue(sourceIssue)) {
await logActivity(db, {
companyId: input.run.companyId,
actorType: "system",
actorId: "system",
agentId: input.run.agentId,
runId: input.run.id,
action: "heartbeat.output_stale_recovery_recursion_refused",
entityType: "heartbeat_run",
entityId: input.run.id,
details: {
source: "recovery.scan_silent_active_runs",
sourceIssueId: sourceIssue.id,
sourceIssueIdentifier: sourceIssue.identifier,
sourceIssueOriginKind: sourceIssue.originKind,
existingEvaluationIssueId: existing?.id ?? null,
},
});
return { kind: "skipped" as const };
}
const silenceStartedAt = silenceStartedAtForRun(input.run);
if (sourceIssue && isTerminalIssueStatus(sourceIssue.status)) {
const terminalEvidence = await latestSameRunSourceTerminalEvidence({
run: input.run,
sourceIssue,
evidenceAfter: silenceStartedAt,
});
if (terminalEvidence) {
return foldSourceResolvedStaleRun({
run: input.run,
runningAgent,
sourceIssue,
evidence: terminalEvidence,
existingEvaluation: existing,
silenceStartedAt,
silenceAgeMs: silenceAgeMsForRun(input.run, input.now),
now: input.now,
});
}
}
const prefix = await getCompanyIssuePrefix(input.run.companyId);
const evidence = await collectStaleRunEvidence({
run: input.run,
@@ -1039,7 +1404,6 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
now: input.now,
});
const level = (evidence.silenceAgeMs ?? 0) >= ACTIVE_RUN_OUTPUT_CRITICAL_THRESHOLD_MS ? "critical" : "suspicious";
const existing = await findOpenStaleRunEvaluation(input.run.companyId, input.run.id);
if (existing) {
if (level === "critical" && existing.priority !== "high") {
await issuesSvc.update(existing.id, {
@@ -1174,6 +1538,7 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
created: 0,
existing: 0,
escalated: 0,
folded: 0,
snoozed: 0,
skipped: 0,
evaluationIssueIds: [] as string[],
@@ -1188,6 +1553,7 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
if (outcome.kind === "created") result.created += 1;
else if (outcome.kind === "existing") result.existing += 1;
else if (outcome.kind === "escalated") result.escalated += 1;
else if (outcome.kind === "folded") result.folded += 1;
else result.skipped += 1;
if ("evaluationIssueId" in outcome && outcome.evaluationIssueId) {
result.evaluationIssueIds.push(outcome.evaluationIssueId);
@@ -2382,7 +2748,6 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
if (row.originKind === RECOVERY_ORIGIN_KINDS.issueGraphLivenessEscalation) {
const parsed = parseIssueGraphLivenessIncidentKey(row.originId);
if (!parsed || parsed.companyId !== row.companyId) return [];
if (parsed.state !== "blocked_by_assigned_backlog_issue") return [];
return [
{
companyId: row.companyId,
@@ -2575,6 +2940,21 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
) {
continue;
}
const sourceIssue = await db
.select({
id: issues.id,
status: issues.status,
})
.from(issues)
.where(and(eq(issues.companyId, parsed.companyId), eq(issues.id, parsed.issueId)))
.then((rows) => rows[0] ?? null);
if (sourceIssue && !["done", "cancelled"].includes(sourceIssue.status)) {
const blockerIds = await existingBlockerIssueIds(parsed.companyId, sourceIssue.id);
if (blockerIds.includes(recovery.id)) {
result.activeSkipped += 1;
continue;
}
}
if (await removeRecoveryBlockerFromSource(recovery)) {
result.blockerRelationsRemoved += 1;
}