From ec75cabcd81d6ab5ddb809e26705218383d775c6 Mon Sep 17 00:00:00 2001 From: dotta Date: Wed, 8 Apr 2026 08:05:35 -0500 Subject: [PATCH] Enforce execution-policy stage handoffs --- packages/adapter-utils/src/server-utils.ts | 106 +++++++- .../src/__tests__/codex-local-execute.test.ts | 246 ++++++++++++++++++ .../heartbeat-workspace-session.test.ts | 12 + .../issue-comment-reopen-routes.test.ts | 161 +++++++++++- .../__tests__/issue-execution-policy.test.ts | 131 ++++++++-- server/src/routes/issues.ts | 191 ++++++++++++-- server/src/services/heartbeat.ts | 52 ++-- server/src/services/issue-execution-policy.ts | 188 ++++++++----- 8 files changed, 949 insertions(+), 138 deletions(-) diff --git a/packages/adapter-utils/src/server-utils.ts b/packages/adapter-utils/src/server-utils.ts index 629924d9..1c6a2795 100644 --- a/packages/adapter-utils/src/server-utils.ts +++ b/packages/adapter-utils/src/server-utils.ts @@ -201,6 +201,22 @@ type PaperclipWakeIssue = { priority: string | null; }; +type PaperclipWakeExecutionPrincipal = { + type: "agent" | "user" | null; + agentId: string | null; + userId: string | null; +}; + +type PaperclipWakeExecutionStage = { + wakeRole: "reviewer" | "approver" | "executor" | null; + stageId: string | null; + stageType: string | null; + currentParticipant: PaperclipWakeExecutionPrincipal | null; + returnAssignee: PaperclipWakeExecutionPrincipal | null; + lastDecisionOutcome: string | null; + allowedActions: string[]; +}; + type PaperclipWakeComment = { id: string | null; issueId: string | null; @@ -214,6 +230,7 @@ type PaperclipWakeComment = { type PaperclipWakePayload = { reason: string | null; issue: PaperclipWakeIssue | null; + executionStage: PaperclipWakeExecutionStage | null; commentIds: string[]; latestCommentId: string | null; comments: PaperclipWakeComment[]; @@ -257,6 +274,50 @@ function normalizePaperclipWakeComment(value: unknown): PaperclipWakeComment | n }; } +function normalizePaperclipWakeExecutionPrincipal(value: unknown): PaperclipWakeExecutionPrincipal | null { + const principal = parseObject(value); + const typeRaw = asString(principal.type, "").trim().toLowerCase(); + if (typeRaw !== "agent" && typeRaw !== "user") return null; + return { + type: typeRaw, + agentId: asString(principal.agentId, "").trim() || null, + userId: asString(principal.userId, "").trim() || null, + }; +} + +function normalizePaperclipWakeExecutionStage(value: unknown): PaperclipWakeExecutionStage | null { + const stage = parseObject(value); + const wakeRoleRaw = asString(stage.wakeRole, "").trim().toLowerCase(); + const wakeRole = + wakeRoleRaw === "reviewer" || wakeRoleRaw === "approver" || wakeRoleRaw === "executor" + ? wakeRoleRaw + : null; + const allowedActions = Array.isArray(stage.allowedActions) + ? stage.allowedActions + .filter((entry): entry is string => typeof entry === "string" && entry.trim().length > 0) + .map((entry) => entry.trim()) + : []; + const currentParticipant = normalizePaperclipWakeExecutionPrincipal(stage.currentParticipant); + const returnAssignee = normalizePaperclipWakeExecutionPrincipal(stage.returnAssignee); + const stageId = asString(stage.stageId, "").trim() || null; + const stageType = asString(stage.stageType, "").trim() || null; + const lastDecisionOutcome = asString(stage.lastDecisionOutcome, "").trim() || null; + + if (!wakeRole && !stageId && !stageType && !currentParticipant && !returnAssignee && !lastDecisionOutcome && allowedActions.length === 0) { + return null; + } + + return { + wakeRole, + stageId, + stageType, + currentParticipant, + returnAssignee, + lastDecisionOutcome, + allowedActions, + }; +} + export function normalizePaperclipWakePayload(value: unknown): PaperclipWakePayload | null { const payload = parseObject(value); const comments = Array.isArray(payload.comments) @@ -270,12 +331,16 @@ export function normalizePaperclipWakePayload(value: unknown): PaperclipWakePayl .filter((entry): entry is string => typeof entry === "string" && entry.trim().length > 0) .map((entry) => entry.trim()) : []; + const executionStage = normalizePaperclipWakeExecutionStage(payload.executionStage); - if (comments.length === 0 && commentIds.length === 0) return null; + if (comments.length === 0 && commentIds.length === 0 && !executionStage && !normalizePaperclipWakeIssue(payload.issue)) { + return null; + } return { reason: asString(payload.reason, "").trim() || null, issue: normalizePaperclipWakeIssue(payload.issue), + executionStage, commentIds, latestCommentId: asString(payload.latestCommentId, "").trim() || null, comments, @@ -300,6 +365,12 @@ export function renderPaperclipWakePrompt( const normalized = normalizePaperclipWakePayload(value); if (!normalized) return ""; const resumedSession = options.resumedSession === true; + const executionStage = normalized.executionStage; + const principalLabel = (principal: PaperclipWakeExecutionPrincipal | null) => { + if (!principal || !principal.type) return "unknown"; + if (principal.type === "agent") return principal.agentId ? `agent ${principal.agentId}` : "agent"; + return principal.userId ? `user ${principal.userId}` : "user"; + }; const lines = resumedSession ? [ @@ -342,7 +413,38 @@ export function renderPaperclipWakePrompt( lines.push(`- omitted comments: ${normalized.missingCount}`); } - lines.push("", "New comments in order:"); + if (executionStage) { + lines.push( + `- execution wake role: ${executionStage.wakeRole ?? "unknown"}`, + `- execution stage: ${executionStage.stageType ?? "unknown"}`, + `- execution participant: ${principalLabel(executionStage.currentParticipant)}`, + `- execution return assignee: ${principalLabel(executionStage.returnAssignee)}`, + `- last decision outcome: ${executionStage.lastDecisionOutcome ?? "none"}`, + ); + if (executionStage.allowedActions.length > 0) { + lines.push(`- allowed actions: ${executionStage.allowedActions.join(", ")}`); + } + lines.push(""); + if (executionStage.wakeRole === "reviewer" || executionStage.wakeRole === "approver") { + lines.push( + `You are waking as the active ${executionStage.wakeRole} for this issue.`, + "Do not execute the task itself or continue executor work.", + "Review the issue and choose one of the allowed actions above.", + "If you request changes, the workflow routes back to the stored return assignee.", + "", + ); + } else if (executionStage.wakeRole === "executor") { + lines.push( + "You are waking because changes were requested in the execution workflow.", + "Address the requested changes on this issue and resubmit when the work is ready.", + "", + ); + } + } + + if (normalized.comments.length > 0) { + lines.push("New comments in order:"); + } for (const [index, comment] of normalized.comments.entries()) { const authorLabel = comment.authorId diff --git a/server/src/__tests__/codex-local-execute.test.ts b/server/src/__tests__/codex-local-execute.test.ts index da648367..9514a977 100644 --- a/server/src/__tests__/codex-local-execute.test.ts +++ b/server/src/__tests__/codex-local-execute.test.ts @@ -369,6 +369,252 @@ describe("codex execute", () => { } }); + it("renders execution-stage wake instructions for reviewer and executor roles", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-codex-execute-stage-wake-")); + const workspace = path.join(root, "workspace"); + const commandPath = path.join(root, "codex"); + const capturePath = path.join(root, "capture.json"); + await fs.mkdir(workspace, { recursive: true }); + await writeFakeCodexCommand(commandPath); + + const previousHome = process.env.HOME; + process.env.HOME = root; + + try { + const result = await execute({ + runId: "run-stage-wake", + agent: { + id: "agent-1", + companyId: "company-1", + name: "Codex Coder", + adapterType: "codex_local", + adapterConfig: {}, + }, + runtime: { + sessionId: null, + sessionParams: null, + sessionDisplayId: null, + taskKey: null, + }, + config: { + command: commandPath, + cwd: workspace, + env: { + PAPERCLIP_TEST_CAPTURE_PATH: capturePath, + }, + promptTemplate: "Follow the paperclip heartbeat.", + }, + context: { + issueId: "issue-1", + taskId: "issue-1", + wakeReason: "execution_review_requested", + paperclipWake: { + reason: "execution_review_requested", + issue: { + id: "issue-1", + identifier: "PAP-1207", + title: "implement the plan of PAP-1200", + status: "in_review", + priority: "medium", + }, + executionStage: { + wakeRole: "reviewer", + stageId: "stage-1", + stageType: "review", + currentParticipant: { type: "agent", agentId: "qa-agent" }, + returnAssignee: { type: "agent", agentId: "coder-agent" }, + lastDecisionOutcome: null, + allowedActions: ["approve", "request_changes"], + }, + commentIds: [], + latestCommentId: null, + comments: [], + commentWindow: { + requestedCount: 0, + includedCount: 0, + missingCount: 0, + }, + truncated: false, + fallbackFetchNeeded: false, + }, + }, + authToken: "run-jwt-token", + onLog: async () => {}, + }); + + expect(result.exitCode).toBe(0); + const capture = JSON.parse(await fs.readFile(capturePath, "utf8")) as CapturePayload; + expect(capture.prompt).toContain("execution wake role: reviewer"); + expect(capture.prompt).toContain("You are waking as the active reviewer for this issue."); + expect(capture.prompt).toContain("Do not execute the task itself or continue executor work."); + expect(capture.prompt).toContain("allowed actions: approve, request_changes"); + + const executorCapturePath = path.join(root, "capture-executor.json"); + const executorResult = await execute({ + runId: "run-stage-wake-executor", + agent: { + id: "agent-1", + companyId: "company-1", + name: "Codex Coder", + adapterType: "codex_local", + adapterConfig: {}, + }, + runtime: { + sessionId: null, + sessionParams: null, + sessionDisplayId: null, + taskKey: null, + }, + config: { + command: commandPath, + cwd: workspace, + env: { + PAPERCLIP_TEST_CAPTURE_PATH: executorCapturePath, + }, + promptTemplate: "Follow the paperclip heartbeat.", + }, + context: { + issueId: "issue-1", + taskId: "issue-1", + wakeReason: "execution_changes_requested", + paperclipWake: { + reason: "execution_changes_requested", + issue: { + id: "issue-1", + identifier: "PAP-1207", + title: "implement the plan of PAP-1200", + status: "in_progress", + priority: "medium", + }, + executionStage: { + wakeRole: "executor", + stageId: "stage-1", + stageType: "review", + currentParticipant: { type: "agent", agentId: "qa-agent" }, + returnAssignee: { type: "agent", agentId: "coder-agent" }, + lastDecisionOutcome: "changes_requested", + allowedActions: ["address_changes", "resubmit"], + }, + commentIds: [], + latestCommentId: null, + comments: [], + commentWindow: { + requestedCount: 0, + includedCount: 0, + missingCount: 0, + }, + truncated: false, + fallbackFetchNeeded: false, + }, + }, + authToken: "run-jwt-token", + onLog: async () => {}, + }); + + expect(executorResult.exitCode).toBe(0); + const executorCapture = JSON.parse(await fs.readFile(executorCapturePath, "utf8")) as CapturePayload; + expect(executorCapture.prompt).toContain("execution wake role: executor"); + expect(executorCapture.prompt).toContain("You are waking because changes were requested in the execution workflow."); + expect(executorCapture.prompt).toContain("allowed actions: address_changes, resubmit"); + } finally { + if (previousHome === undefined) delete process.env.HOME; + else process.env.HOME = previousHome; + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("renders an issue-scoped wake prompt even when the wake has no comments yet", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-codex-execute-issue-wake-")); + const workspace = path.join(root, "workspace"); + const commandPath = path.join(root, "codex"); + const capturePath = path.join(root, "capture.json"); + await fs.mkdir(workspace, { recursive: true }); + await writeFakeCodexCommand(commandPath); + + const previousHome = process.env.HOME; + process.env.HOME = root; + + try { + const result = await execute({ + runId: "run-issue-wake", + agent: { + id: "agent-1", + companyId: "company-1", + name: "Codex Coder", + adapterType: "codex_local", + adapterConfig: {}, + }, + runtime: { + sessionId: null, + sessionParams: null, + sessionDisplayId: null, + taskKey: null, + }, + config: { + command: commandPath, + cwd: workspace, + env: { + PAPERCLIP_TEST_CAPTURE_PATH: capturePath, + }, + promptTemplate: "Follow the paperclip heartbeat.", + }, + context: { + issueId: "issue-1", + taskId: "issue-1", + wakeReason: "issue_assigned", + paperclipWake: { + reason: "issue_assigned", + issue: { + id: "issue-1", + identifier: "PAP-1201", + title: "Fix gallery opening for inline images", + status: "todo", + priority: "medium", + }, + commentIds: [], + latestCommentId: null, + comments: [], + commentWindow: { + requestedCount: 0, + includedCount: 0, + missingCount: 0, + }, + truncated: false, + fallbackFetchNeeded: false, + }, + }, + authToken: "run-jwt-token", + onLog: async () => {}, + }); + + expect(result.exitCode).toBe(0); + expect(result.errorMessage).toBeNull(); + + const capture = JSON.parse(await fs.readFile(capturePath, "utf8")) as CapturePayload; + expect(capture.paperclipEnvKeys).toContain("PAPERCLIP_WAKE_PAYLOAD_JSON"); + expect(capture.paperclipWakePayloadJson).not.toBeNull(); + expect(JSON.parse(capture.paperclipWakePayloadJson ?? "{}")).toMatchObject({ + reason: "issue_assigned", + issue: { + identifier: "PAP-1201", + title: "Fix gallery opening for inline images", + status: "todo", + priority: "medium", + }, + commentIds: [], + }); + expect(capture.prompt).toContain("## Paperclip Wake Payload"); + expect(capture.prompt).toContain("Do not switch to another issue until you have handled this wake."); + expect(capture.prompt).toContain("- issue: PAP-1201 Fix gallery opening for inline images"); + expect(capture.prompt).toContain("- pending comments: 0/0"); + expect(capture.prompt).toContain("- issue status: todo"); + } finally { + if (previousHome === undefined) delete process.env.HOME; + else process.env.HOME = previousHome; + await fs.rm(root, { recursive: true, force: true }); + } + }); + it("uses a compact wake delta instead of the full heartbeat prompt when resuming a session", async () => { const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-codex-execute-resume-wake-")); const workspace = path.join(root, "workspace"); diff --git a/server/src/__tests__/heartbeat-workspace-session.test.ts b/server/src/__tests__/heartbeat-workspace-session.test.ts index 859c8960..d97f63c7 100644 --- a/server/src/__tests__/heartbeat-workspace-session.test.ts +++ b/server/src/__tests__/heartbeat-workspace-session.test.ts @@ -272,6 +272,18 @@ describe("shouldResetTaskSessionForWake", () => { expect(shouldResetTaskSessionForWake({ wakeReason: "issue_assigned" })).toBe(true); }); + it("resets session context on execution review wakes", () => { + expect(shouldResetTaskSessionForWake({ wakeReason: "execution_review_requested" })).toBe(true); + }); + + it("resets session context on execution approval wakes", () => { + expect(shouldResetTaskSessionForWake({ wakeReason: "execution_approval_requested" })).toBe(true); + }); + + it("resets session context on execution changes-requested wakes", () => { + expect(shouldResetTaskSessionForWake({ wakeReason: "execution_changes_requested" })).toBe(true); + }); + it("preserves session context on timer heartbeats", () => { expect(shouldResetTaskSessionForWake({ wakeSource: "timer" })).toBe(false); }); diff --git a/server/src/__tests__/issue-comment-reopen-routes.test.ts b/server/src/__tests__/issue-comment-reopen-routes.test.ts index 2594e36e..135d8066 100644 --- a/server/src/__tests__/issue-comment-reopen-routes.test.ts +++ b/server/src/__tests__/issue-comment-reopen-routes.test.ts @@ -7,6 +7,7 @@ import { normalizeIssueExecutionPolicy } from "../services/issue-execution-polic const mockIssueService = vi.hoisted(() => ({ getById: vi.fn(), + assertCheckoutOwner: vi.fn(), update: vi.fn(), addComment: vi.fn(), findMentionedAgents: vi.fn(), @@ -75,8 +76,12 @@ vi.mock("../services/index.js", () => ({ function createApp() { const app = express(); app.use(express.json()); + return app; +} + +function installActor(app: express.Express, actor?: Record) { app.use((req, _res, next) => { - (req as any).actor = { + (req as any).actor = actor ?? { type: "board", userId: "local-board", companyIds: ["company-1"], @@ -119,6 +124,10 @@ describe("issue comment reopen routes", () => { mockIssueService.findMentionedAgents.mockResolvedValue([]); mockIssueService.listWakeableBlockedDependents.mockResolvedValue([]); mockIssueService.getWakeableParentAfterChildCompletion.mockResolvedValue(null); + mockIssueService.assertCheckoutOwner.mockResolvedValue({ adoptedFromRunId: null }); + mockAccessService.canUser.mockResolvedValue(false); + mockAccessService.hasPermission.mockResolvedValue(false); + mockAgentService.getById.mockResolvedValue(null); }); it("treats reopen=true as a no-op when the issue is already open", async () => { @@ -128,7 +137,7 @@ describe("issue comment reopen routes", () => { ...patch, })); - const res = await request(createApp()) + const res = await request(installActor(createApp())) .patch("/api/issues/11111111-1111-4111-8111-111111111111") .send({ comment: "hello", reopen: true, assigneeAgentId: "33333333-3333-4333-8333-333333333333" }); @@ -157,7 +166,7 @@ describe("issue comment reopen routes", () => { ...patch, })); - const res = await request(createApp()) + const res = await request(installActor(createApp())) .patch("/api/issues/11111111-1111-4111-8111-111111111111") .send({ comment: "hello", reopen: true, assigneeAgentId: "33333333-3333-4333-8333-333333333333" }); @@ -207,7 +216,7 @@ describe("issue comment reopen routes", () => { status: "cancelled", }); - const res = await request(createApp()) + const res = await request(installActor(createApp())) .patch("/api/issues/11111111-1111-4111-8111-111111111111") .send({ comment: "hello", interrupt: true, assigneeAgentId: "33333333-3333-4333-8333-333333333333" }); @@ -265,7 +274,7 @@ describe("issue comment reopen routes", () => { _tx: tx, })); - const res = await request(createApp()) + const res = await request(installActor(createApp())) .patch("/api/issues/11111111-1111-4111-8111-111111111111") .send({ status: "done", comment: "Approved for ship" }); @@ -294,4 +303,146 @@ describe("issue comment reopen routes", () => { }), ); }); + + it("coerces executor handoff patches into workflow-controlled review wakes", async () => { + const policy = normalizeIssueExecutionPolicy({ + stages: [ + { + id: "aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa", + type: "review", + participants: [{ type: "agent", agentId: "33333333-3333-4333-8333-333333333333" }], + }, + ], + })!; + const issue = { + ...makeIssue("todo"), + status: "in_progress", + assigneeAgentId: "22222222-2222-4222-8222-222222222222", + executionPolicy: policy, + executionState: null, + }; + mockIssueService.getById.mockResolvedValue(issue); + mockIssueService.update.mockImplementation(async (_id: string, patch: Record) => ({ + ...issue, + ...patch, + updatedAt: new Date(), + })); + + const res = await request( + installActor(createApp(), { + type: "agent", + agentId: "22222222-2222-4222-8222-222222222222", + companyId: "company-1", + runId: "run-1", + }), + ) + .patch("/api/issues/11111111-1111-4111-8111-111111111111") + .send({ + status: "in_review", + assigneeAgentId: null, + assigneeUserId: "local-board", + }); + + expect(res.status).toBe(200); + expect(mockIssueService.update).toHaveBeenCalledWith( + "11111111-1111-4111-8111-111111111111", + expect.objectContaining({ + status: "in_review", + assigneeAgentId: "33333333-3333-4333-8333-333333333333", + assigneeUserId: null, + executionState: expect.objectContaining({ + status: "pending", + currentStageType: "review", + currentParticipant: expect.objectContaining({ + type: "agent", + agentId: "33333333-3333-4333-8333-333333333333", + }), + returnAssignee: expect.objectContaining({ + type: "agent", + agentId: "22222222-2222-4222-8222-222222222222", + }), + }), + }), + ); + expect(mockHeartbeatService.wakeup).toHaveBeenCalledWith( + "33333333-3333-4333-8333-333333333333", + expect.objectContaining({ + reason: "execution_review_requested", + payload: expect.objectContaining({ + issueId: "11111111-1111-4111-8111-111111111111", + executionStage: expect.objectContaining({ + wakeRole: "reviewer", + stageType: "review", + allowedActions: ["approve", "request_changes"], + }), + }), + }), + ); + }); + + it("wakes the return assignee with execution_changes_requested", async () => { + const policy = normalizeIssueExecutionPolicy({ + stages: [ + { + id: "aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa", + type: "review", + participants: [{ type: "agent", agentId: "33333333-3333-4333-8333-333333333333" }], + }, + ], + })!; + const issue = { + ...makeIssue("todo"), + status: "in_review", + assigneeAgentId: "33333333-3333-4333-8333-333333333333", + executionPolicy: policy, + executionState: { + status: "pending", + currentStageId: policy.stages[0].id, + currentStageIndex: 0, + currentStageType: "review", + currentParticipant: { type: "agent", agentId: "33333333-3333-4333-8333-333333333333" }, + returnAssignee: { type: "agent", agentId: "22222222-2222-4222-8222-222222222222" }, + completedStageIds: [], + lastDecisionId: null, + lastDecisionOutcome: null, + }, + }; + mockIssueService.getById.mockResolvedValue(issue); + mockIssueService.update.mockImplementation(async (_id: string, patch: Record) => ({ + ...issue, + ...patch, + updatedAt: new Date(), + })); + + const res = await request( + installActor(createApp(), { + type: "agent", + agentId: "33333333-3333-4333-8333-333333333333", + companyId: "company-1", + runId: "run-2", + }), + ) + .patch("/api/issues/11111111-1111-4111-8111-111111111111") + .send({ + status: "in_progress", + comment: "Needs another pass", + }); + + expect(res.status).toBe(200); + expect(mockHeartbeatService.wakeup).toHaveBeenCalledWith( + "22222222-2222-4222-8222-222222222222", + expect.objectContaining({ + reason: "execution_changes_requested", + payload: expect.objectContaining({ + issueId: "11111111-1111-4111-8111-111111111111", + executionStage: expect.objectContaining({ + wakeRole: "executor", + stageType: "review", + lastDecisionOutcome: "changes_requested", + allowedActions: ["address_changes", "resubmit"], + }), + }), + }), + ); + }); }); diff --git a/server/src/__tests__/issue-execution-policy.test.ts b/server/src/__tests__/issue-execution-policy.test.ts index aedc4305..9f5fb80b 100644 --- a/server/src/__tests__/issue-execution-policy.test.ts +++ b/server/src/__tests__/issue-execution-policy.test.ts @@ -413,33 +413,45 @@ describe("issue execution policy transitions", () => { const policy = twoStagePolicy(); const reviewStageId = policy.stages[0].id; - it("non-participant cannot advance stage via status change", () => { - expect(() => - applyIssueExecutionPolicyTransition({ - issue: { - status: "in_review", - assigneeAgentId: qaAgentId, - assigneeUserId: null, - executionPolicy: policy, - executionState: { - status: "pending", - currentStageId: reviewStageId, - currentStageIndex: 0, - currentStageType: "review", - currentParticipant: { type: "agent", agentId: qaAgentId }, - returnAssignee: { type: "agent", agentId: coderAgentId }, - completedStageIds: [], - lastDecisionId: null, - lastDecisionOutcome: null, - }, + it("non-participant stage updates are coerced back to the active stage", () => { + const result = applyIssueExecutionPolicyTransition({ + issue: { + status: "in_review", + assigneeAgentId: qaAgentId, + assigneeUserId: null, + executionPolicy: policy, + executionState: { + status: "pending", + currentStageId: reviewStageId, + currentStageIndex: 0, + currentStageType: "review", + currentParticipant: { type: "agent", agentId: qaAgentId }, + returnAssignee: { type: "agent", agentId: coderAgentId }, + completedStageIds: [], + lastDecisionId: null, + lastDecisionOutcome: null, }, - policy, - requestedStatus: "done", - requestedAssigneePatch: {}, - actor: { agentId: coderAgentId }, - commentBody: "Trying to bypass review", - }), - ).toThrow("Only the active reviewer or approver can advance"); + }, + policy, + requestedStatus: "done", + requestedAssigneePatch: { assigneeUserId: boardUserId }, + actor: { agentId: coderAgentId }, + commentBody: "Trying to bypass review", + }); + + expect(result.patch).toMatchObject({ + status: "in_review", + assigneeAgentId: qaAgentId, + assigneeUserId: null, + executionState: { + status: "pending", + currentStageId: reviewStageId, + currentStageType: "review", + currentParticipant: { type: "agent", agentId: qaAgentId }, + returnAssignee: { type: "agent", agentId: coderAgentId }, + }, + }); + expect(result.decision).toBeUndefined(); }); it("non-participant can still post non-advancing updates", () => { @@ -663,6 +675,7 @@ describe("issue execution policy transitions", () => { describe("no-op transitions", () => { const policy = twoStagePolicy(); + const reviewStageId = policy.stages[0].id; it("non-done status change without review context is a no-op", () => { const result = applyIssueExecutionPolicyTransition({ @@ -682,6 +695,72 @@ describe("issue execution policy transitions", () => { expect(result.patch).toEqual({}); }); + it("coerces a malformed executor in_review patch into the first policy stage", () => { + const result = applyIssueExecutionPolicyTransition({ + issue: { + status: "in_progress", + assigneeAgentId: coderAgentId, + assigneeUserId: null, + executionPolicy: policy, + executionState: null, + }, + policy, + requestedStatus: "in_review", + requestedAssigneePatch: { assigneeUserId: boardUserId }, + actor: { agentId: coderAgentId }, + }); + + expect(result.patch).toMatchObject({ + status: "in_review", + assigneeAgentId: qaAgentId, + assigneeUserId: null, + executionState: { + status: "pending", + currentStageType: "review", + currentParticipant: { type: "agent", agentId: qaAgentId }, + returnAssignee: { type: "agent", agentId: coderAgentId }, + }, + }); + }); + + it("reasserts the active stage when issue status drifted out of in_review", () => { + const result = applyIssueExecutionPolicyTransition({ + issue: { + status: "in_progress", + assigneeAgentId: coderAgentId, + assigneeUserId: null, + executionPolicy: policy, + executionState: { + status: "pending", + currentStageId: reviewStageId, + currentStageIndex: 0, + currentStageType: "review", + currentParticipant: { type: "agent", agentId: qaAgentId }, + returnAssignee: { type: "agent", agentId: coderAgentId }, + completedStageIds: [], + lastDecisionId: null, + lastDecisionOutcome: null, + }, + }, + policy, + requestedStatus: "in_progress", + requestedAssigneePatch: { assigneeAgentId: coderAgentId }, + actor: { agentId: coderAgentId }, + }); + + expect(result.patch).toMatchObject({ + status: "in_review", + assigneeAgentId: qaAgentId, + assigneeUserId: null, + executionState: { + status: "pending", + currentStageId: reviewStageId, + currentStageType: "review", + currentParticipant: { type: "agent", agentId: qaAgentId }, + }, + }); + }); + it("no policy and no state is a no-op", () => { const result = applyIssueExecutionPolicyTransition({ issue: { diff --git a/server/src/routes/issues.ts b/server/src/routes/issues.ts index d133e5fe..f72b19cd 100644 --- a/server/src/routes/issues.ts +++ b/server/src/routes/issues.ts @@ -56,13 +56,149 @@ import { SVG_CONTENT_TYPE, } from "../attachment-types.js"; import { queueIssueAssignmentWakeup } from "../services/issue-assignment-wakeup.js"; -import { applyIssueExecutionPolicyTransition, normalizeIssueExecutionPolicy } from "../services/issue-execution-policy.js"; +import { + applyIssueExecutionPolicyTransition, + normalizeIssueExecutionPolicy, + parseIssueExecutionState, +} from "../services/issue-execution-policy.js"; const MAX_ISSUE_COMMENT_LIMIT = 500; const updateIssueRouteSchema = updateIssueSchema.extend({ interrupt: z.boolean().optional(), }); +type ParsedExecutionState = NonNullable>; +type ExecutionStageWakeContext = { + wakeRole: "reviewer" | "approver" | "executor"; + stageId: string | null; + stageType: ParsedExecutionState["currentStageType"]; + currentParticipant: ParsedExecutionState["currentParticipant"]; + returnAssignee: ParsedExecutionState["returnAssignee"]; + lastDecisionOutcome: ParsedExecutionState["lastDecisionOutcome"]; + allowedActions: string[]; +}; + +function executionPrincipalsEqual( + left: ParsedExecutionState["currentParticipant"] | null, + right: ParsedExecutionState["currentParticipant"] | null, +) { + if (!left || !right || left.type !== right.type) return false; + return left.type === "agent" ? left.agentId === right.agentId : left.userId === right.userId; +} + +function buildExecutionStageWakeContext(input: { + state: ParsedExecutionState; + wakeRole: ExecutionStageWakeContext["wakeRole"]; + allowedActions: string[]; +}): ExecutionStageWakeContext { + return { + wakeRole: input.wakeRole, + stageId: input.state.currentStageId, + stageType: input.state.currentStageType, + currentParticipant: input.state.currentParticipant, + returnAssignee: input.state.returnAssignee, + lastDecisionOutcome: input.state.lastDecisionOutcome, + allowedActions: input.allowedActions, + }; +} + +function buildExecutionStageWakeup(input: { + issueId: string; + previousState: ParsedExecutionState | null; + nextState: ParsedExecutionState | null; + interruptedRunId: string | null; + requestedByActorType: "user" | "agent"; + requestedByActorId: string; +}) { + const { issueId, previousState, nextState, interruptedRunId } = input; + if (!nextState) return null; + + if (nextState.status === "pending") { + const agentId = + nextState.currentParticipant?.type === "agent" ? (nextState.currentParticipant.agentId ?? null) : null; + const stageChanged = + previousState?.status !== "pending" || + previousState?.currentStageId !== nextState.currentStageId || + !executionPrincipalsEqual(previousState?.currentParticipant ?? null, nextState.currentParticipant ?? null); + if (!agentId || !stageChanged) return null; + + const reason = + nextState.currentStageType === "approval" ? "execution_approval_requested" : "execution_review_requested"; + const executionStage = buildExecutionStageWakeContext({ + state: nextState, + wakeRole: nextState.currentStageType === "approval" ? "approver" : "reviewer", + allowedActions: ["approve", "request_changes"], + }); + + return { + agentId, + wakeup: { + source: "assignment" as const, + triggerDetail: "system" as const, + reason, + payload: { + issueId, + mutation: "update", + executionStage, + ...(interruptedRunId ? { interruptedRunId } : {}), + }, + requestedByActorType: input.requestedByActorType, + requestedByActorId: input.requestedByActorId, + contextSnapshot: { + issueId, + taskId: issueId, + wakeReason: reason, + source: "issue.execution_stage", + executionStage, + ...(interruptedRunId ? { interruptedRunId } : {}), + }, + }, + }; + } + + if (nextState.status === "changes_requested") { + const agentId = nextState.returnAssignee?.type === "agent" ? (nextState.returnAssignee.agentId ?? null) : null; + const becameChangesRequested = + previousState?.status !== "changes_requested" || + previousState?.lastDecisionId !== nextState.lastDecisionId || + !executionPrincipalsEqual(previousState?.returnAssignee ?? null, nextState.returnAssignee ?? null); + if (!agentId || !becameChangesRequested) return null; + + const executionStage = buildExecutionStageWakeContext({ + state: nextState, + wakeRole: "executor", + allowedActions: ["address_changes", "resubmit"], + }); + + return { + agentId, + wakeup: { + source: "assignment" as const, + triggerDetail: "system" as const, + reason: "execution_changes_requested", + payload: { + issueId, + mutation: "update", + executionStage, + ...(interruptedRunId ? { interruptedRunId } : {}), + }, + requestedByActorType: input.requestedByActorType, + requestedByActorId: input.requestedByActorId, + contextSnapshot: { + issueId, + taskId: issueId, + wakeReason: "execution_changes_requested", + source: "issue.execution_stage", + executionStage, + ...(interruptedRunId ? { interruptedRunId } : {}), + }, + }, + }; + } + + return null; +} + export function issueRoutes( db: Db, storage: StorageService, @@ -1110,24 +1246,6 @@ export function issueRoutes( return; } assertCompanyAccess(req, existing.companyId); - const assigneeWillChange = - (req.body.assigneeAgentId !== undefined && req.body.assigneeAgentId !== existing.assigneeAgentId) || - (req.body.assigneeUserId !== undefined && req.body.assigneeUserId !== existing.assigneeUserId); - - const isAgentReturningIssueToCreator = - req.actor.type === "agent" && - !!req.actor.agentId && - existing.assigneeAgentId === req.actor.agentId && - req.body.assigneeAgentId === null && - typeof req.body.assigneeUserId === "string" && - !!existing.createdByUserId && - req.body.assigneeUserId === existing.createdByUserId; - - if (assigneeWillChange) { - if (!isAgentReturningIssueToCreator) { - await assertCanAssignTasks(req, existing.companyId); - } - } if (!(await assertAgentRunCheckoutOwnership(req, res, existing))) return; const actor = getActorInfo(req); @@ -1224,6 +1342,27 @@ export function issueRoutes( } Object.assign(updateFields, transition.patch); + const nextAssigneeAgentId = + updateFields.assigneeAgentId === undefined ? existing.assigneeAgentId : (updateFields.assigneeAgentId as string | null); + const nextAssigneeUserId = + updateFields.assigneeUserId === undefined ? existing.assigneeUserId : (updateFields.assigneeUserId as string | null); + const assigneeWillChange = + nextAssigneeAgentId !== existing.assigneeAgentId || nextAssigneeUserId !== existing.assigneeUserId; + const isAgentReturningIssueToCreator = + req.actor.type === "agent" && + !!req.actor.agentId && + existing.assigneeAgentId === req.actor.agentId && + nextAssigneeAgentId === null && + typeof nextAssigneeUserId === "string" && + !!existing.createdByUserId && + nextAssigneeUserId === existing.createdByUserId; + + if (assigneeWillChange && !transition.workflowControlledAssignment) { + if (!isAgentReturningIssueToCreator) { + await assertCanAssignTasks(req, existing.companyId); + } + } + let issue; try { if (transition.decision && decisionId) { @@ -1414,6 +1553,16 @@ export function issueRoutes( existing.status === "backlog" && issue.status !== "backlog" && req.body.status !== undefined; + const previousExecutionState = parseIssueExecutionState(existing.executionState); + const nextExecutionState = parseIssueExecutionState(issue.executionState); + const executionStageWakeup = buildExecutionStageWakeup({ + issueId: issue.id, + previousState: previousExecutionState, + nextState: nextExecutionState, + interruptedRunId, + requestedByActorType: actor.actorType, + requestedByActorId: actor.actorId, + }); // Merge all wakeups from this update into one enqueue per agent to avoid duplicate runs. void (async () => { @@ -1427,7 +1576,9 @@ export function issueRoutes( wakeups.set(`${agentId}:${wakeIssueId}`, { agentId, wakeup }); }; - if (assigneeChanged && issue.assigneeAgentId && issue.status !== "backlog") { + if (executionStageWakeup) { + addWakeup(executionStageWakeup.agentId, executionStageWakeup.wakeup); + } else if (assigneeChanged && issue.assigneeAgentId && issue.status !== "backlog") { addWakeup(issue.assigneeAgentId, { source: "assignment", triggerDetail: "system", diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 32dc4913..d94922a0 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -696,7 +696,14 @@ export function shouldResetTaskSessionForWake( if (contextSnapshot?.forceFreshSession === true) return true; const wakeReason = readNonEmptyString(contextSnapshot?.wakeReason); - if (wakeReason === "issue_assigned") return true; + if ( + wakeReason === "issue_assigned" || + wakeReason === "execution_review_requested" || + wakeReason === "execution_approval_requested" || + wakeReason === "execution_changes_requested" + ) { + return true; + } return false; } @@ -714,6 +721,9 @@ function describeSessionResetReason( const wakeReason = readNonEmptyString(contextSnapshot?.wakeReason); if (wakeReason === "issue_assigned") return "wake reason is issue_assigned"; + if (wakeReason === "execution_review_requested") return "wake reason is execution_review_requested"; + if (wakeReason === "execution_approval_requested") return "wake reason is execution_approval_requested"; + if (wakeReason === "execution_changes_requested") return "wake reason is execution_changes_requested"; return null; } @@ -867,9 +877,8 @@ async function buildPaperclipWakePayload(input: { } | null; }) { + const executionStage = parseObject(input.contextSnapshot.executionStage); const commentIds = extractWakeCommentIds(input.contextSnapshot); - if (commentIds.length === 0) return null; - const issueId = readNonEmptyString(input.contextSnapshot.issueId); const issueSummary = input.issueSummary ?? @@ -886,23 +895,27 @@ async function buildPaperclipWakePayload(input: { .where(and(eq(issues.id, issueId), eq(issues.companyId, input.companyId))) .then((rows) => rows[0] ?? null) : null); + if (commentIds.length === 0 && Object.keys(executionStage).length === 0 && !issueSummary) return null; - const commentRows = await input.db - .select({ - id: issueComments.id, - issueId: issueComments.issueId, - body: issueComments.body, - authorAgentId: issueComments.authorAgentId, - authorUserId: issueComments.authorUserId, - createdAt: issueComments.createdAt, - }) - .from(issueComments) - .where( - and( - eq(issueComments.companyId, input.companyId), - inArray(issueComments.id, commentIds), - ), - ); + const commentRows = + commentIds.length === 0 + ? [] + : await input.db + .select({ + id: issueComments.id, + issueId: issueComments.issueId, + body: issueComments.body, + authorAgentId: issueComments.authorAgentId, + authorUserId: issueComments.authorUserId, + createdAt: issueComments.createdAt, + }) + .from(issueComments) + .where( + and( + eq(issueComments.companyId, input.companyId), + inArray(issueComments.id, commentIds), + ), + ); const commentsById = new Map(commentRows.map((comment) => [comment.id, comment])); const comments: Array> = []; @@ -959,6 +972,7 @@ async function buildPaperclipWakePayload(input: { priority: issueSummary.priority, } : null, + executionStage: Object.keys(executionStage).length > 0 ? executionStage : null, commentIds, latestCommentId: commentIds[commentIds.length - 1] ?? null, comments, diff --git a/server/src/services/issue-execution-policy.ts b/server/src/services/issue-execution-policy.ts index 86de20e4..3bc21c03 100644 --- a/server/src/services/issue-execution-policy.ts +++ b/server/src/services/issue-execution-policy.ts @@ -36,6 +36,7 @@ type TransitionInput = { type TransitionResult = { patch: Record; decision?: Pick; + workflowControlledAssignment?: boolean; }; const COMPLETED_STATUS: IssueExecutionState["status"] = "completed"; @@ -198,14 +199,36 @@ function buildChangesRequestedState(previous: IssueExecutionState, currentStage: }; } +function buildPendingStagePatch(input: { + patch: Record; + previous: IssueExecutionState | null; + policy: IssueExecutionPolicy; + stage: IssueExecutionStage; + participant: IssueExecutionStagePrincipal; + returnAssignee: IssueExecutionStagePrincipal | null; +}) { + input.patch.status = "in_review"; + Object.assign(input.patch, patchForPrincipal(input.participant)); + input.patch.executionState = buildPendingState({ + previous: input.previous, + stage: input.stage, + stageIndex: input.policy.stages.findIndex((candidate) => candidate.id === input.stage.id), + participant: input.participant, + returnAssignee: input.returnAssignee, + }); +} + export function applyIssueExecutionPolicyTransition(input: TransitionInput): TransitionResult { const patch: Record = {}; const existingState = parseIssueExecutionState(input.issue.executionState); const currentAssignee = assigneePrincipal(input.issue); const actor = actorPrincipal(input.actor); + const requestedAssigneePatchProvided = + input.requestedAssigneePatch.assigneeAgentId !== undefined || input.requestedAssigneePatch.assigneeUserId !== undefined; const explicitAssignee = assigneePrincipal(input.requestedAssigneePatch); const currentStage = input.policy ? findStageById(input.policy, existingState?.currentStageId) : null; const requestedStatus = input.requestedStatus; + const activeStage = currentStage && existingState?.status === PENDING_STATUS ? currentStage : null; if (!input.policy) { if (existingState) { @@ -228,90 +251,121 @@ export function applyIssueExecutionPolicyTransition(input: TransitionInput): Tra return { patch }; } - if (currentStage && input.issue.status === "in_review") { - if (!principalsEqual(existingState?.currentParticipant ?? null, actor)) { - if (requestedStatus && requestedStatus !== "in_review") { - throw unprocessable("Only the active reviewer or approver can advance the current execution stage"); - } - return { patch }; + if (activeStage) { + const currentParticipant = + existingState?.currentParticipant ?? + selectStageParticipant(activeStage, { + exclude: existingState?.returnAssignee ?? null, + }); + if (!currentParticipant) { + throw unprocessable(`No eligible ${activeStage.type} participant is configured for this issue`); } - if (requestedStatus === "done") { - if (!input.commentBody?.trim()) { - throw unprocessable("Approving a review or approval stage requires a comment"); - } - const approvedState = buildCompletedState(existingState, currentStage); - const nextStage = nextPendingStage( - input.policy, - { ...approvedState, completedStageIds: approvedState.completedStageIds }, - ); + if (principalsEqual(currentParticipant, actor)) { + if (requestedStatus === "done") { + if (!input.commentBody?.trim()) { + throw unprocessable("Approving a review or approval stage requires a comment"); + } + const approvedState = buildCompletedState(existingState, activeStage); + const nextStage = nextPendingStage( + input.policy, + { ...approvedState, completedStageIds: approvedState.completedStageIds }, + ); - if (!nextStage) { - patch.executionState = approvedState; + if (!nextStage) { + patch.executionState = approvedState; + return { + patch, + decision: { + stageId: activeStage.id, + stageType: activeStage.type, + outcome: "approved", + body: input.commentBody.trim(), + }, + }; + } + + const participant = selectStageParticipant(nextStage, { + preferred: explicitAssignee, + exclude: existingState?.returnAssignee ?? null, + }); + if (!participant) { + throw unprocessable(`No eligible ${nextStage.type} participant is configured for this issue`); + } + + buildPendingStagePatch({ + patch, + previous: approvedState, + policy: input.policy, + stage: nextStage, + participant, + returnAssignee: existingState?.returnAssignee ?? currentAssignee ?? actor, + }); return { patch, decision: { - stageId: currentStage.id, - stageType: currentStage.type, + stageId: activeStage.id, + stageType: activeStage.type, outcome: "approved", body: input.commentBody.trim(), }, + workflowControlledAssignment: true, }; } - const participant = selectStageParticipant(nextStage, { - preferred: explicitAssignee, - exclude: existingState?.returnAssignee ?? null, - }); - if (!participant) { - throw unprocessable(`No eligible ${nextStage.type} participant is configured for this issue`); + if (requestedStatus && requestedStatus !== "in_review") { + if (!input.commentBody?.trim()) { + throw unprocessable("Requesting changes requires a comment"); + } + if (!existingState?.returnAssignee) { + throw unprocessable("This execution stage has no return assignee"); + } + patch.status = "in_progress"; + Object.assign(patch, patchForPrincipal(existingState.returnAssignee)); + patch.executionState = buildChangesRequestedState(existingState, activeStage); + return { + patch, + decision: { + stageId: activeStage.id, + stageType: activeStage.type, + outcome: "changes_requested", + body: input.commentBody.trim(), + }, + workflowControlledAssignment: true, + }; } - - patch.status = "in_review"; - Object.assign(patch, patchForPrincipal(participant)); - patch.executionState = buildPendingState({ - previous: approvedState, - stage: nextStage, - stageIndex: input.policy.stages.findIndex((stage) => stage.id === nextStage.id), - participant, - returnAssignee: existingState?.returnAssignee ?? currentAssignee, - }); - return { - patch, - decision: { - stageId: currentStage.id, - stageType: currentStage.type, - outcome: "approved", - body: input.commentBody.trim(), - }, - }; } - if (requestedStatus && requestedStatus !== "in_review") { - if (!input.commentBody?.trim()) { - throw unprocessable("Requesting changes requires a comment"); - } - if (!existingState?.returnAssignee) { - throw unprocessable("This execution stage has no return assignee"); - } - patch.status = "in_progress"; - Object.assign(patch, patchForPrincipal(existingState.returnAssignee)); - patch.executionState = buildChangesRequestedState(existingState, currentStage); + if ( + input.issue.status !== "in_review" || + !principalsEqual(currentAssignee, currentParticipant) || + !principalsEqual(existingState?.currentParticipant ?? null, currentParticipant) || + (requestedStatus !== undefined && requestedStatus !== "in_review") || + (requestedAssigneePatchProvided && !principalsEqual(explicitAssignee, currentParticipant)) + ) { + buildPendingStagePatch({ + patch, + previous: existingState, + policy: input.policy, + stage: activeStage, + participant: currentParticipant, + returnAssignee: existingState?.returnAssignee ?? currentAssignee ?? actor, + }); return { patch, - decision: { - stageId: currentStage.id, - stageType: currentStage.type, - outcome: "changes_requested", - body: input.commentBody.trim(), - }, + workflowControlledAssignment: true, }; } return { patch }; } - if (requestedStatus !== "done") { + const shouldStartWorkflow = + requestedStatus === "done" || + requestedStatus === "in_review" || + (input.issue.status === "in_review" && existingState == null); + + if (!shouldStartWorkflow) { return { patch }; } @@ -333,14 +387,16 @@ export function applyIssueExecutionPolicyTransition(input: TransitionInput): Tra throw unprocessable(`No eligible ${pendingStage.type} participant is configured for this issue`); } - patch.status = "in_review"; - Object.assign(patch, patchForPrincipal(participant)); - patch.executionState = buildPendingState({ + buildPendingStagePatch({ + patch, previous: existingState, + policy: input.policy, stage: pendingStage, - stageIndex: input.policy.stages.findIndex((stage) => stage.id === pendingStage.id), participant, returnAssignee, }); - return { patch }; + return { + patch, + workflowControlledAssignment: true, + }; }