forked from farhoodlabs/paperclip
[codex] Improve agent runtime recovery and governance (#4086)
## Thinking Path > - Paperclip orchestrates AI agents for zero-human companies. > - The heartbeat runtime, agent import path, and agent configuration defaults determine whether work is dispatched safely and predictably. > - Several accumulated fixes all touched agent execution recovery, wake routing, import behavior, and runtime concurrency defaults. > - Those changes need to land together so the heartbeat service and agent creation defaults stay internally consistent. > - This pull request groups the runtime/governance changes from the split branch into one standalone branch. > - The benefit is safer recovery for stranded runs, bounded high-volume reads, imported-agent approval correctness, skill-template support, and a clearer default concurrency policy. ## What Changed - Fixed stranded continuation recovery so successful automatic retries are requeued instead of incorrectly blocking the issue. - Bounded high-volume issue/log reads across issue, heartbeat, agent, project, and workspace paths. - Fixed imported-agent approval and instruction-path permission handling. - Quarantined seeded worktree execution state during worktree provisioning. - Queued approval follow-up wakes and hardened SQL_ASCII heartbeat output handling. - Added reusable agent instruction templates for hiring flows. - Set the default max concurrent agent runs to five and updated related UI/tests/docs. ## Verification - `pnpm install --frozen-lockfile` - `pnpm exec vitest run server/src/__tests__/company-portability.test.ts server/src/__tests__/heartbeat-process-recovery.test.ts server/src/__tests__/heartbeat-comment-wake-batching.test.ts server/src/__tests__/heartbeat-list.test.ts server/src/__tests__/issues-service.test.ts server/src/__tests__/agent-permissions-routes.test.ts packages/adapter-utils/src/server-utils.test.ts ui/src/lib/new-agent-runtime-config.test.ts` - Split integration check: merged this branch first, followed by the other [PAP-1614](/PAP/issues/PAP-1614) branches, with no merge conflicts. - Confirmed this branch does not include `pnpm-lock.yaml`. ## Risks - Medium risk: touches heartbeat recovery, queueing, and issue list bounds in central runtime paths. - Imported-agent and concurrency default behavior changes may affect existing automation that assumes one-at-a-time default runs. - No database migrations are included. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI Codex, GPT-5.4 tool-enabled coding model, agentic code-editing/runtime with local shell and GitHub CLI access; exact context window and reasoning mode are not exposed by the Paperclip harness. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --------- Co-authored-by: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -34,6 +34,7 @@ const mockAgentService = vi.hoisted(() => ({
|
||||
getById: vi.fn(),
|
||||
list: vi.fn(),
|
||||
create: vi.fn(),
|
||||
activatePendingApproval: vi.fn(),
|
||||
updatePermissions: vi.fn(),
|
||||
getChainOfCommand: vi.fn(),
|
||||
resolveByReference: vi.fn(),
|
||||
@@ -108,6 +109,7 @@ function registerModuleMocks() {
|
||||
companySkillService: () => mockCompanySkillService,
|
||||
budgetService: () => mockBudgetService,
|
||||
heartbeatService: () => mockHeartbeatService,
|
||||
ISSUE_LIST_DEFAULT_LIMIT: 500,
|
||||
issueApprovalService: () => mockIssueApprovalService,
|
||||
issueService: () => mockIssueService,
|
||||
logActivity: mockLogActivity,
|
||||
@@ -166,6 +168,7 @@ describe("agent permission routes", () => {
|
||||
mockAgentService.getChainOfCommand.mockResolvedValue([]);
|
||||
mockAgentService.resolveByReference.mockResolvedValue({ ambiguous: false, agent: baseAgent });
|
||||
mockAgentService.create.mockResolvedValue(baseAgent);
|
||||
mockAgentService.activatePendingApproval.mockResolvedValue(baseAgent);
|
||||
mockAgentService.updatePermissions.mockResolvedValue(baseAgent);
|
||||
mockAccessService.getMembership.mockResolvedValue({
|
||||
id: "membership-1",
|
||||
@@ -480,6 +483,7 @@ describe("agent permission routes", () => {
|
||||
heartbeat: {
|
||||
enabled: false,
|
||||
intervalSec: 3600,
|
||||
maxConcurrentRuns: 5,
|
||||
},
|
||||
},
|
||||
}),
|
||||
@@ -517,12 +521,73 @@ describe("agent permission routes", () => {
|
||||
heartbeat: {
|
||||
enabled: false,
|
||||
intervalSec: 3600,
|
||||
maxConcurrentRuns: 5,
|
||||
},
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("allows board users to directly approve pending agents", async () => {
|
||||
const pendingAgent = {
|
||||
...baseAgent,
|
||||
status: "pending_approval",
|
||||
};
|
||||
const approvedAgent = {
|
||||
...baseAgent,
|
||||
status: "idle",
|
||||
};
|
||||
mockAgentService.getById.mockResolvedValue(pendingAgent);
|
||||
mockAgentService.activatePendingApproval.mockResolvedValue({
|
||||
agent: approvedAgent,
|
||||
activated: true,
|
||||
});
|
||||
|
||||
const app = await createApp({
|
||||
type: "board",
|
||||
userId: "board-user",
|
||||
source: "local_implicit",
|
||||
isInstanceAdmin: true,
|
||||
companyIds: [companyId],
|
||||
});
|
||||
|
||||
const res = await request(app)
|
||||
.post(`/api/agents/${agentId}/approve`)
|
||||
.send({});
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(mockAgentService.activatePendingApproval).toHaveBeenCalledWith(agentId);
|
||||
expect(mockLogActivity).toHaveBeenCalledWith(expect.anything(), expect.objectContaining({
|
||||
companyId,
|
||||
actorType: "user",
|
||||
actorId: "board-user",
|
||||
action: "agent.approved",
|
||||
entityType: "agent",
|
||||
entityId: agentId,
|
||||
details: { source: "agent_detail" },
|
||||
}));
|
||||
});
|
||||
|
||||
it("rejects direct approval for agents that are not pending approval", async () => {
|
||||
const app = await createApp({
|
||||
type: "board",
|
||||
userId: "board-user",
|
||||
source: "local_implicit",
|
||||
isInstanceAdmin: true,
|
||||
companyIds: [companyId],
|
||||
});
|
||||
|
||||
const res = await request(app)
|
||||
.post(`/api/agents/${agentId}/approve`)
|
||||
.send({});
|
||||
|
||||
expect(res.status).toBe(409);
|
||||
expect(mockAgentService.activatePendingApproval).not.toHaveBeenCalled();
|
||||
expect(mockLogActivity).not.toHaveBeenCalledWith(expect.anything(), expect.objectContaining({
|
||||
action: "agent.approved",
|
||||
}));
|
||||
});
|
||||
|
||||
it("exposes explicit task assignment access on agent detail", async () => {
|
||||
mockAccessService.listPrincipalGrants.mockResolvedValue([
|
||||
{
|
||||
@@ -615,6 +680,12 @@ describe("agent permission routes", () => {
|
||||
status: "todo",
|
||||
},
|
||||
]);
|
||||
expect(mockIssueService.list).toHaveBeenCalledWith(companyId, {
|
||||
touchedByUserId: "board-user",
|
||||
inboxArchivedByUserId: "board-user",
|
||||
status: "backlog,todo,in_progress,in_review,blocked,done",
|
||||
limit: 500,
|
||||
});
|
||||
});
|
||||
|
||||
it("rejects heartbeat cancellation outside the caller company scope", async () => {
|
||||
|
||||
@@ -1539,13 +1539,13 @@ describe("company portability", () => {
|
||||
expect(routineSvc.create).toHaveBeenCalledWith("company-imported", expect.objectContaining({
|
||||
projectId: "project-created",
|
||||
title: "Monday Review",
|
||||
assigneeAgentId: null,
|
||||
assigneeAgentId: "agent-created",
|
||||
priority: "high",
|
||||
status: "paused",
|
||||
concurrencyPolicy: "always_enqueue",
|
||||
catchUpPolicy: "enqueue_missed_with_cap",
|
||||
}), expect.any(Object));
|
||||
expect(result.warnings).toContain(
|
||||
expect(result.warnings).not.toContain(
|
||||
"Task monday-review assignee claudecoder is pending_approval; imported work was left unassigned.",
|
||||
);
|
||||
expect(routineSvc.createTrigger).toHaveBeenCalledTimes(2);
|
||||
@@ -2132,6 +2132,7 @@ describe("company portability", () => {
|
||||
runtimeConfig: {
|
||||
heartbeat: {
|
||||
enabled: false,
|
||||
maxConcurrentRuns: 5,
|
||||
},
|
||||
},
|
||||
});
|
||||
@@ -2210,6 +2211,7 @@ describe("company portability", () => {
|
||||
runtimeConfig: {
|
||||
heartbeat: {
|
||||
enabled: false,
|
||||
maxConcurrentRuns: 5,
|
||||
},
|
||||
},
|
||||
}));
|
||||
@@ -2489,7 +2491,7 @@ describe("company portability", () => {
|
||||
expect(agentSvc.create).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("imports new agents through approval and adapter-config normalization", async () => {
|
||||
it("imports new agents as active while preserving future hire approval settings", async () => {
|
||||
const portability = companyPortabilityService({} as any);
|
||||
const exported = await portability.exportBundle("company-1", {
|
||||
include: {
|
||||
@@ -2549,7 +2551,10 @@ describe("company portability", () => {
|
||||
adapterConfig: expect.objectContaining({
|
||||
normalized: true,
|
||||
}),
|
||||
status: "pending_approval",
|
||||
status: "idle",
|
||||
}));
|
||||
expect(companySvc.create).toHaveBeenCalledWith(expect.objectContaining({
|
||||
requireBoardApprovalForNewAgents: true,
|
||||
}));
|
||||
});
|
||||
|
||||
@@ -2614,4 +2619,154 @@ describe("company portability", () => {
|
||||
},
|
||||
}));
|
||||
});
|
||||
|
||||
it("nameOverrides applied after collision detection do not re-validate uniqueness", async () => {
|
||||
const portability = companyPortabilityService({} as any);
|
||||
|
||||
const exported = await portability.exportBundle("company-1", {
|
||||
include: { company: false, agents: true, projects: false, issues: false },
|
||||
});
|
||||
|
||||
// Simulate existing agents so collision detection triggers rename
|
||||
agentSvc.list.mockResolvedValue([
|
||||
{ id: "existing-1", name: "ClaudeCoder", status: "idle", role: "engineer", adapterType: "claude_local", adapterConfig: {}, runtimeConfig: {}, budgetMonthlyCents: 0, permissions: {}, metadata: null },
|
||||
]);
|
||||
|
||||
const preview = await portability.previewImport({
|
||||
source: { type: "inline", rootPath: exported.rootPath, files: exported.files },
|
||||
include: { company: false, agents: true, projects: false, issues: false },
|
||||
target: { mode: "existing_company", companyId: "company-1" },
|
||||
agents: ["claudecoder"],
|
||||
collisionStrategy: "rename",
|
||||
nameOverrides: { claudecoder: "ClaudeCoder" },
|
||||
});
|
||||
|
||||
// The override reverts the renamed agent back to its original collision name.
|
||||
// This is a known limitation: nameOverrides bypass collision checks.
|
||||
const plan = preview.plan.agentPlans.find((p) => p.slug === "claudecoder");
|
||||
expect(plan).toBeDefined();
|
||||
expect(plan!.action).toBe("create");
|
||||
expect(plan!.plannedName).toBe("ClaudeCoder");
|
||||
});
|
||||
|
||||
it("handles circular reportsTo chains without infinite recursion during export", async () => {
|
||||
const portability = companyPortabilityService({} as any);
|
||||
|
||||
agentSvc.list.mockResolvedValue([
|
||||
{
|
||||
id: "agent-a", name: "AgentA", status: "idle", role: "engineer", title: null, icon: null,
|
||||
reportsTo: "agent-b", capabilities: null, adapterType: "claude_local",
|
||||
adapterConfig: {}, runtimeConfig: {}, budgetMonthlyCents: 0, permissions: {}, metadata: null,
|
||||
},
|
||||
{
|
||||
id: "agent-b", name: "AgentB", status: "idle", role: "manager", title: null, icon: null,
|
||||
reportsTo: "agent-a", capabilities: null, adapterType: "claude_local",
|
||||
adapterConfig: {}, runtimeConfig: {}, budgetMonthlyCents: 0, permissions: {}, metadata: null,
|
||||
},
|
||||
]);
|
||||
agentInstructionsSvc.exportFiles.mockResolvedValue({
|
||||
files: { "AGENTS.md": "Instructions" }, entryFile: "AGENTS.md", warnings: [],
|
||||
});
|
||||
|
||||
// Export should complete without infinite recursion in org chart building
|
||||
const exported = await portability.exportBundle("company-1", {
|
||||
include: { company: true, agents: true, projects: false, issues: false },
|
||||
});
|
||||
|
||||
expect(exported.manifest.agents).toHaveLength(2);
|
||||
// Both agents should appear in the export
|
||||
const slugs = exported.manifest.agents.map((a) => a.slug);
|
||||
expect(slugs).toContain("agenta");
|
||||
expect(slugs).toContain("agentb");
|
||||
});
|
||||
|
||||
it("resolves issue assignee to existing agent when agent is skipped", async () => {
|
||||
const portability = companyPortabilityService({} as any);
|
||||
|
||||
projectSvc.list.mockResolvedValue([{
|
||||
id: "project-1", companyId: "company-1", name: "TestProject", urlKey: "testproject",
|
||||
description: null, leadAgentId: null, targetDate: null, color: null, status: "planned",
|
||||
executionWorkspacePolicy: null, archivedAt: null, workspaces: [],
|
||||
}]);
|
||||
issueSvc.list.mockResolvedValue([{
|
||||
id: "issue-1", companyId: "company-1", title: "Test task", identifier: "PAP-1",
|
||||
description: "A test task", status: "todo", priority: "medium",
|
||||
assigneeAgentId: "agent-1", projectId: "project-1", projectWorkspaceId: null,
|
||||
goalId: null, parentId: null, billingCode: null, labelIds: [],
|
||||
executionWorkspaceSettings: null, assigneeAdapterOverrides: null, metadata: null,
|
||||
}]);
|
||||
|
||||
const exported = await portability.exportBundle("company-1", {
|
||||
include: { company: false, agents: true, projects: true, issues: true },
|
||||
});
|
||||
|
||||
// Re-import into same company with skip collision strategy
|
||||
// Both agents exist so both will be skipped; the existing agent should resolve for issue assignment
|
||||
agentSvc.list.mockResolvedValue([
|
||||
{ id: "agent-1", name: "ClaudeCoder", status: "idle", role: "engineer", adapterType: "claude_local", adapterConfig: {}, runtimeConfig: {}, budgetMonthlyCents: 0, permissions: {}, metadata: null },
|
||||
{ id: "agent-2", name: "CMO", status: "idle", role: "cmo", adapterType: "claude_local", adapterConfig: {}, runtimeConfig: {}, budgetMonthlyCents: 0, permissions: {}, metadata: null },
|
||||
]);
|
||||
projectSvc.list.mockResolvedValue([]);
|
||||
issueSvc.list.mockResolvedValue([]);
|
||||
projectSvc.create.mockResolvedValue({ id: "project-new", companyId: "company-1", urlKey: "testproject" });
|
||||
issueSvc.create.mockResolvedValue({ id: "issue-new", identifier: "PAP-100" });
|
||||
|
||||
const result = await portability.importBundle({
|
||||
source: { type: "inline", rootPath: exported.rootPath, files: exported.files },
|
||||
include: { company: false, agents: true, projects: true, issues: true },
|
||||
target: { mode: "existing_company", companyId: "company-1" },
|
||||
agents: "all",
|
||||
collisionStrategy: "skip",
|
||||
}, "user-1");
|
||||
|
||||
// Both agents should be skipped (already exist)
|
||||
const agentResult = result.agents.find((a) => a.slug === "claudecoder");
|
||||
expect(agentResult).toBeDefined();
|
||||
expect(agentResult!.action).toBe("skipped");
|
||||
|
||||
// Issue should still be created and reference the existing agent
|
||||
expect(issueSvc.create).toHaveBeenCalled();
|
||||
const issueCreateCall = issueSvc.create.mock.calls[0];
|
||||
// The assigneeAgentId should resolve to the existing agent via existingSlugToAgentId
|
||||
expect(issueCreateCall[1]).toEqual(expect.objectContaining({
|
||||
assigneeAgentId: "agent-1",
|
||||
}));
|
||||
});
|
||||
|
||||
it("handles a package with only skills (no agents or projects)", async () => {
|
||||
const portability = companyPortabilityService({} as any);
|
||||
|
||||
const exported = await portability.exportBundle("company-1", {
|
||||
include: { company: false, agents: false, projects: false, issues: false, skills: true },
|
||||
expandReferencedSkills: true,
|
||||
});
|
||||
|
||||
expect(exported.manifest.agents).toHaveLength(0);
|
||||
expect(exported.manifest.projects).toHaveLength(0);
|
||||
expect(exported.manifest.issues).toHaveLength(0);
|
||||
// Skills should still be exported
|
||||
expect(exported.manifest.skills.length).toBeGreaterThanOrEqual(0);
|
||||
});
|
||||
|
||||
it("preview import detects no agents to import when agents are excluded", async () => {
|
||||
const portability = companyPortabilityService({} as any);
|
||||
|
||||
const exported = await portability.exportBundle("company-1", {
|
||||
include: { company: true, agents: true, projects: false, issues: false },
|
||||
});
|
||||
|
||||
agentSvc.list.mockResolvedValue([]);
|
||||
|
||||
const preview = await portability.previewImport({
|
||||
source: { type: "inline", rootPath: exported.rootPath, files: exported.files },
|
||||
include: { company: false, agents: false, projects: false, issues: false },
|
||||
target: { mode: "existing_company", companyId: "company-1" },
|
||||
agents: "all",
|
||||
collisionStrategy: "rename",
|
||||
});
|
||||
|
||||
expect(preview.plan.agentPlans).toHaveLength(0);
|
||||
expect(preview.plan.projectPlans).toHaveLength(0);
|
||||
expect(preview.plan.issuePlans).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -236,6 +236,115 @@ describe("heartbeat comment wake batching", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("defers approval-approved wakes for a running issue so the assignee resumes after the run", async () => {
|
||||
const companyId = randomUUID();
|
||||
const agentId = randomUUID();
|
||||
const issueId = randomUUID();
|
||||
const runId = randomUUID();
|
||||
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
|
||||
const heartbeat = heartbeatService(db);
|
||||
|
||||
await db.insert(companies).values({
|
||||
id: companyId,
|
||||
name: "Paperclip",
|
||||
issuePrefix,
|
||||
requireBoardApprovalForNewAgents: false,
|
||||
});
|
||||
|
||||
await db.insert(agents).values({
|
||||
id: agentId,
|
||||
companyId,
|
||||
name: "CEO",
|
||||
role: "ceo",
|
||||
status: "running",
|
||||
adapterType: "process",
|
||||
adapterConfig: {},
|
||||
runtimeConfig: {},
|
||||
permissions: {},
|
||||
});
|
||||
|
||||
await db.insert(heartbeatRuns).values({
|
||||
id: runId,
|
||||
companyId,
|
||||
agentId,
|
||||
invocationSource: "assignment",
|
||||
triggerDetail: "system",
|
||||
status: "running",
|
||||
contextSnapshot: {
|
||||
issueId,
|
||||
taskId: issueId,
|
||||
wakeReason: "issue_assigned",
|
||||
},
|
||||
});
|
||||
|
||||
await db.insert(issues).values({
|
||||
id: issueId,
|
||||
companyId,
|
||||
title: "Hire an agent",
|
||||
status: "blocked",
|
||||
priority: "medium",
|
||||
assigneeAgentId: agentId,
|
||||
executionRunId: runId,
|
||||
executionAgentNameKey: "ceo",
|
||||
executionLockedAt: new Date(),
|
||||
issueNumber: 1,
|
||||
identifier: `${issuePrefix}-1`,
|
||||
});
|
||||
|
||||
const followupRun = await heartbeat.wakeup(agentId, {
|
||||
source: "automation",
|
||||
triggerDetail: "system",
|
||||
reason: "approval_approved",
|
||||
payload: {
|
||||
issueId,
|
||||
approvalId: "approval-1",
|
||||
approvalStatus: "approved",
|
||||
},
|
||||
contextSnapshot: {
|
||||
issueId,
|
||||
taskId: issueId,
|
||||
approvalId: "approval-1",
|
||||
approvalStatus: "approved",
|
||||
wakeReason: "approval_approved",
|
||||
},
|
||||
requestedByActorType: "user",
|
||||
requestedByActorId: "local-board",
|
||||
});
|
||||
|
||||
expect(followupRun).toBeNull();
|
||||
|
||||
const deferred = await db
|
||||
.select()
|
||||
.from(agentWakeupRequests)
|
||||
.where(
|
||||
and(
|
||||
eq(agentWakeupRequests.companyId, companyId),
|
||||
eq(agentWakeupRequests.agentId, agentId),
|
||||
eq(agentWakeupRequests.status, "deferred_issue_execution"),
|
||||
),
|
||||
)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
expect(deferred).not.toBeNull();
|
||||
expect(deferred?.reason).toBe("issue_execution_deferred");
|
||||
expect(deferred?.payload).toMatchObject({
|
||||
issueId,
|
||||
approvalId: "approval-1",
|
||||
approvalStatus: "approved",
|
||||
});
|
||||
expect((deferred?.payload as Record<string, unknown>)._paperclipWakeContext).toMatchObject({
|
||||
issueId,
|
||||
taskId: issueId,
|
||||
approvalId: "approval-1",
|
||||
approvalStatus: "approved",
|
||||
wakeReason: "approval_approved",
|
||||
});
|
||||
|
||||
const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
|
||||
expect(runs).toHaveLength(1);
|
||||
expect(runs[0]?.id).toBe(runId);
|
||||
});
|
||||
|
||||
it("batches deferred comment wakes and forwards the ordered batch to the next run", async () => {
|
||||
const gateway = await createControlledGatewayServer();
|
||||
const companyId = randomUUID();
|
||||
|
||||
@@ -5,7 +5,7 @@ import {
|
||||
getEmbeddedPostgresTestSupport,
|
||||
startEmbeddedPostgresTestDatabase,
|
||||
} from "./helpers/embedded-postgres.js";
|
||||
import { heartbeatService } from "../services/heartbeat.ts";
|
||||
import { boundHeartbeatRunEventPayloadForStorage, heartbeatService } from "../services/heartbeat.ts";
|
||||
|
||||
const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport();
|
||||
const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip;
|
||||
@@ -202,3 +202,25 @@ describeEmbeddedPostgres("heartbeat list", () => {
|
||||
expect(result).not.toHaveProperty("nestedHuge");
|
||||
});
|
||||
});
|
||||
|
||||
describe("heartbeat run event payload bounding", () => {
|
||||
it("truncates oversized adapter metadata before storage", () => {
|
||||
const payload = boundHeartbeatRunEventPayloadForStorage({
|
||||
adapterType: "codex_local",
|
||||
prompt: "x".repeat(40_000),
|
||||
context: {
|
||||
issueId: "issue-1",
|
||||
memory: "y".repeat(40_000),
|
||||
},
|
||||
});
|
||||
|
||||
expect(payload.adapterType).toBe("codex_local");
|
||||
expect(typeof payload.prompt).toBe("string");
|
||||
expect((payload.prompt as string).length).toBeLessThan(20_000);
|
||||
expect(payload.prompt).toContain("[truncated");
|
||||
expect(payload.context).toMatchObject({
|
||||
issueId: "issue-1",
|
||||
});
|
||||
expect(JSON.stringify(payload).length).toBeLessThan(45_000);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -702,6 +702,56 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("does not continue seeded in-progress work that has no run linkage", async () => {
|
||||
const companyId = randomUUID();
|
||||
const agentId = randomUUID();
|
||||
const issueId = randomUUID();
|
||||
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
|
||||
await db.insert(companies).values({
|
||||
id: companyId,
|
||||
name: "Paperclip",
|
||||
issuePrefix,
|
||||
requireBoardApprovalForNewAgents: false,
|
||||
});
|
||||
await db.insert(agents).values({
|
||||
id: agentId,
|
||||
companyId,
|
||||
name: "CodexCoder",
|
||||
role: "engineer",
|
||||
status: "idle",
|
||||
adapterType: "codex_local",
|
||||
adapterConfig: {},
|
||||
runtimeConfig: {},
|
||||
permissions: {},
|
||||
});
|
||||
await db.insert(issues).values({
|
||||
id: issueId,
|
||||
companyId,
|
||||
title: "Seeded in-flight work",
|
||||
status: "in_progress",
|
||||
priority: "medium",
|
||||
assigneeAgentId: agentId,
|
||||
checkoutRunId: null,
|
||||
executionRunId: null,
|
||||
issueNumber: 1,
|
||||
identifier: `${issuePrefix}-1`,
|
||||
startedAt: new Date("2026-03-19T00:00:00.000Z"),
|
||||
});
|
||||
const heartbeat = heartbeatService(db);
|
||||
|
||||
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||
expect(result.dispatchRequeued).toBe(0);
|
||||
expect(result.continuationRequeued).toBe(0);
|
||||
expect(result.escalated).toBe(0);
|
||||
expect(result.skipped).toBe(1);
|
||||
|
||||
const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
|
||||
expect(runs).toHaveLength(0);
|
||||
const [issue] = await db.select().from(issues).where(eq(issues.id, issueId));
|
||||
expect(issue?.status).toBe("in_progress");
|
||||
expect(issue?.executionRunId).toBeNull();
|
||||
});
|
||||
|
||||
it("classifies actionable plan-only recovery and enqueues one liveness continuation", async () => {
|
||||
mockAdapterExecute.mockResolvedValueOnce({
|
||||
exitCode: 0,
|
||||
@@ -824,6 +874,39 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||
expect(comments[0]?.body).toContain("Latest retry failure: `process_lost` - run failed before issue advanced.");
|
||||
});
|
||||
|
||||
it("re-enqueues continuation when the latest automatic continuation succeeded without closing the issue", async () => {
|
||||
const { agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||
status: "in_progress",
|
||||
runStatus: "succeeded",
|
||||
retryReason: "issue_continuation_needed",
|
||||
});
|
||||
const heartbeat = heartbeatService(db);
|
||||
|
||||
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||
expect(result.continuationRequeued).toBe(1);
|
||||
expect(result.escalated).toBe(0);
|
||||
expect(result.issueIds).toEqual([issueId]);
|
||||
|
||||
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
|
||||
expect(issue?.status).toBe("in_progress");
|
||||
|
||||
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
|
||||
expect(comments).toHaveLength(0);
|
||||
|
||||
const runs = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(eq(heartbeatRuns.agentId, agentId));
|
||||
expect(runs).toHaveLength(2);
|
||||
|
||||
const retryRun = runs.find((row) => row.id !== runId);
|
||||
expect(retryRun?.id).toBeTruthy();
|
||||
expect((retryRun?.contextSnapshot as Record<string, unknown>)?.retryReason).toBe("issue_continuation_needed");
|
||||
if (retryRun) {
|
||||
await waitForRunToSettle(heartbeat, retryRun.id);
|
||||
}
|
||||
});
|
||||
|
||||
it("does not reconcile user-assigned work through the agent stranded-work recovery path", async () => {
|
||||
const { issueId, runId } = await seedStrandedIssueFixture({
|
||||
status: "todo",
|
||||
|
||||
@@ -22,12 +22,20 @@ import {
|
||||
startEmbeddedPostgresTestDatabase,
|
||||
} from "./helpers/embedded-postgres.js";
|
||||
import { instanceSettingsService } from "../services/instance-settings.ts";
|
||||
import { issueService } from "../services/issues.ts";
|
||||
import { clampIssueListLimit, ISSUE_LIST_MAX_LIMIT, issueService } from "../services/issues.ts";
|
||||
import { buildProjectMentionHref } from "@paperclipai/shared";
|
||||
|
||||
const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport();
|
||||
const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip;
|
||||
|
||||
describe("issue list limit helpers", () => {
|
||||
it("clamps untrusted issue-list limits to the server maximum", () => {
|
||||
expect(clampIssueListLimit(0)).toBe(1);
|
||||
expect(clampIssueListLimit(25.9)).toBe(25);
|
||||
expect(clampIssueListLimit(ISSUE_LIST_MAX_LIMIT + 10)).toBe(ISSUE_LIST_MAX_LIMIT);
|
||||
});
|
||||
});
|
||||
|
||||
async function ensureIssueRelationsTable(db: ReturnType<typeof createDb>) {
|
||||
await db.execute(sql.raw(`
|
||||
CREATE TABLE IF NOT EXISTS "issue_relations" (
|
||||
|
||||
@@ -24,6 +24,7 @@ export const asBoolean = serverUtils.asBoolean;
|
||||
export const asStringArray = serverUtils.asStringArray;
|
||||
export const parseJson = serverUtils.parseJson;
|
||||
export const appendWithCap = serverUtils.appendWithCap;
|
||||
export const appendWithByteCap = serverUtils.appendWithByteCap;
|
||||
export const resolvePathValue = serverUtils.resolvePathValue;
|
||||
export const renderTemplate = serverUtils.renderTemplate;
|
||||
export const redactEnvForLogs = serverUtils.redactEnvForLogs;
|
||||
|
||||
@@ -7,6 +7,7 @@ import { and, desc, eq, inArray, not, sql } from "drizzle-orm";
|
||||
import {
|
||||
agentSkillSyncSchema,
|
||||
agentMineInboxQuerySchema,
|
||||
AGENT_DEFAULT_MAX_CONCURRENT_RUNS,
|
||||
createAgentKeySchema,
|
||||
createAgentHireSchema,
|
||||
createAgentSchema,
|
||||
@@ -37,6 +38,7 @@ import {
|
||||
companySkillService,
|
||||
budgetService,
|
||||
heartbeatService,
|
||||
ISSUE_LIST_DEFAULT_LIMIT,
|
||||
issueApprovalService,
|
||||
issueService,
|
||||
logActivity,
|
||||
@@ -75,6 +77,15 @@ import {
|
||||
} from "../services/default-agent-instructions.js";
|
||||
import { getTelemetryClient } from "../telemetry.js";
|
||||
|
||||
const RUN_LOG_DEFAULT_LIMIT_BYTES = 256_000;
|
||||
const RUN_LOG_MAX_LIMIT_BYTES = 1024 * 1024;
|
||||
|
||||
function readRunLogLimitBytes(value: unknown) {
|
||||
const parsed = Number(value ?? RUN_LOG_DEFAULT_LIMIT_BYTES);
|
||||
if (!Number.isFinite(parsed)) return RUN_LOG_DEFAULT_LIMIT_BYTES;
|
||||
return Math.max(1, Math.min(RUN_LOG_MAX_LIMIT_BYTES, Math.trunc(parsed)));
|
||||
}
|
||||
|
||||
export function agentRoutes(db: Db) {
|
||||
// Legacy hardcoded maps — used as fallback when adapter module does not
|
||||
// declare capability flags explicitly.
|
||||
@@ -514,6 +525,9 @@ export function agentRoutes(db: Db) {
|
||||
if (parseBooleanLike(heartbeat.enabled) == null) {
|
||||
heartbeat.enabled = false;
|
||||
}
|
||||
if (parseNumberLike(heartbeat.maxConcurrentRuns) == null) {
|
||||
heartbeat.maxConcurrentRuns = AGENT_DEFAULT_MAX_CONCURRENT_RUNS;
|
||||
}
|
||||
|
||||
normalizedRuntimeConfig.heartbeat = heartbeat;
|
||||
return normalizedRuntimeConfig;
|
||||
@@ -1168,6 +1182,7 @@ export function agentRoutes(db: Db) {
|
||||
assigneeAgentId: req.actor.agentId,
|
||||
status: "todo,in_progress,blocked",
|
||||
includeRoutineExecutions: true,
|
||||
limit: ISSUE_LIST_DEFAULT_LIMIT,
|
||||
});
|
||||
|
||||
res.json(
|
||||
@@ -1198,6 +1213,7 @@ export function agentRoutes(db: Db) {
|
||||
touchedByUserId: query.userId,
|
||||
inboxArchivedByUserId: query.userId,
|
||||
status: query.status,
|
||||
limit: ISSUE_LIST_DEFAULT_LIMIT,
|
||||
});
|
||||
|
||||
res.json(rows);
|
||||
@@ -1682,6 +1698,10 @@ export function agentRoutes(db: Db) {
|
||||
});
|
||||
|
||||
router.patch("/agents/:id/instructions-path", validate(updateAgentInstructionsPathSchema), async (req, res) => {
|
||||
if (req.actor.type !== "board") {
|
||||
throw forbidden("Only board-authenticated callers can manage instructions path or bundle configuration");
|
||||
}
|
||||
|
||||
const id = req.params.id as string;
|
||||
const existing = await svc.getById(id);
|
||||
if (!existing) {
|
||||
@@ -2098,6 +2118,42 @@ export function agentRoutes(db: Db) {
|
||||
res.json(agent);
|
||||
});
|
||||
|
||||
router.post("/agents/:id/approve", async (req, res) => {
|
||||
assertBoard(req);
|
||||
const id = req.params.id as string;
|
||||
const existing = await getAccessibleAgent(req, res, id);
|
||||
if (!existing) {
|
||||
return;
|
||||
}
|
||||
if (existing.status !== "pending_approval") {
|
||||
res.status(409).json({ error: "Only pending approval agents can be approved" });
|
||||
return;
|
||||
}
|
||||
|
||||
const approval = await svc.activatePendingApproval(id);
|
||||
if (!approval) {
|
||||
res.status(404).json({ error: "Agent not found" });
|
||||
return;
|
||||
}
|
||||
if (!approval.activated) {
|
||||
res.status(409).json({ error: "Only pending approval agents can be approved" });
|
||||
return;
|
||||
}
|
||||
const { agent } = approval;
|
||||
|
||||
await logActivity(db, {
|
||||
companyId: agent.companyId,
|
||||
actorType: "user",
|
||||
actorId: req.actor.userId ?? "board",
|
||||
action: "agent.approved",
|
||||
entityType: "agent",
|
||||
entityId: agent.id,
|
||||
details: { source: "agent_detail" },
|
||||
});
|
||||
|
||||
res.json(agent);
|
||||
});
|
||||
|
||||
router.post("/agents/:id/terminate", async (req, res) => {
|
||||
assertBoard(req);
|
||||
const id = req.params.id as string;
|
||||
@@ -2492,10 +2548,10 @@ export function agentRoutes(db: Db) {
|
||||
assertCompanyAccess(req, run.companyId);
|
||||
|
||||
const offset = Number(req.query.offset ?? 0);
|
||||
const limitBytes = Number(req.query.limitBytes ?? 256000);
|
||||
const limitBytes = readRunLogLimitBytes(req.query.limitBytes);
|
||||
const result = await heartbeat.readLog(run, {
|
||||
offset: Number.isFinite(offset) ? offset : 0,
|
||||
limitBytes: Number.isFinite(limitBytes) ? limitBytes : 256000,
|
||||
limitBytes,
|
||||
});
|
||||
|
||||
res.set("Cache-Control", "no-cache, no-store");
|
||||
@@ -2527,10 +2583,10 @@ export function agentRoutes(db: Db) {
|
||||
assertCompanyAccess(req, operation.companyId);
|
||||
|
||||
const offset = Number(req.query.offset ?? 0);
|
||||
const limitBytes = Number(req.query.limitBytes ?? 256000);
|
||||
const limitBytes = readRunLogLimitBytes(req.query.limitBytes);
|
||||
const result = await workspaceOperations.readLog(operationId, {
|
||||
offset: Number.isFinite(offset) ? offset : 0,
|
||||
limitBytes: Number.isFinite(limitBytes) ? limitBytes : 256000,
|
||||
limitBytes,
|
||||
});
|
||||
|
||||
res.set("Cache-Control", "no-cache, no-store");
|
||||
|
||||
@@ -28,6 +28,9 @@ import {
|
||||
collectExecutionWorkspaceCommandPaths,
|
||||
} from "./workspace-command-authz.js";
|
||||
import { assertCanManageExecutionWorkspaceRuntimeServices } from "./workspace-runtime-service-authz.js";
|
||||
import { appendWithCap } from "../adapters/utils.js";
|
||||
|
||||
const WORKSPACE_CONTROL_OUTPUT_MAX_CHARS = 256 * 1024;
|
||||
|
||||
export function executionWorkspaceRoutes(db: Db) {
|
||||
const router = Router();
|
||||
@@ -209,8 +212,8 @@ export function executionWorkspaceRoutes(db: Db) {
|
||||
executionWorkspaceId: existing.id,
|
||||
});
|
||||
let runtimeServiceCount = existing.runtimeServices?.length ?? 0;
|
||||
const stdout: string[] = [];
|
||||
const stderr: string[] = [];
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
|
||||
const operation = await recorder.recordOperation({
|
||||
phase: action === "stop" ? "workspace_teardown" : "workspace_provision",
|
||||
@@ -310,8 +313,8 @@ export function executionWorkspaceRoutes(db: Db) {
|
||||
}
|
||||
|
||||
const onLog = async (stream: "stdout" | "stderr", chunk: string) => {
|
||||
if (stream === "stdout") stdout.push(chunk);
|
||||
else stderr.push(chunk);
|
||||
if (stream === "stdout") stdout = appendWithCap(stdout, chunk, WORKSPACE_CONTROL_OUTPUT_MAX_CHARS);
|
||||
else stderr = appendWithCap(stderr, chunk, WORKSPACE_CONTROL_OUTPUT_MAX_CHARS);
|
||||
};
|
||||
|
||||
if (action === "stop" || action === "restart") {
|
||||
@@ -382,8 +385,8 @@ export function executionWorkspaceRoutes(db: Db) {
|
||||
|
||||
return {
|
||||
status: "succeeded",
|
||||
stdout: stdout.join(""),
|
||||
stderr: stderr.join(""),
|
||||
stdout,
|
||||
stderr,
|
||||
system:
|
||||
action === "stop"
|
||||
? "Stopped execution workspace runtime services.\n"
|
||||
|
||||
@@ -40,7 +40,10 @@ import {
|
||||
heartbeatService,
|
||||
instanceSettingsService,
|
||||
issueApprovalService,
|
||||
ISSUE_LIST_DEFAULT_LIMIT,
|
||||
ISSUE_LIST_MAX_LIMIT,
|
||||
issueService,
|
||||
clampIssueListLimit,
|
||||
documentService,
|
||||
logActivity,
|
||||
projectService,
|
||||
@@ -618,8 +621,10 @@ export function issueRoutes(
|
||||
? req.actor.userId
|
||||
: unreadForUserFilterRaw;
|
||||
const rawLimit = req.query.limit as string | undefined;
|
||||
const parsedLimit = rawLimit ? Number.parseInt(rawLimit, 10) : null;
|
||||
const limit = parsedLimit ?? undefined;
|
||||
const parsedLimit = rawLimit !== undefined && /^\d+$/.test(rawLimit)
|
||||
? Number.parseInt(rawLimit, 10)
|
||||
: null;
|
||||
const limit = parsedLimit === null ? ISSUE_LIST_DEFAULT_LIMIT : clampIssueListLimit(parsedLimit);
|
||||
|
||||
if (assigneeUserFilterRaw === "me" && (!assigneeUserId || req.actor.type !== "board")) {
|
||||
res.status(403).json({ error: "assigneeUserId=me requires board authentication" });
|
||||
@@ -638,7 +643,7 @@ export function issueRoutes(
|
||||
return;
|
||||
}
|
||||
if (rawLimit !== undefined && (parsedLimit === null || !Number.isInteger(parsedLimit) || parsedLimit <= 0)) {
|
||||
res.status(400).json({ error: "limit must be a positive integer" });
|
||||
res.status(400).json({ error: `limit must be a positive integer up to ${ISSUE_LIST_MAX_LIMIT}` });
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,9 @@ import {
|
||||
} from "./workspace-command-authz.js";
|
||||
import { assertCanManageProjectWorkspaceRuntimeServices } from "./workspace-runtime-service-authz.js";
|
||||
import { getTelemetryClient } from "../telemetry.js";
|
||||
import { appendWithCap } from "../adapters/utils.js";
|
||||
|
||||
const WORKSPACE_CONTROL_OUTPUT_MAX_CHARS = 256 * 1024;
|
||||
|
||||
export function projectRoutes(db: Db) {
|
||||
const router = Router();
|
||||
@@ -377,8 +380,8 @@ export function projectRoutes(db: Db) {
|
||||
const actor = getActorInfo(req);
|
||||
const recorder = workspaceOperations.createRecorder({ companyId: project.companyId });
|
||||
let runtimeServiceCount = workspace.runtimeServices?.length ?? 0;
|
||||
const stdout: string[] = [];
|
||||
const stderr: string[] = [];
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
|
||||
const operation = await recorder.recordOperation({
|
||||
phase: action === "stop" ? "workspace_teardown" : "workspace_provision",
|
||||
@@ -440,8 +443,8 @@ export function projectRoutes(db: Db) {
|
||||
}
|
||||
|
||||
const onLog = async (stream: "stdout" | "stderr", chunk: string) => {
|
||||
if (stream === "stdout") stdout.push(chunk);
|
||||
else stderr.push(chunk);
|
||||
if (stream === "stdout") stdout = appendWithCap(stdout, chunk, WORKSPACE_CONTROL_OUTPUT_MAX_CHARS);
|
||||
else stderr = appendWithCap(stderr, chunk, WORKSPACE_CONTROL_OUTPUT_MAX_CHARS);
|
||||
};
|
||||
|
||||
if (action === "stop" || action === "restart") {
|
||||
@@ -514,8 +517,8 @@ export function projectRoutes(db: Db) {
|
||||
|
||||
return {
|
||||
status: "succeeded",
|
||||
stdout: stdout.join(""),
|
||||
stderr: stderr.join(""),
|
||||
stdout,
|
||||
stderr,
|
||||
system:
|
||||
action === "stop"
|
||||
? "Stopped project workspace runtime services.\n"
|
||||
|
||||
@@ -16,7 +16,7 @@ import {
|
||||
issues,
|
||||
issueComments,
|
||||
} from "@paperclipai/db";
|
||||
import { isUuidLike, normalizeAgentUrlKey } from "@paperclipai/shared";
|
||||
import { AGENT_DEFAULT_MAX_CONCURRENT_RUNS, isUuidLike, normalizeAgentUrlKey } from "@paperclipai/shared";
|
||||
import { conflict, notFound, unprocessable } from "../errors.js";
|
||||
import { normalizeAgentPermissions } from "./agent-permissions.js";
|
||||
import { REDACTED_EVENT_VALUE, sanitizeRecord } from "../redaction.js";
|
||||
@@ -114,6 +114,25 @@ function hasConfigPatchFields(data: Partial<typeof agents.$inferInsert>) {
|
||||
return CONFIG_REVISION_FIELDS.some((field) => Object.prototype.hasOwnProperty.call(data, field));
|
||||
}
|
||||
|
||||
function parseFiniteNumberLike(value: unknown): number | null {
|
||||
if (typeof value === "number" && Number.isFinite(value)) return value;
|
||||
if (typeof value !== "string") return null;
|
||||
const parsed = Number(value.trim());
|
||||
return Number.isFinite(parsed) ? parsed : null;
|
||||
}
|
||||
|
||||
function normalizeRuntimeConfigForNewAgent(runtimeConfig: unknown): Record<string, unknown> {
|
||||
const normalizedRuntimeConfig = isPlainRecord(runtimeConfig) ? { ...runtimeConfig } : {};
|
||||
const heartbeat = isPlainRecord(normalizedRuntimeConfig.heartbeat)
|
||||
? { ...normalizedRuntimeConfig.heartbeat }
|
||||
: {};
|
||||
if (parseFiniteNumberLike(heartbeat.maxConcurrentRuns) == null) {
|
||||
heartbeat.maxConcurrentRuns = AGENT_DEFAULT_MAX_CONCURRENT_RUNS;
|
||||
}
|
||||
normalizedRuntimeConfig.heartbeat = heartbeat;
|
||||
return normalizedRuntimeConfig;
|
||||
}
|
||||
|
||||
function diffConfigSnapshot(
|
||||
before: AgentConfigSnapshot,
|
||||
after: AgentConfigSnapshot,
|
||||
@@ -398,9 +417,10 @@ export function agentService(db: Db) {
|
||||
|
||||
const role = data.role ?? "general";
|
||||
const normalizedPermissions = normalizeAgentPermissions(data.permissions, role);
|
||||
const runtimeConfig = normalizeRuntimeConfigForNewAgent(data.runtimeConfig);
|
||||
const created = await db
|
||||
.insert(agents)
|
||||
.values({ ...data, name: uniqueName, companyId, role, permissions: normalizedPermissions })
|
||||
.values({ ...data, name: uniqueName, companyId, role, permissions: normalizedPermissions, runtimeConfig })
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
|
||||
@@ -506,18 +526,19 @@ export function agentService(db: Db) {
|
||||
},
|
||||
|
||||
activatePendingApproval: async (id: string) => {
|
||||
const existing = await getById(id);
|
||||
if (!existing) return null;
|
||||
if (existing.status !== "pending_approval") return existing;
|
||||
|
||||
const updated = await db
|
||||
.update(agents)
|
||||
.set({ status: "idle", updatedAt: new Date() })
|
||||
.where(eq(agents.id, id))
|
||||
.where(and(eq(agents.id, id), eq(agents.status, "pending_approval")))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
return updated ? normalizeAgentRow(updated) : null;
|
||||
if (updated) {
|
||||
return { agent: normalizeAgentRow(updated), activated: true };
|
||||
}
|
||||
|
||||
const existing = await getById(id);
|
||||
return existing ? { agent: existing, activated: false } : null;
|
||||
},
|
||||
|
||||
updatePermissions: async (id: string, permissions: { canCreateAgents: boolean }) => {
|
||||
|
||||
@@ -31,6 +31,7 @@ import type {
|
||||
RoutineVariable,
|
||||
} from "@paperclipai/shared";
|
||||
import {
|
||||
AGENT_DEFAULT_MAX_CONCURRENT_RUNS,
|
||||
ISSUE_PRIORITIES,
|
||||
ISSUE_STATUSES,
|
||||
PROJECT_STATUSES,
|
||||
@@ -590,7 +591,7 @@ const RUNTIME_DEFAULT_RULES: Array<{ path: string[]; value: unknown }> = [
|
||||
{ path: ["heartbeat", "wakeOnAssignment"], value: true },
|
||||
{ path: ["heartbeat", "wakeOnAutomation"], value: true },
|
||||
{ path: ["heartbeat", "wakeOnDemand"], value: true },
|
||||
{ path: ["heartbeat", "maxConcurrentRuns"], value: 3 },
|
||||
{ path: ["heartbeat", "maxConcurrentRuns"], value: AGENT_DEFAULT_MAX_CONCURRENT_RUNS },
|
||||
];
|
||||
|
||||
const ADAPTER_DEFAULT_RULES_BY_TYPE: Record<string, Array<{ path: string[]; value: unknown }>> = {
|
||||
@@ -741,10 +742,20 @@ function clonePortableRecord(value: unknown) {
|
||||
return structuredClone(value) as Record<string, unknown>;
|
||||
}
|
||||
|
||||
function parseFiniteNumberLike(value: unknown): number | null {
|
||||
if (typeof value === "number" && Number.isFinite(value)) return value;
|
||||
if (typeof value !== "string") return null;
|
||||
const parsed = Number(value.trim());
|
||||
return Number.isFinite(parsed) ? parsed : null;
|
||||
}
|
||||
|
||||
function disableImportedTimerHeartbeat(runtimeConfig: unknown) {
|
||||
const next = clonePortableRecord(runtimeConfig) ?? {};
|
||||
const heartbeat = isPlainRecord(next.heartbeat) ? { ...next.heartbeat } : {};
|
||||
heartbeat.enabled = false;
|
||||
if (parseFiniteNumberLike(heartbeat.maxConcurrentRuns) == null) {
|
||||
heartbeat.maxConcurrentRuns = AGENT_DEFAULT_MAX_CONCURRENT_RUNS;
|
||||
}
|
||||
next.heartbeat = heartbeat;
|
||||
return next;
|
||||
}
|
||||
@@ -4209,13 +4220,7 @@ export function companyPortabilityService(db: Db, storage?: StorageService) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const requiresApproval =
|
||||
typeof targetCompany.requireBoardApprovalForNewAgents === "boolean"
|
||||
? targetCompany.requireBoardApprovalForNewAgents
|
||||
: include.company
|
||||
? (sourceManifest.company?.requireBoardApprovalForNewAgents ?? true)
|
||||
: true;
|
||||
const createdStatus = requiresApproval ? "pending_approval" : "idle";
|
||||
const createdStatus = "idle";
|
||||
let created = await agents.create(targetCompany.id, {
|
||||
...patch,
|
||||
status: createdStatus,
|
||||
|
||||
@@ -4,8 +4,14 @@ import { execFile as execFileCallback } from "node:child_process";
|
||||
import { promisify } from "node:util";
|
||||
import { and, asc, desc, eq, getTableColumns, gt, inArray, isNull, or, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import { ISSUE_CONTINUATION_SUMMARY_DOCUMENT_KEY } from "@paperclipai/shared";
|
||||
import type { BillingType, ExecutionWorkspace, ExecutionWorkspaceConfig, RunLivenessState } from "@paperclipai/shared";
|
||||
import {
|
||||
AGENT_DEFAULT_MAX_CONCURRENT_RUNS,
|
||||
ISSUE_CONTINUATION_SUMMARY_DOCUMENT_KEY,
|
||||
type BillingType,
|
||||
type ExecutionWorkspace,
|
||||
type ExecutionWorkspaceConfig,
|
||||
type RunLivenessState,
|
||||
} from "@paperclipai/shared";
|
||||
import {
|
||||
agents,
|
||||
agentRuntimeState,
|
||||
@@ -31,7 +37,7 @@ import { getRunLogStore, type RunLogHandle } from "./run-log-store.js";
|
||||
import { getServerAdapter, runningProcesses } from "../adapters/index.js";
|
||||
import type { AdapterExecutionResult, AdapterInvocationMeta, AdapterSessionCodec, UsageSummary } from "../adapters/index.js";
|
||||
import { createLocalAgentJwt } from "../agent-auth-jwt.js";
|
||||
import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js";
|
||||
import { parseObject, asBoolean, asNumber, appendWithByteCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js";
|
||||
import { costService } from "./costs.js";
|
||||
import { trackAgentFirstHeartbeat } from "@paperclipai/shared/telemetry";
|
||||
import { getTelemetryClient } from "../telemetry.js";
|
||||
@@ -104,7 +110,11 @@ import { extractSkillMentionIds } from "@paperclipai/shared";
|
||||
|
||||
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;
|
||||
const MAX_PERSISTED_LOG_CHUNK_CHARS = 64 * 1024;
|
||||
const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = 1;
|
||||
const MAX_RUN_EVENT_PAYLOAD_STRING_CHARS = 16 * 1024;
|
||||
const MAX_RUN_EVENT_PAYLOAD_ARRAY_ITEMS = 50;
|
||||
const MAX_RUN_EVENT_PAYLOAD_OBJECT_KEYS = 100;
|
||||
const MAX_RUN_EVENT_PAYLOAD_DEPTH = 6;
|
||||
const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = AGENT_DEFAULT_MAX_CONCURRENT_RUNS;
|
||||
const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10;
|
||||
const DEFERRED_WAKE_CONTEXT_KEY = "_paperclipWakeContext";
|
||||
const WAKE_COMMENT_IDS_KEY = "wakeCommentIds";
|
||||
@@ -119,6 +129,8 @@ const MAX_INLINE_WAKE_COMMENT_BODY_CHARS = 4_000;
|
||||
const MAX_INLINE_WAKE_COMMENT_BODY_TOTAL_CHARS = 12_000;
|
||||
const execFile = promisify(execFileCallback);
|
||||
const ACTIVE_HEARTBEAT_RUN_STATUSES = ["queued", "running"] as const;
|
||||
const UNSUCCESSFUL_HEARTBEAT_RUN_TERMINAL_STATUSES = ["failed", "cancelled", "timed_out"] as const;
|
||||
const RUNNING_ISSUE_WAKE_REASONS_REQUIRING_FOLLOWUP = new Set(["approval_approved"]);
|
||||
const SESSIONED_LOCAL_ADAPTERS = new Set([
|
||||
"claude_local",
|
||||
"codex_local",
|
||||
@@ -504,6 +516,15 @@ const heartbeatRunSafeColumns = {
|
||||
resultJson: heartbeatRunSafeResultJsonColumn,
|
||||
} as const;
|
||||
|
||||
const heartbeatRunSqlAsciiSafeColumns = {
|
||||
...getTableColumns(heartbeatRuns),
|
||||
processGroupId: heartbeatRunProcessGroupIdColumn,
|
||||
error: sql<string | null>`NULL`.as("error"),
|
||||
resultJson: sql<Record<string, unknown> | null>`NULL`.as("resultJson"),
|
||||
stdoutExcerpt: sql<string | null>`NULL`.as("stdoutExcerpt"),
|
||||
stderrExcerpt: sql<string | null>`NULL`.as("stderrExcerpt"),
|
||||
} as const;
|
||||
|
||||
const heartbeatRunLogAccessColumns = {
|
||||
id: heartbeatRuns.id,
|
||||
companyId: heartbeatRuns.companyId,
|
||||
@@ -529,7 +550,81 @@ const heartbeatRunIssueSummaryColumns = {
|
||||
} as const;
|
||||
|
||||
function appendExcerpt(prev: string, chunk: string) {
|
||||
return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES);
|
||||
return appendWithByteCap(prev, chunk, MAX_EXCERPT_BYTES);
|
||||
}
|
||||
|
||||
function truncateRunEventString(value: string) {
|
||||
if (value.length <= MAX_RUN_EVENT_PAYLOAD_STRING_CHARS) return value;
|
||||
const omittedChars = value.length - MAX_RUN_EVENT_PAYLOAD_STRING_CHARS;
|
||||
return `${value.slice(0, MAX_RUN_EVENT_PAYLOAD_STRING_CHARS)}\n[truncated ${omittedChars} chars]`;
|
||||
}
|
||||
|
||||
function boundRunEventValue(value: unknown, depth: number, seen: WeakSet<object>): unknown {
|
||||
if (typeof value === "string") {
|
||||
return truncateRunEventString(value);
|
||||
}
|
||||
if (
|
||||
value === null
|
||||
|| typeof value === "number"
|
||||
|| typeof value === "boolean"
|
||||
) {
|
||||
return value;
|
||||
}
|
||||
if (value instanceof Date) {
|
||||
return value.toISOString();
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
if (depth >= MAX_RUN_EVENT_PAYLOAD_DEPTH) {
|
||||
return {
|
||||
_truncated: true,
|
||||
type: "array",
|
||||
originalLength: value.length,
|
||||
};
|
||||
}
|
||||
const bounded = value
|
||||
.slice(0, MAX_RUN_EVENT_PAYLOAD_ARRAY_ITEMS)
|
||||
.map((entry) => boundRunEventValue(entry, depth + 1, seen));
|
||||
if (value.length > MAX_RUN_EVENT_PAYLOAD_ARRAY_ITEMS) {
|
||||
bounded.push({
|
||||
_truncated: true,
|
||||
omittedItems: value.length - MAX_RUN_EVENT_PAYLOAD_ARRAY_ITEMS,
|
||||
});
|
||||
}
|
||||
return bounded;
|
||||
}
|
||||
if (typeof value !== "object" || value === undefined) {
|
||||
return null;
|
||||
}
|
||||
if (seen.has(value)) {
|
||||
return "[Circular]";
|
||||
}
|
||||
seen.add(value);
|
||||
const entries = Object.entries(value as Record<string, unknown>);
|
||||
if (depth >= MAX_RUN_EVENT_PAYLOAD_DEPTH) {
|
||||
const bounded = {
|
||||
_truncated: true,
|
||||
type: "object",
|
||||
keys: entries.map(([key]) => key).slice(0, 20),
|
||||
};
|
||||
seen.delete(value);
|
||||
return bounded;
|
||||
}
|
||||
|
||||
const out: Record<string, unknown> = {};
|
||||
for (const [key, entryValue] of entries.slice(0, MAX_RUN_EVENT_PAYLOAD_OBJECT_KEYS)) {
|
||||
out[key] = boundRunEventValue(entryValue, depth + 1, seen);
|
||||
}
|
||||
if (entries.length > MAX_RUN_EVENT_PAYLOAD_OBJECT_KEYS) {
|
||||
out._truncated = true;
|
||||
out._omittedKeys = entries.length - MAX_RUN_EVENT_PAYLOAD_OBJECT_KEYS;
|
||||
}
|
||||
seen.delete(value);
|
||||
return out;
|
||||
}
|
||||
|
||||
export function boundHeartbeatRunEventPayloadForStorage(payload: Record<string, unknown>): Record<string, unknown> {
|
||||
const bounded = boundRunEventValue(payload, 0, new WeakSet());
|
||||
return parseObject(bounded) ?? { _truncated: true };
|
||||
}
|
||||
|
||||
function redactInlineBase64ImageData(chunk: string) {
|
||||
@@ -716,6 +811,22 @@ function summarizeRunFailureForIssueComment(
|
||||
return null;
|
||||
}
|
||||
|
||||
function didAutomaticRecoveryFail(
|
||||
latestRun: Pick<typeof heartbeatRuns.$inferSelect, "status" | "contextSnapshot"> | null,
|
||||
expectedRetryReason: "assignment_recovery" | "issue_continuation_needed",
|
||||
) {
|
||||
if (!latestRun) return false;
|
||||
|
||||
const latestContext = parseObject(latestRun.contextSnapshot);
|
||||
const latestRetryReason = readNonEmptyString(latestContext.retryReason);
|
||||
return (
|
||||
latestRetryReason === expectedRetryReason &&
|
||||
UNSUCCESSFUL_HEARTBEAT_RUN_TERMINAL_STATUSES.includes(
|
||||
latestRun.status as (typeof UNSUCCESSFUL_HEARTBEAT_RUN_TERMINAL_STATUSES)[number],
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
function normalizeLedgerBillingType(value: unknown): BillingType {
|
||||
const raw = readNonEmptyString(value);
|
||||
switch (raw) {
|
||||
@@ -1095,6 +1206,15 @@ function shouldAutoCheckoutIssueForWake(input: {
|
||||
return true;
|
||||
}
|
||||
|
||||
function shouldQueueFollowupForRunningIssueWake(input: {
|
||||
contextSnapshot: Record<string, unknown> | null | undefined;
|
||||
wakeCommentId: string | null;
|
||||
}) {
|
||||
if (input.wakeCommentId) return true;
|
||||
const wakeReason = readNonEmptyString(input.contextSnapshot?.wakeReason);
|
||||
return Boolean(wakeReason && RUNNING_ISSUE_WAKE_REASONS_REQUIRING_FOLLOWUP.has(wakeReason));
|
||||
}
|
||||
|
||||
function isCheckoutConflictError(error: unknown): boolean {
|
||||
return error instanceof HttpError && error.status === 409 && error.message === "Issue checkout conflict";
|
||||
}
|
||||
@@ -1577,6 +1697,26 @@ export function heartbeatService(db: Db) {
|
||||
cancelWorkForScope: cancelBudgetScopeWork,
|
||||
};
|
||||
const budgets = budgetService(db, budgetHooks);
|
||||
let unsafeTextProjectionPromise: Promise<boolean> | null = null;
|
||||
|
||||
async function hasUnsafeTextProjectionDatabase() {
|
||||
if (!unsafeTextProjectionPromise) {
|
||||
unsafeTextProjectionPromise = db
|
||||
.execute(sql`select current_setting('server_encoding') as server_encoding`)
|
||||
.then((rows) => {
|
||||
const first = Array.isArray(rows) ? rows[0] : null;
|
||||
const serverEncoding = typeof first === "object" && first !== null
|
||||
? (first as Record<string, unknown>).server_encoding
|
||||
: null;
|
||||
return typeof serverEncoding === "string" && serverEncoding.toUpperCase() === "SQL_ASCII";
|
||||
})
|
||||
.catch((err) => {
|
||||
logger.warn({ err }, "failed to inspect database server encoding; using conservative heartbeat result projection");
|
||||
return true;
|
||||
});
|
||||
}
|
||||
return unsafeTextProjectionPromise;
|
||||
}
|
||||
|
||||
async function getAgent(agentId: string) {
|
||||
return db
|
||||
@@ -1587,8 +1727,15 @@ export function heartbeatService(db: Db) {
|
||||
}
|
||||
|
||||
async function getRun(runId: string, opts?: { unsafeFullResultJson?: boolean }) {
|
||||
const safeForLegacyEncoding = !opts?.unsafeFullResultJson && await hasUnsafeTextProjectionDatabase();
|
||||
return db
|
||||
.select(opts?.unsafeFullResultJson ? getTableColumns(heartbeatRuns) : heartbeatRunSafeColumns)
|
||||
.select(
|
||||
opts?.unsafeFullResultJson
|
||||
? getTableColumns(heartbeatRuns)
|
||||
: safeForLegacyEncoding
|
||||
? heartbeatRunSqlAsciiSafeColumns
|
||||
: heartbeatRunSafeColumns,
|
||||
)
|
||||
.from(heartbeatRuns)
|
||||
.where(eq(heartbeatRuns.id, runId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
@@ -2393,9 +2540,12 @@ export function heartbeatService(db: Db) {
|
||||
const sanitizedMessage = event.message
|
||||
? redactCurrentUserText(event.message, currentUserRedactionOptions)
|
||||
: event.message;
|
||||
const sanitizedPayload = event.payload
|
||||
? redactCurrentUserValue(event.payload, currentUserRedactionOptions)
|
||||
const boundedPayload = event.payload
|
||||
? boundHeartbeatRunEventPayloadForStorage(event.payload)
|
||||
: event.payload;
|
||||
const sanitizedPayload = boundedPayload
|
||||
? redactCurrentUserValue(boundedPayload, currentUserRedactionOptions)
|
||||
: boundedPayload;
|
||||
|
||||
await db.insert(heartbeatRunEvents).values({
|
||||
companyId: run.companyId,
|
||||
@@ -3484,16 +3634,13 @@ export function heartbeatService(db: Db) {
|
||||
}
|
||||
|
||||
const latestRun = await getLatestIssueRun(issue.companyId, issue.id);
|
||||
const latestContext = parseObject(latestRun?.contextSnapshot);
|
||||
const latestRetryReason = readNonEmptyString(latestContext.retryReason);
|
||||
|
||||
if (issue.status === "todo") {
|
||||
if (!latestRun || latestRun.status === "succeeded") {
|
||||
result.skipped += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (latestRetryReason === "assignment_recovery") {
|
||||
if (didAutomaticRecoveryFail(latestRun, "assignment_recovery")) {
|
||||
const failureSummary = summarizeRunFailureForIssueComment(latestRun);
|
||||
const updated = await escalateStrandedAssignedIssue({
|
||||
issue,
|
||||
@@ -3530,7 +3677,12 @@ export function heartbeatService(db: Db) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (latestRetryReason === "issue_continuation_needed") {
|
||||
if (!latestRun && !issue.checkoutRunId && !issue.executionRunId) {
|
||||
result.skipped += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (didAutomaticRecoveryFail(latestRun, "issue_continuation_needed")) {
|
||||
const failureSummary = summarizeRunFailureForIssueComment(latestRun);
|
||||
const updated = await escalateStrandedAssignedIssue({
|
||||
issue,
|
||||
@@ -5137,12 +5289,12 @@ export function heartbeatService(db: Db) {
|
||||
normalizeAgentNameKey(executionAgent?.name);
|
||||
const isSameExecutionAgent =
|
||||
Boolean(executionAgentNameKey) && executionAgentNameKey === agentNameKey;
|
||||
const shouldQueueFollowupForCommentWake =
|
||||
Boolean(wakeCommentId) &&
|
||||
const shouldQueueFollowupForRunningWake =
|
||||
shouldQueueFollowupForRunningIssueWake({ contextSnapshot: enrichedContextSnapshot, wakeCommentId }) &&
|
||||
activeExecutionRun.status === "running" &&
|
||||
isSameExecutionAgent;
|
||||
|
||||
if (isSameExecutionAgent && !shouldQueueFollowupForCommentWake) {
|
||||
if (isSameExecutionAgent && !shouldQueueFollowupForRunningWake) {
|
||||
const mergedContextSnapshot = mergeCoalescedContextSnapshot(
|
||||
activeExecutionRun.contextSnapshot,
|
||||
enrichedContextSnapshot,
|
||||
@@ -5319,12 +5471,14 @@ export function heartbeatService(db: Db) {
|
||||
const sameScopeRunningRun = activeRuns.find(
|
||||
(candidate) => candidate.status === "running" && isSameTaskScope(runTaskKey(candidate), taskKey),
|
||||
);
|
||||
const shouldQueueFollowupForCommentWake =
|
||||
Boolean(wakeCommentId) && Boolean(sameScopeRunningRun) && !sameScopeQueuedRun;
|
||||
const shouldQueueFollowupForRunningWake =
|
||||
Boolean(sameScopeRunningRun) &&
|
||||
!sameScopeQueuedRun &&
|
||||
shouldQueueFollowupForRunningIssueWake({ contextSnapshot: enrichedContextSnapshot, wakeCommentId });
|
||||
|
||||
const coalescedTargetRun =
|
||||
sameScopeQueuedRun ??
|
||||
(shouldQueueFollowupForCommentWake ? null : sameScopeRunningRun ?? null);
|
||||
(shouldQueueFollowupForRunningWake ? null : sameScopeRunningRun ?? null);
|
||||
|
||||
if (coalescedTargetRun) {
|
||||
const mergedContextSnapshot = mergeCoalescedContextSnapshot(
|
||||
@@ -5646,12 +5800,21 @@ export function heartbeatService(db: Db) {
|
||||
|
||||
return {
|
||||
list: async (companyId: string, agentId?: string, limit?: number) => {
|
||||
const safeForLegacyEncoding = await hasUnsafeTextProjectionDatabase();
|
||||
const query = db
|
||||
.select({
|
||||
...heartbeatRunListColumns,
|
||||
...heartbeatRunListContextColumns,
|
||||
...heartbeatRunListResultColumns,
|
||||
})
|
||||
.select(
|
||||
safeForLegacyEncoding
|
||||
? {
|
||||
...heartbeatRunListColumns,
|
||||
error: sql<string | null>`NULL`.as("error"),
|
||||
...heartbeatRunListContextColumns,
|
||||
}
|
||||
: {
|
||||
...heartbeatRunListColumns,
|
||||
...heartbeatRunListContextColumns,
|
||||
...heartbeatRunListResultColumns,
|
||||
},
|
||||
)
|
||||
.from(heartbeatRuns)
|
||||
.where(
|
||||
agentId
|
||||
@@ -5679,7 +5842,15 @@ export function heartbeatService(db: Db) {
|
||||
resultCostUsd,
|
||||
resultCostUsdCamel,
|
||||
...rest
|
||||
} = row;
|
||||
} = row as typeof row & {
|
||||
resultSummary?: string | null;
|
||||
resultResult?: string | null;
|
||||
resultMessage?: string | null;
|
||||
resultError?: string | null;
|
||||
resultTotalCostUsd?: string | null;
|
||||
resultCostUsd?: string | null;
|
||||
resultCostUsdCamel?: string | null;
|
||||
};
|
||||
|
||||
return {
|
||||
...rest,
|
||||
@@ -5693,15 +5864,17 @@ export function heartbeatService(db: Db) {
|
||||
wakeSource: contextWakeSource,
|
||||
wakeTriggerDetail: contextWakeTriggerDetail,
|
||||
}),
|
||||
resultJson: summarizeHeartbeatRunListResultJson({
|
||||
summary: resultSummary,
|
||||
result: resultResult,
|
||||
message: resultMessage,
|
||||
error: resultError,
|
||||
totalCostUsd: resultTotalCostUsd,
|
||||
costUsd: resultCostUsd,
|
||||
costUsdCamel: resultCostUsdCamel,
|
||||
}),
|
||||
resultJson: safeForLegacyEncoding
|
||||
? null
|
||||
: summarizeHeartbeatRunListResultJson({
|
||||
summary: resultSummary,
|
||||
result: resultResult,
|
||||
message: resultMessage,
|
||||
error: resultError,
|
||||
totalCostUsd: resultTotalCostUsd,
|
||||
costUsd: resultCostUsd,
|
||||
costUsdCamel: resultCostUsdCamel,
|
||||
}),
|
||||
};
|
||||
});
|
||||
},
|
||||
@@ -5810,7 +5983,9 @@ export function heartbeatService(db: Db) {
|
||||
store: run.logStore,
|
||||
logRef: run.logRef,
|
||||
...result,
|
||||
content: redactCurrentUserText(result.content, await getCurrentUserRedactionOptions()),
|
||||
// Run-log chunks are already redacted before they are appended to the store.
|
||||
// Rewriting the full chunk again on every poll creates avoidable string copies.
|
||||
content: result.content,
|
||||
};
|
||||
},
|
||||
|
||||
|
||||
@@ -12,7 +12,13 @@ export {
|
||||
refreshIssueContinuationSummary,
|
||||
} from "./issue-continuation-summary.js";
|
||||
export { projectService } from "./projects.js";
|
||||
export { issueService, type IssueFilters } from "./issues.js";
|
||||
export {
|
||||
clampIssueListLimit,
|
||||
ISSUE_LIST_DEFAULT_LIMIT,
|
||||
ISSUE_LIST_MAX_LIMIT,
|
||||
issueService,
|
||||
type IssueFilters,
|
||||
} from "./issues.js";
|
||||
export { issueApprovalService } from "./issue-approvals.js";
|
||||
export { goalService } from "./goals.js";
|
||||
export { activityService, type ActivityFilters } from "./activity.js";
|
||||
|
||||
@@ -85,7 +85,16 @@ export function instanceSettingsService(db: Db) {
|
||||
})
|
||||
.returning();
|
||||
|
||||
return created;
|
||||
if (created) return created;
|
||||
|
||||
const raced = await db
|
||||
.select()
|
||||
.from(instanceSettings)
|
||||
.where(eq(instanceSettings.singletonKey, DEFAULT_SINGLETON_KEY))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (raced) return raced;
|
||||
|
||||
throw new Error("Failed to initialize instance settings row");
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
+184
-123
@@ -38,6 +38,9 @@ import { getDefaultCompanyGoal } from "./goals.js";
|
||||
|
||||
const ALL_ISSUE_STATUSES = ["backlog", "todo", "in_progress", "in_review", "blocked", "done", "cancelled"];
|
||||
const MAX_ISSUE_COMMENT_PAGE_LIMIT = 500;
|
||||
export const ISSUE_LIST_DEFAULT_LIMIT = 500;
|
||||
export const ISSUE_LIST_MAX_LIMIT = 1000;
|
||||
const ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE = 500;
|
||||
export const MAX_CHILD_ISSUES_CREATED_BY_HELPER = 25;
|
||||
const MAX_CHILD_COMPLETION_SUMMARIES = 20;
|
||||
const CHILD_COMPLETION_SUMMARY_BODY_MAX_CHARS = 500;
|
||||
@@ -106,6 +109,10 @@ type IssueUserCommentStats = {
|
||||
myLastCommentAt: Date | null;
|
||||
lastExternalCommentAt: Date | null;
|
||||
};
|
||||
type IssueReadStat = {
|
||||
issueId: string;
|
||||
myLastReadAt: Date | null;
|
||||
};
|
||||
type IssueLastActivityStat = {
|
||||
issueId: string;
|
||||
latestCommentAt: Date | null;
|
||||
@@ -158,6 +165,18 @@ function escapeLikePattern(value: string): string {
|
||||
return value.replace(/[\\%_]/g, "\\$&");
|
||||
}
|
||||
|
||||
export function clampIssueListLimit(limit: number): number {
|
||||
return Math.min(ISSUE_LIST_MAX_LIMIT, Math.max(1, Math.floor(limit)));
|
||||
}
|
||||
|
||||
function chunkList<T>(values: T[], size: number): T[][] {
|
||||
const chunks: T[][] = [];
|
||||
for (let index = 0; index < values.length; index += size) {
|
||||
chunks.push(values.slice(index, index + size));
|
||||
}
|
||||
return chunks;
|
||||
}
|
||||
|
||||
function truncateInlineSummary(value: string | null | undefined, maxChars = CHILD_COMPLETION_SUMMARY_BODY_MAX_CHARS) {
|
||||
const normalized = value?.trim();
|
||||
if (!normalized) return null;
|
||||
@@ -494,20 +513,22 @@ function latestIssueActivityAt(...values: Array<Date | string | null | undefined
|
||||
async function labelMapForIssues(dbOrTx: any, issueIds: string[]): Promise<Map<string, IssueLabelRow[]>> {
|
||||
const map = new Map<string, IssueLabelRow[]>();
|
||||
if (issueIds.length === 0) return map;
|
||||
const rows = await dbOrTx
|
||||
.select({
|
||||
issueId: issueLabels.issueId,
|
||||
label: labels,
|
||||
})
|
||||
.from(issueLabels)
|
||||
.innerJoin(labels, eq(issueLabels.labelId, labels.id))
|
||||
.where(inArray(issueLabels.issueId, issueIds))
|
||||
.orderBy(asc(labels.name), asc(labels.id));
|
||||
for (const issueIdChunk of chunkList(issueIds, ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE)) {
|
||||
const rows = await dbOrTx
|
||||
.select({
|
||||
issueId: issueLabels.issueId,
|
||||
label: labels,
|
||||
})
|
||||
.from(issueLabels)
|
||||
.innerJoin(labels, eq(issueLabels.labelId, labels.id))
|
||||
.where(inArray(issueLabels.issueId, issueIdChunk))
|
||||
.orderBy(asc(labels.name), asc(labels.id));
|
||||
|
||||
for (const row of rows) {
|
||||
const existing = map.get(row.issueId);
|
||||
if (existing) existing.push(row.label);
|
||||
else map.set(row.issueId, [row.label]);
|
||||
for (const row of rows) {
|
||||
const existing = map.get(row.issueId);
|
||||
if (existing) existing.push(row.label);
|
||||
else map.set(row.issueId, [row.label]);
|
||||
}
|
||||
}
|
||||
return map;
|
||||
}
|
||||
@@ -537,27 +558,29 @@ async function activeRunMapForIssues(
|
||||
.filter((id): id is string => id != null);
|
||||
if (runIds.length === 0) return map;
|
||||
|
||||
const rows = await dbOrTx
|
||||
.select({
|
||||
id: heartbeatRuns.id,
|
||||
status: heartbeatRuns.status,
|
||||
agentId: heartbeatRuns.agentId,
|
||||
invocationSource: heartbeatRuns.invocationSource,
|
||||
triggerDetail: heartbeatRuns.triggerDetail,
|
||||
startedAt: heartbeatRuns.startedAt,
|
||||
finishedAt: heartbeatRuns.finishedAt,
|
||||
createdAt: heartbeatRuns.createdAt,
|
||||
})
|
||||
.from(heartbeatRuns)
|
||||
.where(
|
||||
and(
|
||||
inArray(heartbeatRuns.id, runIds),
|
||||
inArray(heartbeatRuns.status, ACTIVE_RUN_STATUSES),
|
||||
),
|
||||
);
|
||||
for (const runIdChunk of chunkList([...new Set(runIds)], ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE)) {
|
||||
const rows = await dbOrTx
|
||||
.select({
|
||||
id: heartbeatRuns.id,
|
||||
status: heartbeatRuns.status,
|
||||
agentId: heartbeatRuns.agentId,
|
||||
invocationSource: heartbeatRuns.invocationSource,
|
||||
triggerDetail: heartbeatRuns.triggerDetail,
|
||||
startedAt: heartbeatRuns.startedAt,
|
||||
finishedAt: heartbeatRuns.finishedAt,
|
||||
createdAt: heartbeatRuns.createdAt,
|
||||
})
|
||||
.from(heartbeatRuns)
|
||||
.where(
|
||||
and(
|
||||
inArray(heartbeatRuns.id, runIdChunk),
|
||||
inArray(heartbeatRuns.status, ACTIVE_RUN_STATUSES),
|
||||
),
|
||||
);
|
||||
|
||||
for (const row of rows) {
|
||||
map.set(row.id, row);
|
||||
for (const row of rows) {
|
||||
map.set(row.id, row);
|
||||
}
|
||||
}
|
||||
return map;
|
||||
}
|
||||
@@ -617,6 +640,131 @@ function withActiveRuns(
|
||||
}));
|
||||
}
|
||||
|
||||
async function userCommentStatsForIssues(
|
||||
dbOrTx: any,
|
||||
companyId: string,
|
||||
userId: string,
|
||||
issueIds: string[],
|
||||
): Promise<IssueUserCommentStats[]> {
|
||||
const stats: IssueUserCommentStats[] = [];
|
||||
for (const issueIdChunk of chunkList(issueIds, ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE)) {
|
||||
const rows = await dbOrTx
|
||||
.select({
|
||||
issueId: issueComments.issueId,
|
||||
myLastCommentAt: sql<Date | null>`
|
||||
MAX(CASE WHEN ${issueComments.authorUserId} = ${userId} THEN ${issueComments.createdAt} END)
|
||||
`,
|
||||
lastExternalCommentAt: sql<Date | null>`
|
||||
MAX(
|
||||
CASE
|
||||
WHEN ${issueComments.authorUserId} IS NULL OR ${issueComments.authorUserId} <> ${userId}
|
||||
THEN ${issueComments.createdAt}
|
||||
END
|
||||
)
|
||||
`,
|
||||
})
|
||||
.from(issueComments)
|
||||
.where(
|
||||
and(
|
||||
eq(issueComments.companyId, companyId),
|
||||
inArray(issueComments.issueId, issueIdChunk),
|
||||
),
|
||||
)
|
||||
.groupBy(issueComments.issueId);
|
||||
stats.push(...rows);
|
||||
}
|
||||
return stats;
|
||||
}
|
||||
|
||||
async function userReadStatsForIssues(
|
||||
dbOrTx: any,
|
||||
companyId: string,
|
||||
userId: string,
|
||||
issueIds: string[],
|
||||
): Promise<IssueReadStat[]> {
|
||||
const stats: IssueReadStat[] = [];
|
||||
for (const issueIdChunk of chunkList(issueIds, ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE)) {
|
||||
const rows = await dbOrTx
|
||||
.select({
|
||||
issueId: issueReadStates.issueId,
|
||||
myLastReadAt: issueReadStates.lastReadAt,
|
||||
})
|
||||
.from(issueReadStates)
|
||||
.where(
|
||||
and(
|
||||
eq(issueReadStates.companyId, companyId),
|
||||
eq(issueReadStates.userId, userId),
|
||||
inArray(issueReadStates.issueId, issueIdChunk),
|
||||
),
|
||||
);
|
||||
stats.push(...rows);
|
||||
}
|
||||
return stats;
|
||||
}
|
||||
|
||||
async function lastActivityStatsForIssues(
|
||||
dbOrTx: any,
|
||||
companyId: string,
|
||||
issueIds: string[],
|
||||
): Promise<IssueLastActivityStat[]> {
|
||||
const byIssueId = new Map<string, IssueLastActivityStat>();
|
||||
for (const issueIdChunk of chunkList(issueIds, ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE)) {
|
||||
const [commentRows, logRows] = await Promise.all([
|
||||
dbOrTx
|
||||
.select({
|
||||
issueId: issueComments.issueId,
|
||||
latestCommentAt: sql<Date | null>`MAX(${issueComments.createdAt})`,
|
||||
})
|
||||
.from(issueComments)
|
||||
.where(
|
||||
and(
|
||||
eq(issueComments.companyId, companyId),
|
||||
inArray(issueComments.issueId, issueIdChunk),
|
||||
),
|
||||
)
|
||||
.groupBy(issueComments.issueId),
|
||||
dbOrTx
|
||||
.select({
|
||||
issueId: activityLog.entityId,
|
||||
latestLogAt: sql<Date | null>`MAX(${activityLog.createdAt})`,
|
||||
})
|
||||
.from(activityLog)
|
||||
.where(
|
||||
and(
|
||||
eq(activityLog.companyId, companyId),
|
||||
eq(activityLog.entityType, "issue"),
|
||||
inArray(activityLog.entityId, issueIdChunk),
|
||||
sql`${activityLog.action} NOT IN (${sql.join(
|
||||
ISSUE_LOCAL_INBOX_ACTIVITY_ACTIONS.map((action) => sql`${action}`),
|
||||
sql`, `,
|
||||
)})`,
|
||||
),
|
||||
)
|
||||
.groupBy(activityLog.entityId),
|
||||
]);
|
||||
|
||||
for (const row of commentRows) {
|
||||
byIssueId.set(row.issueId, {
|
||||
issueId: row.issueId,
|
||||
latestCommentAt: row.latestCommentAt,
|
||||
latestLogAt: null,
|
||||
});
|
||||
}
|
||||
for (const row of logRows) {
|
||||
const existing = byIssueId.get(row.issueId);
|
||||
if (existing) existing.latestLogAt = row.latestLogAt;
|
||||
else {
|
||||
byIssueId.set(row.issueId, {
|
||||
issueId: row.issueId,
|
||||
latestCommentAt: null,
|
||||
latestLogAt: row.latestLogAt,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
return [...byIssueId.values()];
|
||||
}
|
||||
|
||||
export function issueService(db: Db) {
|
||||
const instanceSettings = instanceSettingsService(db);
|
||||
|
||||
@@ -1105,99 +1253,12 @@ export function issueService(db: Db) {
|
||||
const issueIds = withRuns.map((row) => row.id);
|
||||
const [statsRows, readRows, lastActivityRows] = await Promise.all([
|
||||
contextUserId
|
||||
? db
|
||||
.select({
|
||||
issueId: issueComments.issueId,
|
||||
myLastCommentAt: sql<Date | null>`
|
||||
MAX(CASE WHEN ${issueComments.authorUserId} = ${contextUserId} THEN ${issueComments.createdAt} END)
|
||||
`,
|
||||
lastExternalCommentAt: sql<Date | null>`
|
||||
MAX(
|
||||
CASE
|
||||
WHEN ${issueComments.authorUserId} IS NULL OR ${issueComments.authorUserId} <> ${contextUserId}
|
||||
THEN ${issueComments.createdAt}
|
||||
END
|
||||
)
|
||||
`,
|
||||
})
|
||||
.from(issueComments)
|
||||
.where(
|
||||
and(
|
||||
eq(issueComments.companyId, companyId),
|
||||
inArray(issueComments.issueId, issueIds),
|
||||
),
|
||||
)
|
||||
.groupBy(issueComments.issueId)
|
||||
? userCommentStatsForIssues(db, companyId, contextUserId, issueIds)
|
||||
: Promise.resolve([]),
|
||||
contextUserId
|
||||
? db
|
||||
.select({
|
||||
issueId: issueReadStates.issueId,
|
||||
myLastReadAt: issueReadStates.lastReadAt,
|
||||
})
|
||||
.from(issueReadStates)
|
||||
.where(
|
||||
and(
|
||||
eq(issueReadStates.companyId, companyId),
|
||||
eq(issueReadStates.userId, contextUserId),
|
||||
inArray(issueReadStates.issueId, issueIds),
|
||||
),
|
||||
)
|
||||
? userReadStatsForIssues(db, companyId, contextUserId, issueIds)
|
||||
: Promise.resolve([]),
|
||||
Promise.all([
|
||||
db
|
||||
.select({
|
||||
issueId: issueComments.issueId,
|
||||
latestCommentAt: sql<Date | null>`MAX(${issueComments.createdAt})`,
|
||||
})
|
||||
.from(issueComments)
|
||||
.where(
|
||||
and(
|
||||
eq(issueComments.companyId, companyId),
|
||||
inArray(issueComments.issueId, issueIds),
|
||||
),
|
||||
)
|
||||
.groupBy(issueComments.issueId),
|
||||
db
|
||||
.select({
|
||||
issueId: activityLog.entityId,
|
||||
latestLogAt: sql<Date | null>`MAX(${activityLog.createdAt})`,
|
||||
})
|
||||
.from(activityLog)
|
||||
.where(
|
||||
and(
|
||||
eq(activityLog.companyId, companyId),
|
||||
eq(activityLog.entityType, "issue"),
|
||||
inArray(activityLog.entityId, issueIds),
|
||||
sql`${activityLog.action} NOT IN (${sql.join(
|
||||
ISSUE_LOCAL_INBOX_ACTIVITY_ACTIONS.map((action) => sql`${action}`),
|
||||
sql`, `,
|
||||
)})`,
|
||||
),
|
||||
)
|
||||
.groupBy(activityLog.entityId),
|
||||
]).then(([commentRows, logRows]) => {
|
||||
const byIssueId = new Map<string, IssueLastActivityStat>();
|
||||
for (const row of commentRows) {
|
||||
byIssueId.set(row.issueId, {
|
||||
issueId: row.issueId,
|
||||
latestCommentAt: row.latestCommentAt,
|
||||
latestLogAt: null,
|
||||
});
|
||||
}
|
||||
for (const row of logRows) {
|
||||
const existing = byIssueId.get(row.issueId);
|
||||
if (existing) existing.latestLogAt = row.latestLogAt;
|
||||
else {
|
||||
byIssueId.set(row.issueId, {
|
||||
issueId: row.issueId,
|
||||
latestCommentAt: null,
|
||||
latestLogAt: row.latestLogAt,
|
||||
});
|
||||
}
|
||||
}
|
||||
return [...byIssueId.values()];
|
||||
}),
|
||||
lastActivityStatsForIssues(db, companyId, issueIds),
|
||||
]);
|
||||
const statsByIssueId = new Map(statsRows.map((row) => [row.issueId, row]));
|
||||
const lastActivityByIssueId = new Map(lastActivityRows.map((row) => [row.issueId, row]));
|
||||
|
||||
@@ -250,9 +250,9 @@ export function workspaceOperationService(db: Db) {
|
||||
store: operation.logStore,
|
||||
logRef: operation.logRef,
|
||||
...result,
|
||||
content: redactCurrentUserText(result.content, {
|
||||
enabled: (await instanceSettings.getGeneral()).censorUsernameInLogs,
|
||||
}),
|
||||
// Workspace-operation log chunks are sanitized before append-time storage.
|
||||
// Returning the stored chunk avoids another whole-string rewrite per poll.
|
||||
content: result.content,
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
@@ -413,32 +413,33 @@ function formatCommandForDisplay(command: string, args: string[]) {
|
||||
.join(" ");
|
||||
}
|
||||
|
||||
function trimToLastBytes(value: string, limit: number) {
|
||||
const byteLength = Buffer.byteLength(value, "utf8");
|
||||
if (byteLength <= limit) return value;
|
||||
return Buffer.from(value, "utf8").subarray(byteLength - limit).toString("utf8");
|
||||
}
|
||||
|
||||
function createProcessOutputCapture(maxBytes: number): ProcessOutputAccumulator {
|
||||
const limit = Math.max(1, Math.trunc(maxBytes));
|
||||
let chunks: string[] = [];
|
||||
let text = "";
|
||||
let truncated = false;
|
||||
let totalBytes = 0;
|
||||
|
||||
return {
|
||||
append(chunk: string) {
|
||||
if (!chunk) return;
|
||||
chunks.push(chunk);
|
||||
totalBytes += Buffer.byteLength(chunk, "utf8");
|
||||
|
||||
let currentBytes = chunks.reduce((sum, value) => sum + Buffer.byteLength(value, "utf8"), 0);
|
||||
if (currentBytes <= limit) return;
|
||||
|
||||
const combined = Buffer.from(chunks.join(""), "utf8");
|
||||
const tail = combined.subarray(Math.max(0, combined.length - limit)).toString("utf8");
|
||||
chunks = [tail];
|
||||
truncated = true;
|
||||
currentBytes = Buffer.byteLength(tail, "utf8");
|
||||
if (currentBytes > limit) {
|
||||
chunks = [Buffer.from(tail, "utf8").subarray(Math.max(0, currentBytes - limit)).toString("utf8")];
|
||||
const combined = text + chunk;
|
||||
if (Buffer.byteLength(combined, "utf8") <= limit) {
|
||||
text = combined;
|
||||
return;
|
||||
}
|
||||
|
||||
text = trimToLastBytes(combined, limit);
|
||||
truncated = true;
|
||||
},
|
||||
finish(): ProcessOutputCapture {
|
||||
const text = chunks.join("");
|
||||
if (!truncated) {
|
||||
return {
|
||||
text,
|
||||
|
||||
Reference in New Issue
Block a user