diff --git a/server/src/__tests__/agent-live-run-routes.test.ts b/server/src/__tests__/agent-live-run-routes.test.ts index 0bfa209c..2ac8658d 100644 --- a/server/src/__tests__/agent-live-run-routes.test.ts +++ b/server/src/__tests__/agent-live-run-routes.test.ts @@ -10,7 +10,6 @@ const mockHeartbeatService = vi.hoisted(() => ({ buildRunOutputSilence: vi.fn(), getRunIssueSummary: vi.fn(), getActiveRunIssueSummaryForAgent: vi.fn(), - buildRunOutputSilence: vi.fn(), getRunLogAccess: vi.fn(), readLog: vi.fn(), })); @@ -71,7 +70,7 @@ function registerModuleMocks() { })); } -async function createApp() { +async function createApp(db: Record = {}) { const [{ agentRoutes }, { errorHandler }] = await Promise.all([ vi.importActual("../routes/agents.js"), vi.importActual("../middleware/index.js"), @@ -88,11 +87,32 @@ async function createApp() { }; next(); }); - app.use("/api", agentRoutes({} as any)); + app.use("/api", agentRoutes(db as any)); app.use(errorHandler); return app; } +function createLiveRunsDbStub(rows: Array>) { + const limit = vi.fn(async (value: number) => rows.slice(0, value)); + const orderedQuery = { + limit, + then: (resolve: (value: Array>) => unknown) => Promise.resolve(rows).then(resolve), + }; + const query = { + from: vi.fn().mockReturnThis(), + innerJoin: vi.fn().mockReturnThis(), + where: vi.fn().mockReturnThis(), + orderBy: vi.fn().mockReturnValue(orderedQuery), + }; + + return { + db: { + select: vi.fn().mockReturnValue(query), + }, + limit, + }; +} + async function requestApp( app: express.Express, buildRequest: (baseUrl: string) => request.Test, @@ -284,4 +304,81 @@ describe("agent live run routes", () => { nextOffset: 5, }); }); + + it("caps company live run polling by default", async () => { + const rows = Array.from({ length: 75 }, (_, index) => ({ + id: `run-${index}`, + companyId: "company-1", + status: "running", + invocationSource: "on_demand", + triggerDetail: "manual", + startedAt: new Date("2026-04-10T09:30:00.000Z"), + finishedAt: null, + createdAt: new Date(`2026-04-10T09:${String(index % 60).padStart(2, "0")}:00.000Z`), + agentId: "agent-1", + agentName: "Builder", + adapterType: "codex_local", + logBytes: 0, + livenessState: "healthy", + livenessReason: null, + continuationAttempt: 0, + lastUsefulActionAt: null, + nextAction: null, + lastOutputAt: null, + lastOutputSeq: null, + lastOutputStream: null, + lastOutputBytes: 0, + processStartedAt: null, + issueId: "issue-1", + })); + const { db, limit } = createLiveRunsDbStub(rows); + + const res = await requestApp( + await createApp(db), + (baseUrl) => request(baseUrl).get("/api/companies/company-1/live-runs"), + ); + + expect(res.status, JSON.stringify(res.body)).toBe(200); + expect(limit).toHaveBeenCalledWith(50); + expect(res.body).toHaveLength(50); + expect(mockHeartbeatService.buildRunOutputSilence).toHaveBeenCalledTimes(50); + }); + + it("treats explicit zero live run limits as the capped default", async () => { + const rows = Array.from({ length: 75 }, (_, index) => ({ + id: `run-${index}`, + companyId: "company-1", + status: "running", + invocationSource: "on_demand", + triggerDetail: "manual", + startedAt: new Date("2026-04-10T09:30:00.000Z"), + finishedAt: null, + createdAt: new Date(`2026-04-10T09:${String(index % 60).padStart(2, "0")}:00.000Z`), + agentId: "agent-1", + agentName: "Builder", + adapterType: "codex_local", + logBytes: 0, + livenessState: "healthy", + livenessReason: null, + continuationAttempt: 0, + lastUsefulActionAt: null, + nextAction: null, + lastOutputAt: null, + lastOutputSeq: null, + lastOutputStream: null, + lastOutputBytes: 0, + processStartedAt: null, + issueId: "issue-1", + })); + const { db, limit } = createLiveRunsDbStub(rows); + + const res = await requestApp( + await createApp(db), + (baseUrl) => request(baseUrl).get("/api/companies/company-1/live-runs?limit=0&minCount=0"), + ); + + expect(res.status, JSON.stringify(res.body)).toBe(200); + expect(limit).toHaveBeenCalledWith(50); + expect(res.body).toHaveLength(50); + }); }); diff --git a/server/src/__tests__/heartbeat-issue-liveness-escalation.test.ts b/server/src/__tests__/heartbeat-issue-liveness-escalation.test.ts index 369bb05c..10ad103c 100644 --- a/server/src/__tests__/heartbeat-issue-liveness-escalation.test.ts +++ b/server/src/__tests__/heartbeat-issue-liveness-escalation.test.ts @@ -289,10 +289,23 @@ describeEmbeddedPostgres("heartbeat issue graph liveness escalation", () => { const heartbeat = heartbeatService(db); const first = await heartbeat.reconcileIssueGraphLiveness(); - const second = await heartbeat.reconcileIssueGraphLiveness(); expect(first.escalationsCreated).toBe(1); + const [sourceAfterFirst] = await db + .select({ updatedAt: issues.updatedAt }) + .from(issues) + .where(eq(issues.id, blockedIssueId)); + const eventsAfterFirst = await db.select().from(activityLog).where(eq(activityLog.companyId, companyId)); + expect(eventsAfterFirst.filter((event) => event.action === "issue.blockers.updated")).toHaveLength(1); + + const second = await heartbeat.reconcileIssueGraphLiveness(); + expect(second.escalationsCreated).toBe(0); + const [sourceAfterSecond] = await db + .select({ updatedAt: issues.updatedAt }) + .from(issues) + .where(eq(issues.id, blockedIssueId)); + expect(sourceAfterSecond?.updatedAt.getTime()).toBe(sourceAfterFirst?.updatedAt.getTime()); const escalations = await db .select() @@ -345,7 +358,7 @@ describeEmbeddedPostgres("heartbeat issue graph liveness escalation", () => { projectWorkspaceSourceIssueId: blockerIssueId, }, }); - expect(events.some((event) => event.action === "issue.blockers.updated")).toBe(true); + expect(events.filter((event) => event.action === "issue.blockers.updated")).toHaveLength(1); }); it("skips budget-blocked direct owners and assigns recovery to the manager fallback", async () => { diff --git a/server/src/__tests__/issue-blocker-attention.test.ts b/server/src/__tests__/issue-blocker-attention.test.ts index 61192191..66df6959 100644 --- a/server/src/__tests__/issue-blocker-attention.test.ts +++ b/server/src/__tests__/issue-blocker-attention.test.ts @@ -76,6 +76,9 @@ describeEmbeddedPostgres("issue blocker attention", () => { status: string; parentId?: string | null; assigneeAgentId?: string | null; + originKind?: string | null; + originId?: string | null; + originFingerprint?: string | null; }) { const id = input.id ?? randomUUID(); await db.insert(issues).values({ @@ -87,6 +90,9 @@ describeEmbeddedPostgres("issue blocker attention", () => { priority: "medium", parentId: input.parentId ?? null, assigneeAgentId: input.assigneeAgentId ?? null, + originKind: input.originKind ?? "manual", + originId: input.originId ?? null, + originFingerprint: input.originFingerprint ?? "default", }); return id; } @@ -356,6 +362,52 @@ describeEmbeddedPostgres("issue blocker attention", () => { }); }); + it("treats open liveness escalation blockers as covered waiting paths", async () => { + const { companyId, agentId } = await createCompany("PBL"); + const parentId = await insertIssue({ companyId, identifier: "PBL-1", title: "Parent", status: "blocked" }); + const cancelledLeafId = await insertIssue({ + companyId, + identifier: "PBL-2", + title: "Cancelled blocker", + status: "cancelled", + assigneeAgentId: agentId, + }); + const incidentKey = [ + "harness_liveness", + companyId, + parentId, + "blocked_by_cancelled_issue", + cancelledLeafId, + ].join(":"); + const escalationId = await insertIssue({ + companyId, + identifier: "PBL-3", + title: "Liveness escalation", + status: "todo", + assigneeAgentId: agentId, + originKind: "harness_liveness_escalation", + originId: incidentKey, + originFingerprint: [ + "harness_liveness_leaf", + companyId, + "blocked_by_cancelled_issue", + cancelledLeafId, + ].join(":"), + }); + await block({ companyId, blockerIssueId: cancelledLeafId, blockedIssueId: parentId }); + await block({ companyId, blockerIssueId: escalationId, blockedIssueId: parentId }); + + const parent = (await svc.list(companyId, { status: "blocked,todo" })).find((issue) => issue.id === parentId); + + expect(parent?.blockerAttention).toMatchObject({ + state: "covered", + reason: "active_dependency", + unresolvedBlockerCount: 2, + coveredBlockerCount: 2, + attentionBlockerCount: 0, + }); + }); + it("does not treat a scheduled retry as actively covered work", async () => { const { companyId, agentId } = await createCompany("PBY"); const parentId = await insertIssue({ companyId, identifier: "PBY-1", title: "Parent", status: "blocked" }); diff --git a/server/src/routes/agents.ts b/server/src/routes/agents.ts index 3fb95eda..2dbe8ad8 100644 --- a/server/src/routes/agents.ts +++ b/server/src/routes/agents.ts @@ -99,7 +99,8 @@ function readRunLogLimitBytes(value: unknown) { function readLiveRunsQueryInt(value: unknown, max: number, fallback = 0) { const parsed = Number(value); if (!Number.isFinite(parsed)) return fallback; - return Math.max(0, Math.min(max, Math.trunc(parsed))); + if (parsed <= 0) return fallback; + return Math.min(max, Math.trunc(parsed)); } export function agentRoutes( @@ -2821,8 +2822,8 @@ export function agentRoutes( const companyId = req.params.companyId as string; assertCompanyAccess(req, companyId); - const minCount = readLiveRunsQueryInt(req.query.minCount, 50); - const limit = readLiveRunsQueryInt(req.query.limit, 50); + const minCount = readLiveRunsQueryInt(req.query.minCount, 50, 50); + const limit = readLiveRunsQueryInt(req.query.limit, 50, 50); const columns = { id: heartbeatRuns.id, @@ -2862,8 +2863,8 @@ export function agentRoutes( ) .orderBy(desc(heartbeatRuns.createdAt)); - const liveRuns = limit > 0 ? await liveRunsQuery.limit(limit) : await liveRunsQuery; - const targetRunCount = limit > 0 ? Math.min(minCount, limit) : minCount; + const liveRuns = await liveRunsQuery.limit(limit); + const targetRunCount = Math.min(minCount, limit); if (targetRunCount > 0 && liveRuns.length < targetRunCount) { const activeIds = liveRuns.map((r) => r.id); diff --git a/server/src/services/issues.ts b/server/src/services/issues.ts index cce66d09..8990a730 100644 --- a/server/src/services/issues.ts +++ b/server/src/services/issues.ts @@ -52,6 +52,7 @@ import { issueTreeControlService, type ActiveIssueTreePauseHoldGate, } from "./issue-tree-control.js"; +import { parseIssueGraphLivenessIncidentKey } from "./recovery/origins.js"; const ALL_ISSUE_STATUSES = ["backlog", "todo", "in_progress", "in_review", "blocked", "done", "cancelled"]; const MAX_ISSUE_COMMENT_PAGE_LIMIT = 500; @@ -1174,12 +1175,12 @@ async function listIssueBlockerAttentionMap( } } - const reviewNodeIds = [...nodesById.values()] - .filter((node) => node.status === "in_review") + const explicitWaitCandidateIds = [...nodesById.values()] + .filter((node) => node.status !== "done") .map((node) => node.id); const explicitWaitingIssueIds = new Set(); - if (reviewNodeIds.length > 0) { - for (const chunk of chunkList(reviewNodeIds, ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE)) { + if (explicitWaitCandidateIds.length > 0) { + for (const chunk of chunkList(explicitWaitCandidateIds, ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE)) { const interactionRows: Array<{ issueId: string }> = await dbOrTx .select({ issueId: issueThreadInteractions.issueId }) .from(issueThreadInteractions) @@ -1204,22 +1205,28 @@ async function listIssueBlockerAttentionMap( ), ); for (const row of approvalRows) explicitWaitingIssueIds.add(row.issueId); + } - const recoveryRows: Array<{ originId: string | null }> = await dbOrTx - .select({ originId: issues.originId }) - .from(issues) - .where( - and( - eq(issues.companyId, companyId), - eq(issues.originKind, BLOCKER_ATTENTION_OPEN_RECOVERY_ORIGIN_KIND), - isNull(issues.hiddenAt), - inArray(issues.originId, chunk), - notInArray(issues.status, BLOCKER_ATTENTION_OPEN_RECOVERY_TERMINAL_STATUSES), - ), - ); - for (const row of recoveryRows) { - if (row.originId) explicitWaitingIssueIds.add(row.originId); - } + // Recovery rows are intentionally company-wide: a liveness escalation for + // the same leaf blocker represents an active waiting path even when that + // blocker is reached through another blocked graph. + const recoveryRows: Array<{ id: string; originId: string | null }> = await dbOrTx + .select({ id: issues.id, originId: issues.originId }) + .from(issues) + .where( + and( + eq(issues.companyId, companyId), + eq(issues.originKind, BLOCKER_ATTENTION_OPEN_RECOVERY_ORIGIN_KIND), + isNull(issues.hiddenAt), + notInArray(issues.status, BLOCKER_ATTENTION_OPEN_RECOVERY_TERMINAL_STATUSES), + ), + ); + for (const row of recoveryRows) { + const parsed = parseIssueGraphLivenessIncidentKey(row.originId); + if (!parsed || parsed.companyId !== companyId) continue; + explicitWaitingIssueIds.add(row.id); + explicitWaitingIssueIds.add(parsed.issueId); + explicitWaitingIssueIds.add(parsed.leafIssueId); } } @@ -1257,8 +1264,11 @@ async function listIssueBlockerAttentionMap( if (node.status === "done") { return { covered: true, stalled: false, sampleBlockerIdentifier: nodeSample, sampleStalledBlockerIdentifier: null }; } + if (explicitWaitingIssueIds.has(node.id)) { + return { covered: true, stalled: false, sampleBlockerIdentifier: nodeSample, sampleStalledBlockerIdentifier: null }; + } if (node.status === "in_review") { - const hasWaitingPath = activeIssueIds.has(node.id) || Boolean(node.assigneeUserId) || explicitWaitingIssueIds.has(node.id); + const hasWaitingPath = activeIssueIds.has(node.id) || Boolean(node.assigneeUserId); if (hasWaitingPath) { return { covered: true, stalled: false, sampleBlockerIdentifier: nodeSample, sampleStalledBlockerIdentifier: null }; } diff --git a/server/src/services/recovery/run-liveness-continuations.ts b/server/src/services/recovery/run-liveness-continuations.ts index 1b4d2cf4..b23625c1 100644 --- a/server/src/services/recovery/run-liveness-continuations.ts +++ b/server/src/services/recovery/run-liveness-continuations.ts @@ -127,7 +127,6 @@ export function decideRunLivenessContinuation(input: { if (budgetBlocked) { return { kind: "skip", reason: "budget hard stop blocks continuation" }; } - const currentAttempt = readContinuationAttempt(run.continuationAttempt); if (currentAttempt >= maxAttempts) { return { diff --git a/server/src/services/recovery/service.ts b/server/src/services/recovery/service.ts index fc9fb7e1..ec721351 100644 --- a/server/src/services/recovery/service.ts +++ b/server/src/services/recovery/service.ts @@ -2250,10 +2250,16 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup }) }) { const blockerIds = await existingBlockerIssueIds(input.issue.companyId, input.issue.id); const nextBlockerIds = [...new Set([...blockerIds, input.escalationIssueId])]; + const isAlreadyBlockedByEscalation = blockerIds.includes(input.escalationIssueId); + const isAlreadyBlocked = input.issue.status === "blocked"; + if (isAlreadyBlockedByEscalation && isAlreadyBlocked) { + return input.issue; + } + const update: Partial & { blockedByIssueIds: string[] } = { blockedByIssueIds: nextBlockerIds, }; - if (input.issue.status !== "blocked") { + if (!isAlreadyBlocked) { update.status = "blocked"; }