From c91a06232625d1939858fcdfbdc0c9b3a64a8296 Mon Sep 17 00:00:00 2001 From: Dotta <34892728+cryppadotta@users.noreply.github.com> Date: Wed, 20 May 2026 10:37:11 -0500 Subject: [PATCH] [codex] Runtime control-plane fixes (#6380) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Thinking Path > - Paperclip orchestrates AI agents through a server-side control plane > - That control plane depends on reliable issue state transitions, plugin lifecycle behavior, import limits, and startup/shutdown handling > - Several small runtime fixes had accumulated on the working branch and were mixed with larger feature work > - Keeping them separate makes the correctness fixes reviewable and mergeable without waiting for cloud-sync UI work > - This pull request groups the server/runtime control-plane fixes into one standalone branch > - The benefit is a tighter, safer runtime baseline for retries, imports, plugin migrations, feedback flushing, and trusted cloud import handling ## What Changed - Fixed updated issue list pagination sorting and scheduled retry comment handling. - Re-applied pending plugin migrations during hot reload and fixed plugin-schema worktree seed restore. - Hardened public tenant DB startup, portable import body limits, trusted cloud import errors, and trusted cloud tenant import mutation access. - Expired stale request confirmations after user comments. - Added feedback export shutdown hardening so database-unavailable flush loops stop cleanly. - Guarded plugin worker `error` event emission when no listener is registered. ## Verification - `pnpm install --frozen-lockfile --ignore-scripts` - `pnpm --filter @paperclipai/plugin-sdk build` - `npm run install --prefix node_modules/.pnpm/sqlite3@5.1.7/node_modules/sqlite3` - `pnpm exec vitest run server/src/__tests__/issues-service.test.ts server/src/__tests__/plugin-lifecycle-restart.test.ts server/src/__tests__/server-startup-feedback-export.test.ts server/src/__tests__/issue-comment-reopen-routes.test.ts server/src/__tests__/issue-thread-interactions-service.test.ts server/src/__tests__/issue-thread-interaction-routes.test.ts server/src/__tests__/body-limits.test.ts server/src/__tests__/feedback-flush-controller.test.ts server/src/__tests__/error-handler.test.ts server/src/__tests__/board-mutation-guard.test.ts packages/db/src/backup-lib.test.ts` initially exposed local setup issues and two 5s test timeouts. - Rerun after local prereq build: `pnpm exec vitest run --testTimeout 15000 server/src/__tests__/issue-comment-reopen-routes.test.ts server/src/__tests__/issue-thread-interaction-routes.test.ts server/src/__tests__/feedback-flush-controller.test.ts server/src/__tests__/server-startup-feedback-export.test.ts` passed. - Some embedded Postgres-backed tests skipped on this host because local Postgres init was unavailable. ## Risks - Runtime-touching branch: startup/shutdown and issue interaction behavior should be reviewed carefully. - The feedback export change disables repeated flush attempts only for database connection-refused failures; other upload failures still log normally. - The plugin worker error guard avoids process crashes from unhandled EventEmitter errors but may hide errors from code paths that expected an emitted listener. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI Codex, GPT-5-based coding agent with local shell/git/tool use. Exact hosted model ID and context-window size are not exposed by the local Paperclip adapter runtime. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --------- Co-authored-by: Paperclip --- packages/db/src/backup-lib.test.ts | 101 ++++++++ packages/db/src/backup-lib.ts | 136 ++++++----- server/src/__tests__/body-limits.test.ts | 19 ++ server/src/__tests__/error-handler.test.ts | 25 ++ .../feedback-flush-controller.test.ts | 24 ++ .../issue-comment-reopen-routes.test.ts | 215 +++++++++++++++++ .../__tests__/issue-recovery-actions.test.ts | 5 +- .../issue-thread-interaction-routes.test.ts | 32 +++ .../issue-thread-interactions-service.test.ts | 217 +++++++++++++++--- ...issue-update-comment-wakeup-routes.test.ts | 2 + server/src/__tests__/issues-service.test.ts | 40 ++++ .../plugin-lifecycle-restart.test.ts | 129 +++++++++++ .../server-startup-feedback-export.test.ts | 29 +++ server/src/app.ts | 67 ++++-- server/src/http/body-limits.ts | 3 + server/src/index.ts | 29 +++ server/src/middleware/error-handler.ts | 12 +- server/src/routes/companies.ts | 3 +- server/src/routes/company-import-paths.ts | 2 + server/src/routes/issues.ts | 172 +++++++++++++- .../src/services/issue-thread-interactions.ts | 111 ++++++++- server/src/services/issues.ts | 56 ++++- server/src/services/plugin-lifecycle.ts | 44 +++- server/src/services/plugin-worker-manager.ts | 4 +- ui/src/api/issues.test.ts | 12 + ui/src/api/issues.ts | 4 + 26 files changed, 1363 insertions(+), 130 deletions(-) create mode 100644 server/src/__tests__/body-limits.test.ts create mode 100644 server/src/__tests__/feedback-flush-controller.test.ts create mode 100644 server/src/__tests__/plugin-lifecycle-restart.test.ts create mode 100644 server/src/http/body-limits.ts create mode 100644 server/src/routes/company-import-paths.ts diff --git a/packages/db/src/backup-lib.test.ts b/packages/db/src/backup-lib.test.ts index 8a83a8f4..5cabb4d0 100644 --- a/packages/db/src/backup-lib.test.ts +++ b/packages/db/src/backup-lib.test.ts @@ -309,6 +309,107 @@ describeEmbeddedPostgres("runDatabaseBackup", () => { 60_000, ); + it( + "preserves composite foreign key column order without duplicate referenced columns", + async () => { + const sourceConnectionString = await createTempDatabase(); + const restoreConnectionString = await createSiblingDatabase( + sourceConnectionString, + "paperclip_composite_fk_restore_target", + ); + const backupDir = createTempDir("paperclip-db-composite-fk-backup-"); + const sourceSql = postgres(sourceConnectionString, { max: 1, onnotice: () => {} }); + const restoreSql = postgres(restoreConnectionString, { max: 1, onnotice: () => {} }); + + try { + await sourceSql.unsafe(` + CREATE SCHEMA "plugin_composite_fk"; + CREATE TABLE "plugin_composite_fk"."content_cases" ( + "id" uuid PRIMARY KEY, + "company_id" uuid NOT NULL, + "title" text NOT NULL, + CONSTRAINT "content_cases_company_case_unique" UNIQUE ("company_id", "id") + ); + CREATE TABLE "plugin_composite_fk"."content_case_signals" ( + "company_id" uuid NOT NULL, + "case_id" uuid NOT NULL, + "signal" text NOT NULL, + "scopes" text[] NOT NULL, + "warnings" jsonb DEFAULT '[]'::jsonb NOT NULL, + CONSTRAINT "content_case_signals_company_case" + FOREIGN KEY ("company_id", "case_id") + REFERENCES "plugin_composite_fk"."content_cases" ("company_id", "id") + ON DELETE CASCADE + ); + INSERT INTO "plugin_composite_fk"."content_cases" ("company_id", "id", "title") + VALUES ( + '11111111-1111-4111-8111-111111111111', + '22222222-2222-4222-8222-222222222222', + 'case' + ); + INSERT INTO "plugin_composite_fk"."content_case_signals" ("company_id", "case_id", "signal", "scopes", "warnings") + VALUES ( + '11111111-1111-4111-8111-111111111111', + '22222222-2222-4222-8222-222222222222', + 'signal', + ARRAY['upstream_import:preview', 'scope with space', 'quoted "scope"', 'NULL', 'null'], + jsonb_build_array('json warning', jsonb_build_object('code', 'quoted "value"')) + ); + `); + + const result = await runDatabaseBackup({ + connectionString: sourceConnectionString, + backupDir, + retention: { dailyDays: 7, weeklyWeeks: 4, monthlyMonths: 1 }, + filenamePrefix: "paperclip-composite-fk-test", + backupEngine: "javascript", + }); + + await runDatabaseRestore({ + connectionString: restoreConnectionString, + backupFile: result.backupFile, + }); + + const rows = await restoreSql.unsafe<{ + signal: string; + title: string; + scopes: string[]; + warnings: Array; + }[]>(` + SELECT s."signal", c."title", s."scopes", s."warnings" + FROM "plugin_composite_fk"."content_case_signals" s + JOIN "plugin_composite_fk"."content_cases" c + ON c."company_id" = s."company_id" + AND c."id" = s."case_id" + `); + expect(rows).toEqual([ + { + signal: "signal", + title: "case", + scopes: ["upstream_import:preview", "scope with space", 'quoted "scope"', "NULL", "null"], + warnings: ["json warning", { code: 'quoted "value"' }], + }, + ]); + + await expect( + restoreSql.unsafe(` + INSERT INTO "plugin_composite_fk"."content_case_signals" ("company_id", "case_id", "signal", "scopes") + VALUES ( + '11111111-1111-4111-8111-111111111111', + '33333333-3333-4333-8333-333333333333', + 'orphan', + ARRAY[]::text[] + ) + `), + ).rejects.toThrow(); + } finally { + await sourceSql.end(); + await restoreSql.end(); + } + }, + 60_000, + ); + it( "restores legacy public-only backups without migration history", async () => { diff --git a/packages/db/src/backup-lib.ts b/packages/db/src/backup-lib.ts index 2ae92517..02dc168b 100644 --- a/packages/db/src/backup-lib.ts +++ b/packages/db/src/backup-lib.ts @@ -249,12 +249,39 @@ function hasBackupTransforms(opts: RunDatabaseBackupOptions): boolean { Object.keys(opts.nullifyColumns ?? {}).length > 0; } -function formatSqlValue(rawValue: unknown, columnName: string | undefined, nullifiedColumns: Set): string { +function formatPostgresArrayElement(value: unknown): string { + if (value === null || value === undefined) return "NULL"; + if (Array.isArray(value)) return formatPostgresArrayLiteral(value); + const raw = value instanceof Date + ? value.toISOString() + : typeof value === "object" + ? JSON.stringify(value) + : String(value); + if (raw.length === 0 || /^null$/i.test(raw) || /[{}\s,"\\]/.test(raw)) { + return `"${raw.replaceAll("\\", "\\\\").replaceAll('"', '\\"')}"`; + } + return raw; +} + +function formatPostgresArrayLiteral(value: unknown[]): string { + return `{${value.map(formatPostgresArrayElement).join(",")}}`; +} + +function formatSqlValue( + rawValue: unknown, + columnName: string | undefined, + nullifiedColumns: Set, + dataType?: string, +): string { const val = columnName && nullifiedColumns.has(columnName) ? null : rawValue; if (val === null || val === undefined) return "NULL"; + if (dataType === "json" || dataType === "jsonb") { + return formatSqlLiteral(JSON.stringify(val)); + } if (typeof val === "boolean") return val ? "true" : "false"; if (typeof val === "number") return String(val); if (val instanceof Date) return formatSqlLiteral(val.toISOString()); + if (Array.isArray(val)) return formatSqlLiteral(formatPostgresArrayLiteral(val)); if (typeof val === "object") return formatSqlLiteral(JSON.stringify(val)); return formatSqlLiteral(String(val)); } @@ -745,58 +772,7 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise emit(""); } - // Foreign keys (after all tables created) - const allForeignKeys = await sql<{ - constraint_name: string; - source_schema: string; - source_table: string; - source_columns: string[]; - target_schema: string; - target_table: string; - target_columns: string[]; - update_rule: string; - delete_rule: string; - }[]>` - SELECT - c.conname AS constraint_name, - srcn.nspname AS source_schema, - src.relname AS source_table, - array_agg(sa.attname ORDER BY array_position(c.conkey, sa.attnum)) AS source_columns, - tgtn.nspname AS target_schema, - tgt.relname AS target_table, - array_agg(ta.attname ORDER BY array_position(c.confkey, ta.attnum)) AS target_columns, - CASE c.confupdtype WHEN 'a' THEN 'NO ACTION' WHEN 'r' THEN 'RESTRICT' WHEN 'c' THEN 'CASCADE' WHEN 'n' THEN 'SET NULL' WHEN 'd' THEN 'SET DEFAULT' END AS update_rule, - CASE c.confdeltype WHEN 'a' THEN 'NO ACTION' WHEN 'r' THEN 'RESTRICT' WHEN 'c' THEN 'CASCADE' WHEN 'n' THEN 'SET NULL' WHEN 'd' THEN 'SET DEFAULT' END AS delete_rule - FROM pg_constraint c - JOIN pg_class src ON src.oid = c.conrelid - JOIN pg_namespace srcn ON srcn.oid = src.relnamespace - JOIN pg_class tgt ON tgt.oid = c.confrelid - JOIN pg_namespace tgtn ON tgtn.oid = tgt.relnamespace - JOIN pg_attribute sa ON sa.attrelid = src.oid AND sa.attnum = ANY(c.conkey) - JOIN pg_attribute ta ON ta.attrelid = tgt.oid AND ta.attnum = ANY(c.confkey) - WHERE c.contype = 'f' - AND ${sql.unsafe(nonSystemSchemaPredicate("srcn.nspname"))} - GROUP BY c.conname, srcn.nspname, src.relname, tgtn.nspname, tgt.relname, c.confupdtype, c.confdeltype - ORDER BY srcn.nspname, src.relname, c.conname - `; - const fks = allForeignKeys.filter( - (fk) => includedTableNames.has(tableKey(fk.source_schema, fk.source_table)) - && includedTableNames.has(tableKey(fk.target_schema, fk.target_table)), - ); - - if (fks.length > 0) { - emit("-- Foreign keys"); - for (const fk of fks) { - const srcCols = fk.source_columns.map((c) => `"${c}"`).join(", "); - const tgtCols = fk.target_columns.map((c) => `"${c}"`).join(", "); - emitStatement( - `ALTER TABLE ${quoteQualifiedName(fk.source_schema, fk.source_table)} ADD CONSTRAINT "${fk.constraint_name}" FOREIGN KEY (${srcCols}) REFERENCES ${quoteQualifiedName(fk.target_schema, fk.target_table)} (${tgtCols}) ON UPDATE ${fk.update_rule} ON DELETE ${fk.delete_rule};`, - ); - } - emit(""); - } - - // Unique constraints + // Unique constraints must exist before foreign keys that reference them. const allUniqueConstraints = await sql<{ constraint_name: string; schema_name: string; @@ -827,6 +803,58 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise emit(""); } + // Foreign keys (after all tables and referenced unique constraints are created) + const allForeignKeys = await sql<{ + constraint_name: string; + source_schema: string; + source_table: string; + source_columns: string[]; + target_schema: string; + target_table: string; + target_columns: string[]; + update_rule: string; + delete_rule: string; + }[]>` + SELECT + c.conname AS constraint_name, + srcn.nspname AS source_schema, + src.relname AS source_table, + array_agg(sa.attname ORDER BY key_columns.ordinal_position) AS source_columns, + tgtn.nspname AS target_schema, + tgt.relname AS target_table, + array_agg(ta.attname ORDER BY key_columns.ordinal_position) AS target_columns, + CASE c.confupdtype WHEN 'a' THEN 'NO ACTION' WHEN 'r' THEN 'RESTRICT' WHEN 'c' THEN 'CASCADE' WHEN 'n' THEN 'SET NULL' WHEN 'd' THEN 'SET DEFAULT' END AS update_rule, + CASE c.confdeltype WHEN 'a' THEN 'NO ACTION' WHEN 'r' THEN 'RESTRICT' WHEN 'c' THEN 'CASCADE' WHEN 'n' THEN 'SET NULL' WHEN 'd' THEN 'SET DEFAULT' END AS delete_rule + FROM pg_constraint c + JOIN pg_class src ON src.oid = c.conrelid + JOIN pg_namespace srcn ON srcn.oid = src.relnamespace + JOIN pg_class tgt ON tgt.oid = c.confrelid + JOIN pg_namespace tgtn ON tgtn.oid = tgt.relnamespace + JOIN LATERAL unnest(c.conkey, c.confkey) WITH ORDINALITY AS key_columns(source_attnum, target_attnum, ordinal_position) ON true + JOIN pg_attribute sa ON sa.attrelid = src.oid AND sa.attnum = key_columns.source_attnum + JOIN pg_attribute ta ON ta.attrelid = tgt.oid AND ta.attnum = key_columns.target_attnum + WHERE c.contype = 'f' + AND ${sql.unsafe(nonSystemSchemaPredicate("srcn.nspname"))} + GROUP BY c.conname, srcn.nspname, src.relname, tgtn.nspname, tgt.relname, c.confupdtype, c.confdeltype + ORDER BY srcn.nspname, src.relname, c.conname + `; + const fks = allForeignKeys.filter( + (fk) => includedTableNames.has(tableKey(fk.source_schema, fk.source_table)) + && includedTableNames.has(tableKey(fk.target_schema, fk.target_table)), + ); + + if (fks.length > 0) { + emit("-- Foreign keys"); + for (const fk of fks) { + const srcCols = fk.source_columns.map((c) => `"${c}"`).join(", "); + const tgtCols = fk.target_columns.map((c) => `"${c}"`).join(", "); + emitStatement( + `ALTER TABLE ${quoteQualifiedName(fk.source_schema, fk.source_table)} ADD CONSTRAINT "${fk.constraint_name}" FOREIGN KEY (${srcCols}) REFERENCES ${quoteQualifiedName(fk.target_schema, fk.target_table)} (${tgtCols}) ON UPDATE ${fk.update_rule} ON DELETE ${fk.delete_rule};`, + ); + } + emit(""); + } + // Indexes (non-primary, non-unique-constraint) const allIndexes = await sql<{ schema_name: string; tablename: string; indexdef: string }[]>` SELECT schemaname AS schema_name, tablename, indexdef @@ -895,7 +923,7 @@ export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise for await (const rows of rowCursor) { for (const row of rows) { const values = row.map((rawValue, index) => - formatSqlValue(rawValue, cols[index]?.column_name, nullifiedColumns), + formatSqlValue(rawValue, cols[index]?.column_name, nullifiedColumns, cols[index]?.data_type), ); emitStatement(`INSERT INTO ${qualifiedTableName} (${colNames}) VALUES (${values.join(", ")});`); } diff --git a/server/src/__tests__/body-limits.test.ts b/server/src/__tests__/body-limits.test.ts new file mode 100644 index 00000000..8164e32b --- /dev/null +++ b/server/src/__tests__/body-limits.test.ts @@ -0,0 +1,19 @@ +import { describe, expect, it } from "vitest"; + +import { + DEFAULT_JSON_BODY_LIMIT, + PORTABLE_JSON_BODY_LIMIT, + PORTABLE_JSON_BODY_LIMIT_BYTES, +} from "../http/body-limits.js"; + +describe("HTTP body limits", () => { + it("keeps the global JSON parser at the established ceiling", () => { + expect(DEFAULT_JSON_BODY_LIMIT).toBe("10mb"); + }); + + it("allows PAP-scale portable import JSON payloads", () => { + expect(PORTABLE_JSON_BODY_LIMIT).toBe("64mb"); + expect(PORTABLE_JSON_BODY_LIMIT_BYTES).toBe(64 * 1024 * 1024); + expect(PORTABLE_JSON_BODY_LIMIT_BYTES).toBeGreaterThan(10 * 1024 * 1024); + }); +}); diff --git a/server/src/__tests__/error-handler.test.ts b/server/src/__tests__/error-handler.test.ts index d01a8c3c..483403e0 100644 --- a/server/src/__tests__/error-handler.test.ts +++ b/server/src/__tests__/error-handler.test.ts @@ -37,6 +37,31 @@ describe("errorHandler", () => { expect(res.__errorContext?.error?.message).toBe("boom"); }); + it("exposes raw 500 messages for trusted Cloud tenant imports", () => { + const req = { + ...makeReq(), + method: "POST", + originalUrl: "/api/companies/import", + actor: { + type: "board", + userId: "cloud-user", + source: "cloud_tenant", + }, + } as unknown as Request; + const res = makeRes() as any; + const next = vi.fn() as unknown as NextFunction; + const err = new Error("portable file references missing upload id"); + + errorHandler(err, req, res, next); + + expect(res.status).toHaveBeenCalledWith(500); + expect(res.json).toHaveBeenCalledWith({ + error: "Internal server error", + message: "portable file references missing upload id", + }); + expect(res.err).toBe(err); + }); + it("attaches HttpError instances for 500 responses", () => { const req = makeReq(); const res = makeRes() as any; diff --git a/server/src/__tests__/feedback-flush-controller.test.ts b/server/src/__tests__/feedback-flush-controller.test.ts new file mode 100644 index 00000000..b8f12908 --- /dev/null +++ b/server/src/__tests__/feedback-flush-controller.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, it } from "vitest"; +import { isDatabaseConnectionUnavailableError } from "../app.js"; + +describe("feedback export flush error classification", () => { + it("recognizes wrapped database connection-refused errors", () => { + const error = new Error("Failed query: select ...: connect ECONNREFUSED 127.0.0.1:54329"); + (error as { cause?: unknown }).cause = Object.assign( + new Error("connect ECONNREFUSED 127.0.0.1:54329"), + { code: "ECONNREFUSED" }, + ); + + expect(isDatabaseConnectionUnavailableError(error)).toBe(true); + }); + + it("does not classify ordinary feedback upload failures as database outages", () => { + expect(isDatabaseConnectionUnavailableError(new Error("upstream returned 500"))).toBe(false); + }); + + it("does not trust unrelated error messages that mention ECONNREFUSED", () => { + expect(isDatabaseConnectionUnavailableError( + new Error("feedback upload payload mentioned ECONNREFUSED in user content"), + )).toBe(false); + }); +}); diff --git a/server/src/__tests__/issue-comment-reopen-routes.test.ts b/server/src/__tests__/issue-comment-reopen-routes.test.ts index 8a55d9db..ecb3db9b 100644 --- a/server/src/__tests__/issue-comment-reopen-routes.test.ts +++ b/server/src/__tests__/issue-comment-reopen-routes.test.ts @@ -8,6 +8,7 @@ const mockIssueService = vi.hoisted(() => ({ update: vi.fn(), addComment: vi.fn(), getDependencyReadiness: vi.fn(), + getCurrentScheduledRetry: vi.fn(), findMentionedAgents: vi.fn(), listWakeableBlockedDependents: vi.fn(), getWakeableParentAfterChildCompletion: vi.fn(), @@ -223,6 +224,7 @@ describe.sequential("issue comment reopen routes", () => { mockIssueService.update.mockReset(); mockIssueService.addComment.mockReset(); mockIssueService.getDependencyReadiness.mockReset(); + mockIssueService.getCurrentScheduledRetry.mockReset(); mockIssueService.findMentionedAgents.mockReset(); mockIssueService.listWakeableBlockedDependents.mockReset(); mockIssueService.getWakeableParentAfterChildCompletion.mockReset(); @@ -300,6 +302,7 @@ describe.sequential("issue comment reopen routes", () => { allBlockersDone: true, isDependencyReady: true, }); + mockIssueService.getCurrentScheduledRetry.mockResolvedValue(null); mockIssueService.listWakeableBlockedDependents.mockResolvedValue([]); mockIssueService.getWakeableParentAfterChildCompletion.mockResolvedValue(null); mockIssueService.assertCheckoutOwner.mockResolvedValue({ adoptedFromRunId: null }); @@ -564,6 +567,128 @@ describe.sequential("issue comment reopen routes", () => { )); }); + it("moves in-progress issues with a scheduled retry back to todo via POST human comments", async () => { + const issue = { + ...makeIssue("in_progress"), + executionRunId: "retry-run-1", + }; + mockIssueService.getById.mockResolvedValue(issue); + mockIssueService.getCurrentScheduledRetry.mockResolvedValue({ + runId: "retry-run-1", + status: "scheduled_retry", + agentId: "22222222-2222-4222-8222-222222222222", + agentName: "CodexCoder", + retryOfRunId: "source-run-1", + scheduledRetryAt: new Date("2026-05-18T14:00:00.000Z"), + scheduledRetryAttempt: 1, + scheduledRetryReason: "transient_failure", + error: null, + errorCode: null, + }); + mockIssueService.update.mockImplementation(async (_id: string, patch: Record) => ({ + ...issue, + ...patch, + updatedAt: new Date(), + })); + mockHeartbeatService.cancelRun.mockResolvedValue({ + id: "retry-run-1", + companyId: "company-1", + agentId: "22222222-2222-4222-8222-222222222222", + status: "cancelled", + }); + + const res = await request(await installActor(createApp())) + .post("/api/issues/11111111-1111-4111-8111-111111111111/comments") + .send({ body: "I added the missing detail; please continue." }); + + expect(res.status).toBe(201); + expect(mockIssueService.update).toHaveBeenCalledWith( + "11111111-1111-4111-8111-111111111111", + { status: "todo" }, + ); + expect(mockHeartbeatService.cancelRun).toHaveBeenCalledWith("retry-run-1"); + expect(mockLogActivity).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + action: "issue.updated", + details: expect.objectContaining({ + status: "todo", + scheduledRetrySupersededByComment: true, + scheduledRetryRunId: "retry-run-1", + cancelledScheduledRetryRunId: "retry-run-1", + }), + }), + ); + await waitForWakeup(() => expect(mockHeartbeatService.wakeup).toHaveBeenCalledWith( + "22222222-2222-4222-8222-222222222222", + expect.objectContaining({ + reason: "issue_commented", + payload: expect.objectContaining({ + commentId: "comment-1", + mutation: "comment", + }), + contextSnapshot: expect.objectContaining({ + wakeReason: "issue_commented", + source: "issue.comment", + }), + }), + )); + }); + + it("does not move scheduled-retry issues to todo when POST comment retry cancellation fails", async () => { + const issue = { + ...makeIssue("in_progress"), + executionRunId: "retry-run-1", + }; + mockIssueService.getById.mockResolvedValue(issue); + mockIssueService.getCurrentScheduledRetry.mockResolvedValue({ + runId: "retry-run-1", + status: "scheduled_retry", + agentId: "22222222-2222-4222-8222-222222222222", + agentName: "CodexCoder", + retryOfRunId: "source-run-1", + scheduledRetryAt: new Date("2026-05-18T14:00:00.000Z"), + scheduledRetryAttempt: 1, + scheduledRetryReason: "transient_failure", + error: null, + errorCode: null, + }); + mockHeartbeatService.cancelRun.mockRejectedValue(new Error("cancel failed")); + + const res = await request(await installActor(createApp())) + .post("/api/issues/11111111-1111-4111-8111-111111111111/comments") + .send({ body: "I added the missing detail; please continue." }); + + expect(res.status).toBe(500); + expect(mockHeartbeatService.cancelRun).toHaveBeenCalledWith("retry-run-1"); + expect(mockIssueService.update).not.toHaveBeenCalled(); + expect(mockIssueService.addComment).not.toHaveBeenCalled(); + expect(mockLogActivity).not.toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ action: "issue.updated" }), + ); + }); + + it("keeps ordinary in-progress POST human comments in progress when no scheduled retry exists", async () => { + const issue = makeIssue("in_progress"); + mockIssueService.getById.mockResolvedValue(issue); + + const res = await request(await installActor(createApp())) + .post("/api/issues/11111111-1111-4111-8111-111111111111/comments") + .send({ body: "Checking in without retry state." }); + + expect(res.status).toBe(201); + expect(mockIssueService.getCurrentScheduledRetry).toHaveBeenCalledWith("11111111-1111-4111-8111-111111111111"); + expect(mockIssueService.update).not.toHaveBeenCalled(); + expect(mockHeartbeatService.cancelRun).not.toHaveBeenCalled(); + await waitForWakeup(() => expect(mockHeartbeatService.wakeup).toHaveBeenCalledWith( + "22222222-2222-4222-8222-222222222222", + expect.objectContaining({ + reason: "issue_commented", + }), + )); + }); + it("passes validated comment presentation fields to trusted board comment writes", async () => { const app = await installActor(createApp()); mockIssueService.getById.mockResolvedValue(makeIssue("todo")); @@ -727,6 +852,96 @@ describe.sequential("issue comment reopen routes", () => { )); }); + it("moves in-progress issues with a scheduled retry back to todo via the PATCH comment path", async () => { + const issue = { + ...makeIssue("in_progress"), + executionRunId: "retry-run-1", + }; + mockIssueService.getById.mockResolvedValue(issue); + mockIssueService.getCurrentScheduledRetry.mockResolvedValue({ + runId: "retry-run-1", + status: "scheduled_retry", + agentId: "22222222-2222-4222-8222-222222222222", + agentName: "CodexCoder", + retryOfRunId: "source-run-1", + scheduledRetryAt: new Date("2026-05-18T14:00:00.000Z"), + scheduledRetryAttempt: 1, + scheduledRetryReason: "transient_failure", + error: null, + errorCode: null, + }); + mockIssueService.update.mockImplementation(async (_id: string, patch: Record) => ({ + ...issue, + ...patch, + updatedAt: new Date(), + })); + mockHeartbeatService.cancelRun.mockResolvedValue({ + id: "retry-run-1", + companyId: "company-1", + agentId: "22222222-2222-4222-8222-222222222222", + status: "cancelled", + }); + + const res = await request(await installActor(createApp())) + .patch("/api/issues/11111111-1111-4111-8111-111111111111") + .send({ comment: "Retry window is over; please continue." }); + + expect(res.status).toBe(200); + expect(mockIssueService.update).toHaveBeenCalledWith( + "11111111-1111-4111-8111-111111111111", + expect.objectContaining({ + status: "todo", + actorAgentId: null, + actorUserId: "local-board", + }), + ); + expect(mockHeartbeatService.cancelRun).toHaveBeenCalledWith("retry-run-1"); + await waitForWakeup(() => expect(mockHeartbeatService.wakeup).toHaveBeenCalledWith( + "22222222-2222-4222-8222-222222222222", + expect.objectContaining({ + reason: "issue_commented", + payload: expect.objectContaining({ + commentId: "comment-1", + mutation: "comment", + }), + }), + )); + }); + + it("does not move scheduled-retry issues to todo when PATCH comment retry cancellation fails", async () => { + const issue = { + ...makeIssue("in_progress"), + executionRunId: "retry-run-1", + }; + mockIssueService.getById.mockResolvedValue(issue); + mockIssueService.getCurrentScheduledRetry.mockResolvedValue({ + runId: "retry-run-1", + status: "scheduled_retry", + agentId: "22222222-2222-4222-8222-222222222222", + agentName: "CodexCoder", + retryOfRunId: "source-run-1", + scheduledRetryAt: new Date("2026-05-18T14:00:00.000Z"), + scheduledRetryAttempt: 1, + scheduledRetryReason: "transient_failure", + error: null, + errorCode: null, + }); + mockHeartbeatService.cancelRun.mockRejectedValue(new Error("cancel failed")); + + const res = await request(await installActor(createApp())) + .patch("/api/issues/11111111-1111-4111-8111-111111111111") + .send({ comment: "Retry window is over; please continue." }); + + expect(res.status).toBe(500); + expect(mockHeartbeatService.cancelRun).toHaveBeenCalledWith("retry-run-1"); + expect(mockIssueService.update).not.toHaveBeenCalled(); + expect(mockIssueService.addComment).not.toHaveBeenCalled(); + expect(mockLogActivity).not.toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ action: "issue.updated" }), + ); + }); + it("rejects non-assignee agent PATCH comments on closed issues", async () => { mockIssueService.getById.mockResolvedValue(makeIssue("done")); mockIssueService.addComment.mockResolvedValue({ diff --git a/server/src/__tests__/issue-recovery-actions.test.ts b/server/src/__tests__/issue-recovery-actions.test.ts index a7e6de70..dd15e7c0 100644 --- a/server/src/__tests__/issue-recovery-actions.test.ts +++ b/server/src/__tests__/issue-recovery-actions.test.ts @@ -573,7 +573,10 @@ describeEmbeddedPostgres("issue recovery actions", () => { it("resolves an active recovery action by returning the source issue to todo", async () => { const { companyId, managerId, sourceIssueId } = await seedCompany(); - await db.update(issues).set({ status: "blocked" }).where(eq(issues.id, sourceIssueId)); + await db + .update(issues) + .set({ status: "blocked", assigneeAgentId: null, assigneeUserId: "board-user" }) + .where(eq(issues.id, sourceIssueId)); const recoveryActionSvc = issueRecoveryActionService(db); const action = await recoveryActionSvc.upsertSourceScoped({ companyId, diff --git a/server/src/__tests__/issue-thread-interaction-routes.test.ts b/server/src/__tests__/issue-thread-interaction-routes.test.ts index ae5df023..818c5d7d 100644 --- a/server/src/__tests__/issue-thread-interaction-routes.test.ts +++ b/server/src/__tests__/issue-thread-interaction-routes.test.ts @@ -16,6 +16,7 @@ const mockInteractionService = vi.hoisted(() => ({ acceptSuggestedTasks: vi.fn(), rejectInteraction: vi.fn(), rejectSuggestedTasks: vi.fn(), + expireRequestConfirmationsSupersededByHistoricalComments: vi.fn(), answerQuestions: vi.fn(), cancelQuestions: vi.fn(), })); @@ -156,6 +157,7 @@ describe.sequential("issue thread interaction routes", () => { vi.clearAllMocks(); mockIssueService.getById.mockResolvedValue(createIssue()); mockInteractionService.listForIssue.mockResolvedValue([]); + mockInteractionService.expireRequestConfirmationsSupersededByHistoricalComments.mockResolvedValue([]); mockInteractionService.create.mockResolvedValue({ id: "interaction-1", companyId: "company-1", @@ -288,6 +290,18 @@ describe.sequential("issue thread interaction routes", () => { }); it("lists and creates board-authored interactions", async () => { + mockInteractionService.expireRequestConfirmationsSupersededByHistoricalComments.mockResolvedValueOnce([ + { + id: "interaction-expired", + kind: "request_confirmation", + status: "expired", + result: { + version: 1, + outcome: "superseded_by_comment", + commentId: "bbbbbbbb-bbbb-4bbb-8bbb-bbbbbbbbbbbb", + }, + }, + ]); mockInteractionService.listForIssue.mockResolvedValue([ { id: "interaction-1", kind: "suggest_tasks", status: "pending" }, ]); @@ -298,6 +312,24 @@ describe.sequential("issue thread interaction routes", () => { expect(listRes.body).toEqual([ { id: "interaction-1", kind: "suggest_tasks", status: "pending" }, ]); + expect(mockInteractionService.expireRequestConfirmationsSupersededByHistoricalComments).toHaveBeenCalledWith( + expect.objectContaining({ id: "aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa" }), + ); + expect(mockLogActivity).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + action: "issue.thread_interaction_expired", + details: expect.objectContaining({ + interactionId: "interaction-expired", + interactionKind: "request_confirmation", + source: "issue.interactions.catchup_superseded_by_comment", + result: expect.objectContaining({ + outcome: "superseded_by_comment", + commentId: "bbbbbbbb-bbbb-4bbb-8bbb-bbbbbbbbbbbb", + }), + }), + }), + ); const createRes = await request(app) .post("/api/issues/aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa/interactions") diff --git a/server/src/__tests__/issue-thread-interactions-service.test.ts b/server/src/__tests__/issue-thread-interactions-service.test.ts index f1f0be38..d26c4fe4 100644 --- a/server/src/__tests__/issue-thread-interactions-service.test.ts +++ b/server/src/__tests__/issue-thread-interactions-service.test.ts @@ -9,6 +9,7 @@ import { documents, goals, heartbeatRuns, + issueComments, issueDocuments, instanceSettings, issueRelations, @@ -41,6 +42,7 @@ describeEmbeddedPostgres("issueThreadInteractionService", () => { afterEach(async () => { await db.delete(issueThreadInteractions); + await db.delete(issueComments); await db.delete(issueDocuments); await db.delete(documentRevisions); await db.delete(documents); @@ -57,6 +59,37 @@ describeEmbeddedPostgres("issueThreadInteractionService", () => { await tempDb?.cleanup(); }); + async function seedConfirmationIssue(title = "Comment supersede") { + const companyId = randomUUID(); + const goalId = randomUUID(); + const issueId = randomUUID(); + + await db.insert(companies).values({ + id: companyId, + name: "Paperclip", + issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, + requireBoardApprovalForNewAgents: false, + }); + await instanceSettingsService(db).updateExperimental({ enableIsolatedWorkspaces: false }); + await db.insert(goals).values({ + id: goalId, + companyId, + title, + level: "task", + status: "active", + }); + await db.insert(issues).values({ + id: issueId, + companyId, + goalId, + title: "Parent issue", + status: "in_progress", + priority: "medium", + }); + + return { companyId, goalId, issueId }; + } + it("accepts suggested tasks by creating a rooted issue tree under the current issue", async () => { const companyId = randomUUID(); const goalId = randomUUID(); @@ -783,35 +816,10 @@ describeEmbeddedPostgres("issueThreadInteractionService", () => { }); }); - it("expires supersedable request confirmations when a user comments", async () => { - const companyId = randomUUID(); - const goalId = randomUUID(); - const issueId = randomUUID(); + it("expires request confirmations opted into user-comment supersede after creation", async () => { + const { companyId, issueId } = await seedConfirmationIssue(); const commentId = randomUUID(); - await db.insert(companies).values({ - id: companyId, - name: "Paperclip", - issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, - requireBoardApprovalForNewAgents: false, - }); - await instanceSettingsService(db).updateExperimental({ enableIsolatedWorkspaces: false }); - await db.insert(goals).values({ - id: goalId, - companyId, - title: "Comment supersede", - level: "task", - status: "active", - }); - await db.insert(issues).values({ - id: issueId, - companyId, - goalId, - title: "Parent issue", - status: "in_progress", - priority: "medium", - }); - const created = await interactionsSvc.create({ id: issueId, companyId, @@ -831,6 +839,7 @@ describeEmbeddedPostgres("issueThreadInteractionService", () => { companyId, }, { id: commentId, + createdAt: new Date(new Date(created.createdAt).getTime() + 1_000), authorUserId: "local-board", }, { userId: "local-board", @@ -849,6 +858,160 @@ describeEmbeddedPostgres("issueThreadInteractionService", () => { }); }); + it("keeps request confirmations pending unless user-comment supersede is explicitly enabled", async () => { + const { companyId, issueId } = await seedConfirmationIssue("Comment supersede opt-out"); + + await interactionsSvc.create({ + id: issueId, + companyId, + }, { + kind: "request_confirmation", + payload: { + version: 1, + prompt: "Proceed with the current draft?", + }, + }, { + userId: "local-board", + }); + + const expired = await interactionsSvc.expireRequestConfirmationsSupersededByComment({ + id: issueId, + companyId, + }, { + id: randomUUID(), + createdAt: new Date(Date.now() + 1_000), + authorUserId: "local-board", + }, { + userId: "local-board", + }); + + expect(expired).toHaveLength(0); + const rows = await db.select().from(issueThreadInteractions); + expect(rows).toHaveLength(1); + expect(rows[0]?.status).toBe("pending"); + }); + + it("does not supersede request confirmations for agent, system, or older user comments", async () => { + const { companyId, issueId } = await seedConfirmationIssue("Comment supersede exclusions"); + + const created = await interactionsSvc.create({ + id: issueId, + companyId, + }, { + kind: "request_confirmation", + payload: { + version: 1, + prompt: "Proceed with the current draft?", + supersedeOnUserComment: true, + }, + }, { + userId: "local-board", + }); + const createdAtMs = new Date(created.createdAt).getTime(); + + await expect(interactionsSvc.expireRequestConfirmationsSupersededByComment({ + id: issueId, + companyId, + }, { + id: randomUUID(), + createdAt: new Date(createdAtMs + 1_000), + authorUserId: null, + }, { + agentId: randomUUID(), + })).resolves.toHaveLength(0); + + await expect(interactionsSvc.expireRequestConfirmationsSupersededByComment({ + id: issueId, + companyId, + }, { + id: randomUUID(), + createdAt: new Date(createdAtMs + 1_000), + authorUserId: null, + }, {})).resolves.toHaveLength(0); + + await expect(interactionsSvc.expireRequestConfirmationsSupersededByComment({ + id: issueId, + companyId, + }, { + id: randomUUID(), + createdAt: new Date(createdAtMs - 1_000), + authorUserId: "local-board", + }, { + userId: "local-board", + })).resolves.toHaveLength(0); + + const rows = await db.select().from(issueThreadInteractions); + expect(rows).toHaveLength(1); + expect(rows[0]?.status).toBe("pending"); + }); + + it("repairs historical request confirmations superseded by later user comments idempotently", async () => { + const { companyId, issueId } = await seedConfirmationIssue("Historical comment supersede"); + const commentId = randomUUID(); + const createdAt = new Date("2026-05-18T12:00:00.000Z"); + + const created = await interactionsSvc.create({ + id: issueId, + companyId, + }, { + kind: "request_confirmation", + payload: { + version: 1, + prompt: "Proceed with the current draft?", + supersedeOnUserComment: true, + }, + }, { + userId: "local-board", + }); + await db + .update(issueThreadInteractions) + .set({ createdAt, updatedAt: createdAt }) + .where(eq(issueThreadInteractions.id, created.id)); + + await db.insert(issueComments).values({ + id: randomUUID(), + companyId, + issueId, + authorType: "system", + body: "System-side progress note.", + createdAt: new Date("2026-05-18T12:00:30.000Z"), + updatedAt: new Date("2026-05-18T12:00:30.000Z"), + }); + await db.insert(issueComments).values({ + id: commentId, + companyId, + issueId, + authorUserId: "local-board", + authorType: "user", + body: "Please revise this first.", + createdAt: new Date("2026-05-18T12:01:00.000Z"), + updatedAt: new Date("2026-05-18T12:01:00.000Z"), + }); + + const expired = await interactionsSvc.expireRequestConfirmationsSupersededByHistoricalComments({ + id: issueId, + companyId, + }); + + expect(expired).toHaveLength(1); + expect(expired[0]).toMatchObject({ + id: created.id, + status: "expired", + result: { + version: 1, + outcome: "superseded_by_comment", + commentId, + }, + resolvedByAgentId: null, + resolvedByUserId: "local-board", + }); + + await expect(interactionsSvc.expireRequestConfirmationsSupersededByHistoricalComments({ + id: issueId, + companyId, + })).resolves.toEqual([]); + }); + it("expires request confirmations when the watched issue document revision changes", async () => { const companyId = randomUUID(); const goalId = randomUUID(); diff --git a/server/src/__tests__/issue-update-comment-wakeup-routes.test.ts b/server/src/__tests__/issue-update-comment-wakeup-routes.test.ts index b0f04f97..167037fb 100644 --- a/server/src/__tests__/issue-update-comment-wakeup-routes.test.ts +++ b/server/src/__tests__/issue-update-comment-wakeup-routes.test.ts @@ -12,6 +12,7 @@ const mockIssueService = vi.hoisted(() => ({ getRelationSummaries: vi.fn(), listWakeableBlockedDependents: vi.fn(), getWakeableParentAfterChildCompletion: vi.fn(), + getCurrentScheduledRetry: vi.fn(), })); const mockHeartbeatService = vi.hoisted(() => ({ @@ -205,6 +206,7 @@ describe("issue update comment wakeups", () => { mockIssueService.getRelationSummaries.mockResolvedValue({ blockedBy: [], blocks: [] }); mockIssueService.listWakeableBlockedDependents.mockResolvedValue([]); mockIssueService.getWakeableParentAfterChildCompletion.mockResolvedValue(null); + mockIssueService.getCurrentScheduledRetry.mockResolvedValue(null); }); it("includes the new comment in assignment wakes from issue updates", async () => { diff --git a/server/src/__tests__/issues-service.test.ts b/server/src/__tests__/issues-service.test.ts index 22a5b3ce..ef328416 100644 --- a/server/src/__tests__/issues-service.test.ts +++ b/server/src/__tests__/issues-service.test.ts @@ -380,6 +380,46 @@ describeEmbeddedPostgres("issueService.list participantAgentId", () => { expect(result.map((issue) => issue.id)).toEqual([titleMatchId, descriptionMatchId]); }); + it("can page issues by most recently updated before priority", async () => { + const companyId = randomUUID(); + const oldCriticalIssueId = randomUUID(); + const recentMediumIssueId = randomUUID(); + + await db.insert(companies).values({ + id: companyId, + name: "Paperclip", + issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, + requireBoardApprovalForNewAgents: false, + }); + + await db.insert(issues).values([ + { + id: oldCriticalIssueId, + companyId, + title: "Old critical issue", + status: "todo", + priority: "critical", + updatedAt: new Date("2026-05-01T10:00:00.000Z"), + }, + { + id: recentMediumIssueId, + companyId, + title: "Recent medium issue", + status: "todo", + priority: "medium", + updatedAt: new Date("2026-05-17T21:12:29.993Z"), + }, + ]); + + const result = await svc.list(companyId, { + limit: 1, + sortField: "updated", + sortDir: "desc", + }); + + expect(result.map((issue) => issue.id)).toEqual([recentMediumIssueId]); + }); + it("ranks comment matches ahead of description-only matches", async () => { const companyId = randomUUID(); const commentMatchId = randomUUID(); diff --git a/server/src/__tests__/plugin-lifecycle-restart.test.ts b/server/src/__tests__/plugin-lifecycle-restart.test.ts new file mode 100644 index 00000000..04026e6e --- /dev/null +++ b/server/src/__tests__/plugin-lifecycle-restart.test.ts @@ -0,0 +1,129 @@ +/** + * Regression test for PAP-9585. + * + * `restartWorker` is called by the dev file-watcher whenever a local-path + * plugin's source files change. Before PAP-9585 it only bounced the worker + * subprocess, which left newly added `migrations/*.sql` files unapplied — the + * plugin schema would silently drift out of sync with worker code. + * + * The fix is for `restartWorker` to do a full deactivate + reactivate cycle + * via the plugin loader, which re-reads the manifest from disk and runs + * `applyMigrations` (idempotently) before starting the new worker. + */ +import { describe, expect, it, vi } from "vitest"; + +const pluginRecord = { + id: "plugin-1", + pluginKey: "example.plugin", + status: "ready", + manifestJson: { id: "example.plugin", capabilities: [] }, + packageName: "@example/plugin", + version: "1.0.0", + packagePath: "/tmp/example-plugin", +}; + +const mockRegistry = vi.hoisted(() => ({ + getById: vi.fn(), + getByKey: vi.fn(), + update: vi.fn(), + updateStatus: vi.fn(), + upsertConfig: vi.fn(), + getConfig: vi.fn(), + list: vi.fn(), + delete: vi.fn(), +})); + +vi.mock("../services/plugin-registry.js", () => ({ + pluginRegistryService: () => mockRegistry, +})); + +import { pluginLifecycleManager } from "../services/plugin-lifecycle.js"; +import type { PluginLoader } from "../services/plugin-loader.js"; +import type { PluginWorkerManager } from "../services/plugin-worker-manager.js"; + +function makeWorkerManagerStub() { + const handle = { + restart: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + }; + return { + handle, + workerManager: { + getWorker: vi.fn().mockReturnValue(handle), + isRunning: vi.fn().mockReturnValue(true), + startWorker: vi.fn().mockResolvedValue(undefined), + stopWorker: vi.fn().mockResolvedValue(undefined), + restartWorker: vi.fn().mockResolvedValue(undefined), + } as unknown as PluginWorkerManager, + }; +} + +describe("pluginLifecycleManager.restartWorker", () => { + it("does a full deactivate+reactivate cycle when the loader has runtime services", async () => { + mockRegistry.getById.mockResolvedValue(pluginRecord); + mockRegistry.updateStatus.mockResolvedValue(pluginRecord); + + const { handle, workerManager } = makeWorkerManagerStub(); + + const loader: Partial = { + hasRuntimeServices: vi.fn().mockReturnValue(true) as PluginLoader["hasRuntimeServices"], + loadSingle: vi.fn().mockResolvedValue({ + success: true, + plugin: pluginRecord, + registered: { worker: true, eventSubscriptions: 0, jobs: 0, webhooks: 0, tools: 0 }, + }) as PluginLoader["loadSingle"], + unloadSingle: vi.fn().mockResolvedValue(undefined) as PluginLoader["unloadSingle"], + }; + + const lifecycle = pluginLifecycleManager( + {} as never, + { loader: loader as PluginLoader, workerManager }, + ); + const stopped = vi.fn(); + const started = vi.fn(); + lifecycle.on("plugin.worker_stopped", stopped); + lifecycle.on("plugin.worker_started", started); + + await lifecycle.restartWorker("plugin-1"); + + expect(loader.unloadSingle).toHaveBeenCalledWith("plugin-1", "example.plugin"); + expect(loader.loadSingle).toHaveBeenCalledWith("plugin-1"); + // The bare worker handle should NOT be bounced — the loader handles + // worker (re)start as part of activate. + expect(handle.restart).not.toHaveBeenCalled(); + expect(stopped).not.toHaveBeenCalled(); + expect(started).not.toHaveBeenCalled(); + }); + + it("falls back to bouncing the worker handle when the loader has no runtime services", async () => { + mockRegistry.getById.mockResolvedValue(pluginRecord); + mockRegistry.updateStatus.mockResolvedValue(pluginRecord); + + const { handle, workerManager } = makeWorkerManagerStub(); + + const loader: Partial = { + hasRuntimeServices: vi.fn().mockReturnValue(false) as PluginLoader["hasRuntimeServices"], + loadSingle: vi.fn() as PluginLoader["loadSingle"], + unloadSingle: vi.fn() as PluginLoader["unloadSingle"], + }; + + const lifecycle = pluginLifecycleManager( + {} as never, + { loader: loader as PluginLoader, workerManager }, + ); + const stopped = vi.fn(); + const started = vi.fn(); + lifecycle.on("plugin.worker_stopped", stopped); + lifecycle.on("plugin.worker_started", started); + + await lifecycle.restartWorker("plugin-1"); + + expect(loader.unloadSingle).not.toHaveBeenCalled(); + expect(loader.loadSingle).not.toHaveBeenCalled(); + expect(handle.restart).toHaveBeenCalledTimes(1); + expect(stopped).toHaveBeenCalledTimes(1); + expect(stopped).toHaveBeenCalledWith({ pluginId: "plugin-1", pluginKey: "example.plugin" }); + expect(started).toHaveBeenCalledTimes(1); + expect(started).toHaveBeenCalledWith({ pluginId: "plugin-1", pluginKey: "example.plugin" }); + }); +}); diff --git a/server/src/__tests__/server-startup-feedback-export.test.ts b/server/src/__tests__/server-startup-feedback-export.test.ts index 0cf1e664..c288b6e6 100644 --- a/server/src/__tests__/server-startup-feedback-export.test.ts +++ b/server/src/__tests__/server-startup-feedback-export.test.ts @@ -216,6 +216,35 @@ describe("startServer feedback export wiring", () => { serverPort: 3210, }); }); + + it("refuses authenticated public startup without an external database URL", async () => { + loadConfigMock.mockReturnValue(buildTestConfig({ + deploymentExposure: "public", + authBaseUrlMode: "explicit", + authPublicBaseUrl: "https://tenant.example.com", + databaseMode: "embedded-postgres", + databaseUrl: undefined, + })); + + await expect(startServer()).rejects.toThrow( + "authenticated public deployments require DATABASE_URL or config.database.connectionString", + ); + expect(createDbMock).not.toHaveBeenCalled(); + }); + + it("refuses authenticated public startup when DATABASE_URL is not a postgres URL", async () => { + loadConfigMock.mockReturnValue(buildTestConfig({ + deploymentExposure: "public", + authBaseUrlMode: "explicit", + authPublicBaseUrl: "https://tenant.example.com", + databaseUrl: "secret://paperclip-cloud/stacks/alpha/database/runtime-url", + })); + + await expect(startServer()).rejects.toThrow( + "authenticated public deployments require DATABASE_URL to be a postgres/postgresql connection string", + ); + expect(createDbMock).not.toHaveBeenCalled(); + }); }); describe("startServer authenticated auth origin setup", () => { diff --git a/server/src/app.ts b/server/src/app.ts index d0377184..421a72a4 100644 --- a/server/src/app.ts +++ b/server/src/app.ts @@ -59,6 +59,8 @@ import { pluginRegistryService } from "./services/plugin-registry.js"; import { createHostClientHandlers } from "@paperclipai/plugin-sdk"; import type { BetterAuthSessionResult } from "./auth/better-auth.js"; import { createCachedViteHtmlRenderer } from "./vite-html-renderer.js"; +import { DEFAULT_JSON_BODY_LIMIT, PORTABLE_JSON_BODY_LIMIT } from "./http/body-limits.js"; +import { COMPANY_IMPORT_API_PATH } from "./routes/company-import-paths.js"; type UiMode = "none" | "static" | "vite-dev"; const FEEDBACK_EXPORT_FLUSH_INTERVAL_MS = 5_000; @@ -81,6 +83,12 @@ const VITE_DEV_STATIC_PATHS = new Set([ "/sw.js", ]); +export function isDatabaseConnectionUnavailableError(err: unknown): boolean { + const error = err as { code?: unknown; message?: unknown; cause?: unknown }; + if (error?.code === "ECONNREFUSED") return true; + return Boolean(error?.cause && isDatabaseConnectionUnavailableError(error.cause)); +} + export function resolveViteHmrPort(serverPort: number): number { if (serverPort <= 55_535) { return serverPort + 10_000; @@ -136,13 +144,17 @@ export async function createApp( }, ) { const app = express(); + const captureRawBody = (req: express.Request, _res: express.Response, buf: Buffer) => { + (req as unknown as { rawBody: Buffer }).rawBody = buf; + }; + app.use(COMPANY_IMPORT_API_PATH, express.json({ + limit: PORTABLE_JSON_BODY_LIMIT, + verify: captureRawBody, + })); app.use(express.json({ - // Company import/export payloads can inline full portable packages. - limit: "10mb", - verify: (req, _res, buf) => { - (req as unknown as { rawBody: Buffer }).rawBody = buf; - }, + limit: DEFAULT_JSON_BODY_LIMIT, + verify: captureRawBody, })); app.use(httpLogger); const privateHostnameGateEnabled = shouldEnablePrivateHostnameGuard({ @@ -404,18 +416,37 @@ export async function createApp( jobCoordinator.start(); scheduler.start(); - const feedbackExportTimer = opts.feedbackExportService + let feedbackExportShuttingDown = false; + let feedbackExportTimer: ReturnType | null = null; + const disableFeedbackExportFlushes = () => { + feedbackExportShuttingDown = true; + if (feedbackExportTimer) { + clearInterval(feedbackExportTimer); + feedbackExportTimer = null; + } + }; + const flushPendingFeedbackExports = async () => { + if (feedbackExportShuttingDown) return; + try { + await opts.feedbackExportService?.flushPendingFeedbackTraces(); + } catch (err) { + if (isDatabaseConnectionUnavailableError(err)) { + disableFeedbackExportFlushes(); + logger.warn({ err }, "Disabling pending feedback export flushes because the database is unavailable"); + return; + } + logger.error({ err }, "Failed to flush pending feedback exports"); + } + }; + + feedbackExportTimer = opts.feedbackExportService ? setInterval(() => { - void opts.feedbackExportService?.flushPendingFeedbackTraces().catch((err) => { - logger.error({ err }, "Failed to flush pending feedback exports"); - }); + void flushPendingFeedbackExports(); }, FEEDBACK_EXPORT_FLUSH_INTERVAL_MS) : null; feedbackExportTimer?.unref?.(); if (opts.feedbackExportService) { - void opts.feedbackExportService.flushPendingFeedbackTraces().catch((err) => { - logger.error({ err }, "Failed to flush pending feedback exports"); - }); + void flushPendingFeedbackExports(); } void toolDispatcher.initialize().catch((err) => { logger.error({ err }, "Failed to initialize plugin tool dispatcher"); @@ -434,13 +465,19 @@ export async function createApp( }).catch((err) => { logger.error({ err }, "Failed to load ready plugins on startup"); }); - process.once("exit", () => { - if (feedbackExportTimer) clearInterval(feedbackExportTimer); + let appServicesShutdown = false; + const shutdownAppServices = () => { + if (appServicesShutdown) return; + appServicesShutdown = true; + disableFeedbackExportFlushes(); devWatcher?.close(); viteHtmlRenderer?.dispose(); hostServiceCleanup.disposeAll(); hostServiceCleanup.teardown(); - }); + }; + app.locals.paperclipShutdown = shutdownAppServices; + + process.once("exit", shutdownAppServices); process.once("beforeExit", () => { void flushPluginLogBuffer(); }); diff --git a/server/src/http/body-limits.ts b/server/src/http/body-limits.ts new file mode 100644 index 00000000..12c45705 --- /dev/null +++ b/server/src/http/body-limits.ts @@ -0,0 +1,3 @@ +export const DEFAULT_JSON_BODY_LIMIT = "10mb"; +export const PORTABLE_JSON_BODY_LIMIT = "64mb"; +export const PORTABLE_JSON_BODY_LIMIT_BYTES = 64 * 1024 * 1024; diff --git a/server/src/index.ts b/server/src/index.ts index 105f23ec..c039f758 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -187,6 +187,31 @@ export async function startServer(): Promise { return normalized === "127.0.0.1" || normalized === "localhost" || normalized === "::1"; } + function isPostgresConnectionString(connectionString: string): boolean { + try { + const parsed = new URL(connectionString); + return parsed.protocol === "postgres:" || parsed.protocol === "postgresql:"; + } catch { + return false; + } + } + + function assertCloudDatabaseContract(): void { + if (config.deploymentMode !== "authenticated" || config.deploymentExposure !== "public") { + return; + } + if (!config.databaseUrl) { + throw new Error( + "authenticated public deployments require DATABASE_URL or config.database.connectionString; refusing embedded PostgreSQL fallback", + ); + } + if (!isPostgresConnectionString(config.databaseUrl)) { + throw new Error( + "authenticated public deployments require DATABASE_URL to be a postgres/postgresql connection string", + ); + } + } + function rewriteLocalUrlPort(rawUrl: string | undefined, port: number): string | undefined { if (!rawUrl) return undefined; try { @@ -270,6 +295,7 @@ export async function startServer(): Promise { let startupDbInfo: | { mode: "external-postgres"; connectionString: string } | { mode: "embedded-postgres"; dataDir: string; port: number }; + assertCloudDatabaseContract(); if (config.databaseUrl) { const migrationUrl = config.databaseMigrationUrl ?? config.databaseUrl; migrationSummary = await ensureMigrations(migrationUrl, "PostgreSQL"); @@ -878,6 +904,9 @@ export async function startServer(): Promise { await telemetryClient.flush(); } + const appShutdown = (app as { locals?: { paperclipShutdown?: () => void } }).locals?.paperclipShutdown; + appShutdown?.(); + if (embeddedPostgres && embeddedPostgresStartedByThisProcess) { logger.info({ signal }, "Stopping embedded PostgreSQL"); try { diff --git a/server/src/middleware/error-handler.ts b/server/src/middleware/error-handler.ts index 032b5e1f..c455cbad 100644 --- a/server/src/middleware/error-handler.ts +++ b/server/src/middleware/error-handler.ts @@ -3,6 +3,7 @@ import { ZodError } from "zod"; import { HttpError } from "../errors.js"; import { trackErrorHandlerCrash } from "@paperclipai/shared/telemetry"; import { getTelemetryClient } from "../telemetry.js"; +import { COMPANY_IMPORT_API_PATH } from "../routes/company-import-paths.js"; export interface ErrorContext { error: { message: string; stack?: string; name?: string; details?: unknown; raw?: unknown }; @@ -74,5 +75,14 @@ export function errorHandler( const tc = getTelemetryClient(); if (tc) trackErrorHandlerCrash(tc, { errorCode: rootError.name }); - res.status(500).json({ error: "Internal server error" }); + res.status(500).json({ + error: "Internal server error", + ...(shouldExposeTrustedCloudTenantImportError(req) ? { message: rootError.message } : {}), + }); +} + +function shouldExposeTrustedCloudTenantImportError(req: Request) { + return req.actor?.source === "cloud_tenant" + && req.method === "POST" + && req.originalUrl.split("?")[0] === COMPANY_IMPORT_API_PATH; } diff --git a/server/src/routes/companies.ts b/server/src/routes/companies.ts index 6516e5ef..96a004ac 100644 --- a/server/src/routes/companies.ts +++ b/server/src/routes/companies.ts @@ -25,6 +25,7 @@ import { } from "../services/index.js"; import type { StorageService } from "../storage/types.js"; import { assertBoard, assertCompanyAccess, assertInstanceAdmin, getActorInfo } from "./authz.js"; +import { COMPANY_IMPORT_ROUTE_PATH } from "./company-import-paths.js"; export function companyRoutes(db: Db, storage?: StorageService) { const router = Router(); @@ -176,7 +177,7 @@ export function companyRoutes(db: Db, storage?: StorageService) { res.json(preview); }); - router.post("/import", validate(companyPortabilityImportSchema), async (req, res) => { + router.post(COMPANY_IMPORT_ROUTE_PATH, validate(companyPortabilityImportSchema), async (req, res) => { assertBoard(req); assertImportTargetAccess(req, req.body.target); const actor = getActorInfo(req); diff --git a/server/src/routes/company-import-paths.ts b/server/src/routes/company-import-paths.ts new file mode 100644 index 00000000..cf434900 --- /dev/null +++ b/server/src/routes/company-import-paths.ts @@ -0,0 +1,2 @@ +export const COMPANY_IMPORT_ROUTE_PATH = "/import"; +export const COMPANY_IMPORT_API_PATH = `/api/companies${COMPANY_IMPORT_ROUTE_PATH}`; diff --git a/server/src/routes/issues.ts b/server/src/routes/issues.ts index ab93132a..9b2b9da7 100644 --- a/server/src/routes/issues.ts +++ b/server/src/routes/issues.ts @@ -627,6 +627,18 @@ function shouldImplicitlyMoveCommentedIssueToTodo(input: { return true; } +function shouldHumanCommentResumeInProgressScheduledRetry(input: { + hasComment: boolean; + issueStatus: string | null | undefined; + assigneeAgentId: string | null | undefined; + actorType: "agent" | "user"; +}) { + if (!input.hasComment) return false; + if (input.actorType !== "user") return false; + if (input.issueStatus !== "in_progress") return false; + return typeof input.assigneeAgentId === "string" && input.assigneeAgentId.length > 0; +} + function isExplicitResumeCapableStatus(status: string | null | undefined) { return status === "done" || status === "blocked" || status === "todo" || status === "in_progress"; } @@ -873,6 +885,41 @@ export function issueRoutes( const feedbackExportService = opts?.feedbackExportService; const environmentsSvc = environmentService(db); + async function cancelScheduledRetrySupersededByComment(input: { + scheduledRetryRunId: string | null | undefined; + issue: { id: string; companyId: string }; + actor: ReturnType; + }) { + const scheduledRetryRunId = readNonEmptyString(input.scheduledRetryRunId); + if (!scheduledRetryRunId) return null; + + try { + const cancelled = await heartbeat.cancelRun(scheduledRetryRunId); + const cancelledRunId = cancelled?.id ?? scheduledRetryRunId; + await logActivity(db, { + companyId: input.issue.companyId, + actorType: input.actor.actorType, + actorId: input.actor.actorId, + agentId: input.actor.agentId, + runId: input.actor.runId, + action: "heartbeat.cancelled", + entityType: "heartbeat_run", + entityId: cancelledRunId, + details: { + source: "issue_comment_scheduled_retry_superseded", + issueId: input.issue.id, + }, + }); + return cancelledRunId; + } catch (err) { + logger.error( + { err, issueId: input.issue.id, runId: scheduledRetryRunId }, + "failed to cancel scheduled retry superseded by issue comment", + ); + throw err; + } + } + async function classifySourceRecoveryRevalidation(input: { issue: IssueRouteSnapshot; trigger: RecoveryRevalidationTrigger; @@ -1762,6 +1809,8 @@ export function issueRoutes( ? Number.parseInt(rawOffset, 10) : null; const attention = req.query.attention as string | undefined; + const sortField = req.query.sortField as string | undefined; + const sortDir = req.query.sortDir as string | undefined; if (assigneeUserFilterRaw === "me" && (!assigneeUserId || req.actor.type !== "board")) { res.status(403).json({ error: "assigneeUserId=me requires board authentication" }); @@ -1791,6 +1840,14 @@ export function issueRoutes( res.status(400).json({ error: "offset must be a non-negative integer" }); return; } + if (sortField !== undefined && sortField !== "updated") { + res.status(400).json({ error: "sortField must be 'updated' when provided" }); + return; + } + if (sortDir !== undefined && sortDir !== "asc" && sortDir !== "desc") { + res.status(400).json({ error: "sortDir must be 'asc' or 'desc' when provided" }); + return; + } const offset = parsedOffset ?? 0; const result = await svc.list(companyId, { @@ -1823,6 +1880,8 @@ export function issueRoutes( q: req.query.q as string | undefined, limit, offset, + sortField: sortField === "updated" ? "updated" : undefined, + sortDir: sortDir === "asc" || sortDir === "desc" ? sortDir : undefined, }); const issueIds = result.map((issue) => issue.id); const [handoffStates, recoveryActionByIssue] = await Promise.all([ @@ -3387,6 +3446,18 @@ export function issueRoutes( ) { return; } + const scheduledRetryForHumanComment = + shouldHumanCommentResumeInProgressScheduledRetry({ + hasComment: !!commentBody, + issueStatus: existing.status, + assigneeAgentId: requestedAssigneeAgentId, + actorType: actor.actorType, + }) + ? await svc.getCurrentScheduledRetry(existing.id) + : null; + const shouldResumeInProgressScheduledRetry = + !!scheduledRetryForHumanComment && + scheduledRetryForHumanComment.agentId === requestedAssigneeAgentId; const effectiveMoveToTodoRequested = explicitMoveToTodoRequested || (!!commentBody && @@ -3395,7 +3466,8 @@ export function issueRoutes( assigneeAgentId: requestedAssigneeAgentId, actorType: actor.actorType, actorId: actor.actorId, - })); + })) || + shouldResumeInProgressScheduledRetry; const updateReferenceSummaryBefore = titleOrDescriptionChanged ? await issueReferencesSvc.listIssueReferenceSummary(existing.id) : null; @@ -3457,11 +3529,23 @@ export function issueRoutes( if ( commentBody && effectiveMoveToTodoRequested && - (isClosed || (isBlocked && !hasUnresolvedFirstClassBlockers)) && + (isClosed || (isBlocked && !hasUnresolvedFirstClassBlockers) || shouldResumeInProgressScheduledRetry) && updateFields.status === undefined ) { updateFields.status = "todo"; } + let cancelledScheduledRetryRunId: string | null = null; + if ( + commentBody && + shouldResumeInProgressScheduledRetry && + updateFields.status === "todo" + ) { + cancelledScheduledRetryRunId = await cancelScheduledRetrySupersededByComment({ + scheduledRetryRunId: scheduledRetryForHumanComment?.runId, + issue: existing, + actor, + }); + } if (req.body.executionPolicy !== undefined) { updateFields.executionPolicy = applyActorMonitorScheduledBy( normalizeIssueExecutionPolicy(req.body.executionPolicy), @@ -3715,6 +3799,11 @@ export function issueRoutes( previous.status !== undefined && issue.status === "todo"; const reopenFromStatus = reopened ? existing.status : null; + const scheduledRetrySupersededByComment = + shouldResumeInProgressScheduledRetry && + previous.status !== undefined && + existing.status === "in_progress" && + issue.status === "todo"; const statusChangedFromBlockedToTodo = existing.status === "blocked" && issue.status === "todo" && @@ -3756,6 +3845,13 @@ export function issueRoutes( ...(commentBody ? { source: "comment" } : {}), ...(resumeRequested === true ? { resumeIntent: true, followUpRequested: true } : {}), ...(reopened ? { reopened: true, reopenedFrom: reopenFromStatus } : {}), + ...(scheduledRetrySupersededByComment + ? { + scheduledRetrySupersededByComment: true, + scheduledRetryRunId: scheduledRetryForHumanComment?.runId ?? null, + ...(cancelledScheduledRetryRunId ? { cancelledScheduledRetryRunId } : {}), + } + : {}), ...(interruptedRunId ? { interruptedRunId } : {}), ...(cancelledStatusRunId ? { cancelledStatusRunId } : {}), ...(workspaceChange ? { workspaceChange } : {}), @@ -3973,6 +4069,13 @@ export function issueRoutes( issueTitle: issue.title, ...(resumeRequested === true ? { resumeIntent: true, followUpRequested: true } : {}), ...(reopened ? { reopened: true, reopenedFrom: reopenFromStatus, source: "comment" } : {}), + ...(scheduledRetrySupersededByComment + ? { + scheduledRetrySupersededByComment: true, + scheduledRetryRunId: scheduledRetryForHumanComment?.runId ?? null, + ...(cancelledScheduledRetryRunId ? { cancelledScheduledRetryRunId } : {}), + } + : {}), ...(interruptedRunId ? { interruptedRunId } : {}), ...(hasFieldChanges ? { updated: true } : {}), ...summarizeIssueReferenceActivityDetails({ @@ -4470,7 +4573,17 @@ export function issueRoutes( return; } assertCompanyAccess(req, issue.companyId); - const interactions = await issueThreadInteractionService(db).listForIssue(id); + const actor = getActorInfo(req); + const interactionSvc = issueThreadInteractionService(db); + const expiredInteractions = await interactionSvc.expireRequestConfirmationsSupersededByHistoricalComments(issue); + await logExpiredRequestConfirmations({ + issue, + interactions: expiredInteractions, + actor, + source: "issue.interactions.catchup_superseded_by_comment", + }); + + const interactions = await interactionSvc.listForIssue(id); res.json(interactions); }); @@ -4976,6 +5089,18 @@ export function issueRoutes( const isClosed = isClosedIssueStatus(issue.status); const isBlocked = issue.status === "blocked"; const explicitMoveToTodoRequested = reopenRequested || resumeRequested === true; + const scheduledRetryForHumanComment = + shouldHumanCommentResumeInProgressScheduledRetry({ + hasComment: true, + issueStatus: issue.status, + assigneeAgentId: issue.assigneeAgentId, + actorType: actor.actorType, + }) + ? await svc.getCurrentScheduledRetry(issue.id) + : null; + const shouldResumeInProgressScheduledRetry = + !!scheduledRetryForHumanComment && + scheduledRetryForHumanComment.agentId === issue.assigneeAgentId; const effectiveMoveToTodoRequested = explicitMoveToTodoRequested || shouldImplicitlyMoveCommentedIssueToTodo({ @@ -4983,7 +5108,8 @@ export function issueRoutes( assigneeAgentId: issue.assigneeAgentId, actorType: actor.actorType, actorId: actor.actorId, - }); + }) || + shouldResumeInProgressScheduledRetry; const hasUnresolvedFirstClassBlockers = isBlocked && effectiveMoveToTodoRequested ? (await svc.getDependencyReadiness(issue.id)).unresolvedBlockerCount > 0 @@ -4998,14 +5124,27 @@ export function issueRoutes( let currentIssue = issue; const commentReferenceSummaryBefore = await issueReferencesSvc.listIssueReferenceSummary(issue.id); - if (effectiveMoveToTodoRequested && (isClosed || (isBlocked && !hasUnresolvedFirstClassBlockers))) { + let scheduledRetrySupersededByComment = false; + let cancelledScheduledRetryRunId: string | null = null; + if ( + effectiveMoveToTodoRequested && + (isClosed || (isBlocked && !hasUnresolvedFirstClassBlockers) || shouldResumeInProgressScheduledRetry) + ) { + scheduledRetrySupersededByComment = shouldResumeInProgressScheduledRetry && issue.status === "in_progress"; + cancelledScheduledRetryRunId = scheduledRetrySupersededByComment + ? await cancelScheduledRetrySupersededByComment({ + scheduledRetryRunId: scheduledRetryForHumanComment?.runId, + issue, + actor, + }) + : null; const reopenedIssue = await svc.update(id, { status: "todo" }); if (!reopenedIssue) { res.status(404).json({ error: "Issue not found" }); return; } - reopened = true; - reopenFromStatus = issue.status; + reopened = isClosed || (isBlocked && !hasUnresolvedFirstClassBlockers); + reopenFromStatus = reopened ? issue.status : null; currentIssue = reopenedIssue; await logActivity(db, { @@ -5019,8 +5158,14 @@ export function issueRoutes( entityId: currentIssue.id, details: { status: "todo", - reopened: true, - reopenedFrom: reopenFromStatus, + ...(reopened ? { reopened: true, reopenedFrom: reopenFromStatus } : {}), + ...(scheduledRetrySupersededByComment + ? { + scheduledRetrySupersededByComment: true, + scheduledRetryRunId: scheduledRetryForHumanComment?.runId ?? null, + ...(cancelledScheduledRetryRunId ? { cancelledScheduledRetryRunId } : {}), + } + : {}), source: "comment", ...(resumeRequested === true ? { resumeIntent: true, followUpRequested: true } : {}), identifier: currentIssue.identifier, @@ -5091,6 +5236,13 @@ export function issueRoutes( issueTitle: currentIssue.title, ...(resumeRequested === true ? { resumeIntent: true, followUpRequested: true } : {}), ...(reopened ? { reopened: true, reopenedFrom: reopenFromStatus, source: "comment" } : {}), + ...(scheduledRetrySupersededByComment + ? { + scheduledRetrySupersededByComment: true, + scheduledRetryRunId: scheduledRetryForHumanComment?.runId ?? null, + ...(cancelledScheduledRetryRunId ? { cancelledScheduledRetryRunId } : {}), + } + : {}), ...(interruptedRunId ? { interruptedRunId } : {}), ...summarizeIssueReferenceActivityDetails({ addedReferencedIssues: commentReferenceDiff.addedReferencedIssues.map(summarizeIssueRelationForActivity), @@ -5119,7 +5271,7 @@ export function issueRoutes( issue: currentIssue, trigger: "comment", actor, - statusChanged: reopened, + statusChanged: reopened || scheduledRetrySupersededByComment, resumeRequested: resumeRequested === true, reopened, blockedToTodoRecovery: reopened && reopenFromStatus === "blocked" && currentIssue.status === "todo", diff --git a/server/src/services/issue-thread-interactions.ts b/server/src/services/issue-thread-interactions.ts index 80b31c11..6fc377c2 100644 --- a/server/src/services/issue-thread-interactions.ts +++ b/server/src/services/issue-thread-interactions.ts @@ -1,5 +1,5 @@ import { isDeepStrictEqual } from "node:util"; -import { and, asc, eq, inArray } from "drizzle-orm"; +import { and, asc, eq, inArray, isNotNull } from "drizzle-orm"; import type { Db } from "@paperclipai/db"; import { documents, @@ -158,6 +158,20 @@ function shouldReturnAcceptedConfirmationToCreatorAgent(args: { return true; } +function shouldSupersedeRequestConfirmationOnUserComment(interaction: RequestConfirmationInteraction) { + return interaction.payload.supersedeOnUserComment === true; +} + +function isCommentAtOrAfterInteraction(args: { + commentCreatedAt: Date | string; + interactionCreatedAt: Date | string; +}) { + const commentCreatedAtMs = new Date(args.commentCreatedAt).getTime(); + const interactionCreatedAtMs = new Date(args.interactionCreatedAt).getTime(); + if (!Number.isFinite(commentCreatedAtMs) || !Number.isFinite(interactionCreatedAtMs)) return false; + return commentCreatedAtMs >= interactionCreatedAtMs; +} + function buildTaskCreationOrder(tasks: ReadonlyArray) { const taskByClientKey = new Map(tasks.map((task) => [task.clientKey, task] as const)); const ordered: Array = []; @@ -967,7 +981,7 @@ export function issueThreadInteractionService(db: Db) { expireRequestConfirmationsSupersededByComment: async ( issue: { id: string; companyId: string }, - comment: { id: string; authorUserId?: string | null }, + comment: { id: string; createdAt: Date | string; authorUserId?: string | null }, actor: InteractionActor, ) => { if (!comment.authorUserId) return []; @@ -984,7 +998,13 @@ export function issueThreadInteractionService(db: Db) { const superseded = rows.filter((row) => { const interaction = hydrateInteraction(row) as RequestConfirmationInteraction; - return interaction.payload.supersedeOnUserComment === true; + return ( + shouldSupersedeRequestConfirmationOnUserComment(interaction) + && isCommentAtOrAfterInteraction({ + commentCreatedAt: comment.createdAt, + interactionCreatedAt: row.createdAt, + }) + ); }); if (superseded.length === 0) return []; @@ -1020,6 +1040,91 @@ export function issueThreadInteractionService(db: Db) { return expired; }, + expireRequestConfirmationsSupersededByHistoricalComments: async ( + issue: { id: string; companyId: string }, + ) => { + const [rows, comments] = await Promise.all([ + db + .select() + .from(issueThreadInteractions) + .where(and( + eq(issueThreadInteractions.companyId, issue.companyId), + eq(issueThreadInteractions.issueId, issue.id), + eq(issueThreadInteractions.kind, "request_confirmation"), + eq(issueThreadInteractions.status, "pending"), + )), + db + .select() + .from(issueComments) + .where(and( + eq(issueComments.companyId, issue.companyId), + eq(issueComments.issueId, issue.id), + isNotNull(issueComments.authorUserId), + )) + .orderBy(asc(issueComments.createdAt)), + ]); + + if (rows.length === 0 || comments.length === 0) return []; + + const now = new Date(); + const expired: IssueThreadInteraction[] = []; + const supersededByComment = new Map< + string, + { + comment: (typeof comments)[number]; + rowIds: string[]; + } + >(); + for (const row of rows) { + const interaction = hydrateInteraction(row) as RequestConfirmationInteraction; + if (!shouldSupersedeRequestConfirmationOnUserComment(interaction)) continue; + + const supersedingComment = comments.find((comment) => isCommentAtOrAfterInteraction({ + commentCreatedAt: comment.createdAt, + interactionCreatedAt: row.createdAt, + })); + if (!supersedingComment) continue; + + const group = supersededByComment.get(supersedingComment.id); + if (group) { + group.rowIds.push(row.id); + } else { + supersededByComment.set(supersedingComment.id, { + comment: supersedingComment, + rowIds: [row.id], + }); + } + } + + for (const { comment, rowIds } of supersededByComment.values()) { + const updatedRows = await db + .update(issueThreadInteractions) + .set({ + status: "expired", + result: { + version: 1, + outcome: "superseded_by_comment", + commentId: comment.id, + }, + resolvedByAgentId: null, + resolvedByUserId: comment.authorUserId, + resolvedAt: now, + updatedAt: now, + }) + .where(and( + inArray(issueThreadInteractions.id, rowIds), + eq(issueThreadInteractions.status, "pending"), + )) + .returning(); + expired.push(...updatedRows.map(hydrateInteraction)); + } + + if (expired.length > 0) { + await touchIssue(db, issue.id); + } + return expired; + }, + expireStaleRequestConfirmationsForIssueDocument: async ( issue: { id: string; companyId: string }, document: { id: string; key: string; latestRevisionId?: string | null; latestRevisionNumber?: number | null } | null, diff --git a/server/src/services/issues.ts b/server/src/services/issues.ts index ae349906..0ab88d3e 100644 --- a/server/src/services/issues.ts +++ b/server/src/services/issues.ts @@ -1,5 +1,5 @@ import { Buffer } from "node:buffer"; -import { and, asc, desc, eq, gt, inArray, isNull, like, lt, ne, notInArray, or, sql } from "drizzle-orm"; +import { and, asc, desc, eq, gt, inArray, isNull, like, lt, ne, notInArray, or, sql, type SQL } from "drizzle-orm"; import type { Db } from "@paperclipai/db"; import { activityLog, @@ -239,6 +239,8 @@ export interface IssueFilters { q?: string; limit?: number; offset?: number; + sortField?: "updated"; + sortDir?: "asc" | "desc"; } type IssueRow = typeof issues.$inferSelect; @@ -782,6 +784,43 @@ function latestIssueActivityAt(...values: Array> { const map = new Map(); if (issueIds.length === 0) return map; @@ -3521,18 +3560,17 @@ export function issueService(db: Db) { ELSE 6 END `; - const canonicalLastActivityAt = issueCanonicalLastActivityAtExpr(companyId); const baseQuery = db .select(issueListSelect) .from(issues) .where(and(...conditions)) - .orderBy( - hasSearch ? asc(searchOrder) : asc(priorityOrder), - asc(priorityOrder), - desc(canonicalLastActivityAt), - desc(issues.updatedAt), - desc(issues.id), - ); + .orderBy(...issueListOrderBy(companyId, { + hasSearch, + priorityOrder, + searchOrder, + sortField: filters?.sortField, + sortDir: filters?.sortDir, + })); const pageQuery = offset > 0 ? (limit === undefined ? baseQuery.offset(offset) : baseQuery.limit(limit).offset(offset)) : (limit === undefined ? baseQuery : baseQuery.limit(limit)); diff --git a/server/src/services/plugin-lifecycle.ts b/server/src/services/plugin-lifecycle.ts index e9f9b02b..24fa13e7 100644 --- a/server/src/services/plugin-lifecycle.ts +++ b/server/src/services/plugin-lifecycle.ts @@ -776,19 +776,47 @@ export function pluginLifecycleManager( ); } - log.info( - { pluginId, pluginKey: plugin.pluginKey }, - "plugin lifecycle: restarting worker", - ); + const supportsRuntimeActivation = + typeof pluginLoaderInstance.hasRuntimeServices === "function" + && typeof pluginLoaderInstance.loadSingle === "function" + && typeof pluginLoaderInstance.unloadSingle === "function" + && pluginLoaderInstance.hasRuntimeServices(); - await handle.restart(); + if (supportsRuntimeActivation) { + log.info( + { pluginId, pluginKey: plugin.pluginKey }, + "plugin lifecycle: reloading plugin (re-reading manifest, re-applying pending migrations, restarting worker)", + ); - emitDomain("plugin.worker_stopped", { pluginId, pluginKey: plugin.pluginKey }); - emitDomain("plugin.worker_started", { pluginId, pluginKey: plugin.pluginKey }); + // Full deactivate+reactivate cycle (not just `handle.restart()`) so that: + // - the manifest is re-read from disk, picking up newly declared + // `migrations/*.sql` files and any other manifest changes, + // - `applyMigrations` runs idempotently against the up-to-date + // migrations directory — pending migrations get applied, already- + // applied ones are skipped via the `pluginMigrations` table, + // - the worker subprocess is replaced with one loading the freshly + // built bundle. + // + // Bouncing the worker process alone (`handle.restart()`) leaves plugin + // schema out of sync with worker code whenever a hot reload adds a new + // migration, which makes downstream queries fail against missing tables. + await deactivatePluginRuntime(pluginId, plugin.pluginKey); + await activateReadyPlugin(pluginId); + } else { + // No runtime activation services wired in (e.g. state-only test harness) + // — fall back to a bare worker subprocess bounce. + log.info( + { pluginId, pluginKey: plugin.pluginKey }, + "plugin lifecycle: restarting worker (runtime services unavailable; skipping migration re-apply)", + ); + await handle.restart(); + emitDomain("plugin.worker_stopped", { pluginId, pluginKey: plugin.pluginKey }); + emitDomain("plugin.worker_started", { pluginId, pluginKey: plugin.pluginKey }); + } log.info( { pluginId, pluginKey: plugin.pluginKey }, - "plugin lifecycle: worker restarted", + "plugin lifecycle: plugin reloaded", ); }, diff --git a/server/src/services/plugin-worker-manager.ts b/server/src/services/plugin-worker-manager.ts index daedc521..6e1d7689 100644 --- a/server/src/services/plugin-worker-manager.ts +++ b/server/src/services/plugin-worker-manager.ts @@ -653,7 +653,9 @@ export function createPluginWorkerHandle( // Handle process errors (e.g. spawn failure) child.on("error", (err) => { log.error({ err: err.message }, "worker process error"); - emitter.emit("error", { pluginId, error: err }); + if (emitter.listenerCount("error") > 0) { + emitter.emit("error", { pluginId, error: err }); + } if (status === "starting") { setStatus("crashed"); rejectAllPending( diff --git a/ui/src/api/issues.test.ts b/ui/src/api/issues.test.ts index 4013f67d..56bdcf47 100644 --- a/ui/src/api/issues.test.ts +++ b/ui/src/api/issues.test.ts @@ -51,6 +51,18 @@ describe("issuesApi.list", () => { ); }); + it("passes issue list sort options through to the company issues endpoint", async () => { + await issuesApi.list("company-1", { + limit: 500, + sortField: "updated", + sortDir: "desc", + }); + + expect(mockApi.get).toHaveBeenCalledWith( + "/companies/company-1/issues?limit=500&sortField=updated&sortDir=desc", + ); + }); + it("posts recovery action resolution to the source issue endpoint", async () => { await issuesApi.resolveRecoveryAction("issue-1", { actionId: "00000000-0000-0000-0000-0000000000aa", diff --git a/ui/src/api/issues.ts b/ui/src/api/issues.ts index 14def41b..2e35416a 100644 --- a/ui/src/api/issues.ts +++ b/ui/src/api/issues.ts @@ -60,6 +60,8 @@ export const issuesApi = { q?: string; limit?: number; offset?: number; + sortField?: "updated"; + sortDir?: "asc" | "desc"; }, ) => { const params = new URLSearchParams(); @@ -86,6 +88,8 @@ export const issuesApi = { if (filters?.q) params.set("q", filters.q); if (filters?.limit) params.set("limit", String(filters.limit)); if (filters?.offset !== undefined) params.set("offset", String(filters.offset)); + if (filters?.sortField) params.set("sortField", filters.sortField); + if (filters?.sortDir) params.set("sortDir", filters.sortDir); const qs = params.toString(); return api.get(`/companies/${companyId}/issues${qs ? `?${qs}` : ""}`); },