From ad6effa65c6cb17266ba392e614dc8ea6916ce7b Mon Sep 17 00:00:00 2001 From: Dotta <34892728+cryppadotta@users.noreply.github.com> Date: Fri, 22 May 2026 09:57:22 -0500 Subject: [PATCH] [codex] Improve runtime and import reliability (#6549) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Thinking Path > - Paperclip coordinates autonomous company work through local and hosted runtime surfaces. > - Local embedded Postgres and tenant import/export paths are foundational reliability pieces. > - A runtime failure in either path can stop agents or imports before useful work begins. > - The branch included remaining fixes for embedded native library bootstrap and async tenant import handling. > - This pull request groups those runtime/import reliability changes into one standalone PR. > - The benefit is a more robust local runtime and safer cloud tenant import behavior. ## What Changed - Prepared embedded Postgres native runtime before startup in CLI/server/test entrypoints. - Added embedded Postgres native bootstrap coverage. - Added async tenant import job handling and deferred validation coverage. - Kept the runtime/import changes based directly on current `origin/master` after related upstream PRs had already merged. ## Verification - `pnpm --filter @paperclipai/plugin-sdk build` - `NODE_ENV=test pnpm exec vitest run packages/db/src/embedded-postgres-native.test.ts server/src/__tests__/company-portability-routes.test.ts` ## Risks - Medium-low: this touches startup/import paths, but the branch is small and covered by targeted tests. - The embedded Postgres change depends on platform-specific native-library behavior, so CI and follow-up checks should still verify supported runners. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI GPT-5 Codex via `codex_local`, tool-enabled coding session; exact context window not exposed by this runtime. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --- cli/src/commands/routines.ts | 2 + cli/src/commands/worktree.ts | 2 + .../db/src/embedded-postgres-native.test.ts | 43 ++++ packages/db/src/embedded-postgres-native.ts | 85 +++++++ packages/db/src/index.ts | 4 + packages/db/src/migration-runtime.ts | 2 + packages/db/src/test-embedded-postgres.ts | 2 + .../company-portability-routes.test.ts | 164 +++++++++++++ server/src/index.ts | 2 + server/src/routes/companies.ts | 224 ++++++++++++++++-- 10 files changed, 511 insertions(+), 19 deletions(-) create mode 100644 packages/db/src/embedded-postgres-native.test.ts create mode 100644 packages/db/src/embedded-postgres-native.ts diff --git a/cli/src/commands/routines.ts b/cli/src/commands/routines.ts index 4c5a3ca9..63ff89d8 100644 --- a/cli/src/commands/routines.ts +++ b/cli/src/commands/routines.ts @@ -9,6 +9,7 @@ import { createEmbeddedPostgresLogBuffer, ensurePostgresDatabase, formatEmbeddedPostgresError, + prepareEmbeddedPostgresNativeRuntime, routines, } from "@paperclipai/db"; import { eq, inArray } from "drizzle-orm"; @@ -116,6 +117,7 @@ async function ensureEmbeddedPostgres(dataDir: string, preferredPort: number): P "Embedded PostgreSQL support requires dependency `embedded-postgres`. Reinstall dependencies and try again.", ); } + await prepareEmbeddedPostgresNativeRuntime(); const postmasterPidFile = path.resolve(dataDir, "postmaster.pid"); const runningPid = readRunningPostmasterPid(postmasterPidFile); diff --git a/cli/src/commands/worktree.ts b/cli/src/commands/worktree.ts index fea6f2c3..db0ca008 100644 --- a/cli/src/commands/worktree.ts +++ b/cli/src/commands/worktree.ts @@ -45,6 +45,7 @@ import { runDatabaseRestore, createEmbeddedPostgresLogBuffer, formatEmbeddedPostgresError, + prepareEmbeddedPostgresNativeRuntime, } from "@paperclipai/db"; import type { Command } from "commander"; import { ensureAgentJwtSecret, loadPaperclipEnvFile, mergePaperclipEnvEntries, readPaperclipEnvEntries, resolvePaperclipEnvFile } from "../config/env.js"; @@ -1059,6 +1060,7 @@ async function ensureEmbeddedPostgres(dataDir: string, preferredPort: number): P "Embedded PostgreSQL support requires dependency `embedded-postgres`. Reinstall dependencies and try again.", ); } + await prepareEmbeddedPostgresNativeRuntime(); const postmasterPidFile = path.resolve(dataDir, "postmaster.pid"); const runningPid = readRunningPostmasterPid(postmasterPidFile); diff --git a/packages/db/src/embedded-postgres-native.test.ts b/packages/db/src/embedded-postgres-native.test.ts new file mode 100644 index 00000000..7335f2c6 --- /dev/null +++ b/packages/db/src/embedded-postgres-native.test.ts @@ -0,0 +1,43 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { ensureLinuxSharedLibraryAliases } from "./embedded-postgres-native.js"; + +describe("embedded Postgres native runtime", () => { + const tempDirs: string[] = []; + + afterEach(() => { + for (const tempDir of tempDirs.splice(0)) { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + + it.runIf(process.platform !== "win32")("creates soname aliases for bundled patch-level shared libraries", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "paperclip-embedded-pg-libs-")); + tempDirs.push(tempDir); + fs.writeFileSync(path.join(tempDir, "libicuuc.so.60.2"), ""); + fs.writeFileSync(path.join(tempDir, "libicui18n.so.60.2"), ""); + fs.writeFileSync(path.join(tempDir, "README.md"), ""); + + const created = await ensureLinuxSharedLibraryAliases(tempDir); + + expect(created.map((file) => path.basename(file)).sort()).toEqual([ + "libicui18n.so.60", + "libicuuc.so.60", + ]); + expect(fs.readlinkSync(path.join(tempDir, "libicuuc.so.60"))).toBe("libicuuc.so.60.2"); + }); + + it.runIf(process.platform !== "win32")("is idempotent when aliases already exist", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "paperclip-embedded-pg-libs-")); + tempDirs.push(tempDir); + fs.writeFileSync(path.join(tempDir, "libicuuc.so.60.2"), ""); + + await ensureLinuxSharedLibraryAliases(tempDir); + const second = await ensureLinuxSharedLibraryAliases(tempDir); + + expect(second).toEqual([]); + expect(fs.readlinkSync(path.join(tempDir, "libicuuc.so.60"))).toBe("libicuuc.so.60.2"); + }); +}); diff --git a/packages/db/src/embedded-postgres-native.ts b/packages/db/src/embedded-postgres-native.ts new file mode 100644 index 00000000..a304f8a1 --- /dev/null +++ b/packages/db/src/embedded-postgres-native.ts @@ -0,0 +1,85 @@ +import { promises as fs } from "node:fs"; +import { createRequire } from "node:module"; +import path from "node:path"; + +const require = createRequire(import.meta.url); + +function resolveNativePackageName(): string | null { + if (process.platform !== "linux") return null; + + switch (process.arch) { + case "arm64": + return "linux-arm64"; + case "arm": + return "linux-arm"; + case "ia32": + return "linux-ia32"; + case "ppc64": + return "linux-ppc64"; + case "x64": + return "linux-x64"; + default: + return null; + } +} + +async function pathExists(value: string): Promise { + try { + await fs.stat(value); + return true; + } catch { + return false; + } +} + +function resolveEmbeddedPostgresPackageRoot(): string | null { + try { + const entry = require.resolve("embedded-postgres"); + return path.dirname(path.dirname(entry)); + } catch { + return null; + } +} + +function prependPathEnv(name: string, value: string): void { + const current = process.env[name] ?? ""; + const parts = current.split(path.delimiter).filter(Boolean); + if (parts.includes(value)) return; + process.env[name] = [value, ...parts].join(path.delimiter); +} + +export async function ensureLinuxSharedLibraryAliases(libDir: string): Promise { + const entries = await fs.readdir(libDir, { withFileTypes: true }); + const created: string[] = []; + + for (const entry of entries) { + if (!entry.isFile()) continue; + const match = entry.name.match(/^(lib.+\.so\.\d+)\.\d+(?:\.\d+)?$/); + if (!match) continue; + + const aliasName = match[1]; + const aliasPath = path.join(libDir, aliasName); + try { + await fs.symlink(entry.name, aliasPath); + created.push(aliasPath); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "EEXIST") continue; + throw error; + } + } + + return created; +} + +export async function prepareEmbeddedPostgresNativeRuntime(): Promise { + const nativePackageName = resolveNativePackageName(); + const packageRoot = resolveEmbeddedPostgresPackageRoot(); + if (!nativePackageName || !packageRoot) return; + + const nativeRoot = path.resolve(packageRoot, "..", "@embedded-postgres", nativePackageName); + const libDir = path.join(nativeRoot, "native", "lib"); + if (!(await pathExists(libDir))) return; + + prependPathEnv("LD_LIBRARY_PATH", libDir); + await ensureLinuxSharedLibraryAliases(libDir); +} diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts index 3bf4d22b..a1c5bce8 100644 --- a/packages/db/src/index.ts +++ b/packages/db/src/index.ts @@ -30,6 +30,10 @@ export { createEmbeddedPostgresLogBuffer, formatEmbeddedPostgresError, } from "./embedded-postgres-error.js"; +export { + ensureLinuxSharedLibraryAliases, + prepareEmbeddedPostgresNativeRuntime, +} from "./embedded-postgres-native.js"; export { issueRelations } from "./schema/issue_relations.js"; export { issueReferenceMentions } from "./schema/issue_reference_mentions.js"; export * from "./schema/index.js"; diff --git a/packages/db/src/migration-runtime.ts b/packages/db/src/migration-runtime.ts index 5aa2b6a2..3823b6d0 100644 --- a/packages/db/src/migration-runtime.ts +++ b/packages/db/src/migration-runtime.ts @@ -3,6 +3,7 @@ import { createServer } from "node:net"; import path from "node:path"; import { ensurePostgresDatabase, getPostgresDataDirectory } from "./client.js"; import { createEmbeddedPostgresLogBuffer, formatEmbeddedPostgresError } from "./embedded-postgres-error.js"; +import { prepareEmbeddedPostgresNativeRuntime } from "./embedded-postgres-native.js"; import { resolveDatabaseTarget } from "./runtime-config.js"; type EmbeddedPostgresInstance = { @@ -92,6 +93,7 @@ async function ensureEmbeddedPostgresConnection( preferredPort: number, ): Promise { const EmbeddedPostgres = await loadEmbeddedPostgresCtor(); + await prepareEmbeddedPostgresNativeRuntime(); const selectedPort = await findAvailablePort(preferredPort); const postmasterPidFile = path.resolve(dataDir, "postmaster.pid"); const pgVersionFile = path.resolve(dataDir, "PG_VERSION"); diff --git a/packages/db/src/test-embedded-postgres.ts b/packages/db/src/test-embedded-postgres.ts index 87db5cde..929d17c5 100644 --- a/packages/db/src/test-embedded-postgres.ts +++ b/packages/db/src/test-embedded-postgres.ts @@ -3,6 +3,7 @@ import net from "node:net"; import os from "node:os"; import path from "node:path"; import { applyPendingMigrations, ensurePostgresDatabase } from "./client.js"; +import { prepareEmbeddedPostgresNativeRuntime } from "./embedded-postgres-native.js"; type EmbeddedPostgresInstance = { initialise(): Promise; @@ -48,6 +49,7 @@ function getReservedTestPorts(): Set { async function getEmbeddedPostgresCtor(): Promise { const mod = await import("embedded-postgres"); + await prepareEmbeddedPostgresNativeRuntime(); return mod.default as EmbeddedPostgresCtor; } diff --git a/server/src/__tests__/company-portability-routes.test.ts b/server/src/__tests__/company-portability-routes.test.ts index 26f6df16..42590525 100644 --- a/server/src/__tests__/company-portability-routes.test.ts +++ b/server/src/__tests__/company-portability-routes.test.ts @@ -139,6 +139,58 @@ function createExportResult() { }; } +const importRequest = { + source: { type: "inline", files: { "COMPANY.md": "---\nname: Test\n---\n" } }, + include: { company: true, agents: true, projects: false, issues: false }, + target: { mode: "existing_company", companyId }, + collisionStrategy: "rename", +}; + +const cloudHeaders = { + "x-paperclip-cloud-stack-id": "stack-alpha", + "x-paperclip-cloud-paperclip-company-id": companyId, +}; + +function cloudTenantActor() { + return { + type: "board", + userId: "cloud-user-1", + userName: "Cloud User", + userEmail: "cloud-user@example.com", + companyIds: [companyId], + memberships: [{ companyId, membershipRole: "owner", status: "active" }], + isInstanceAdmin: true, + source: "cloud_tenant", + }; +} + +function createImportResult(action = "updated") { + return { + company: { id: companyId, action }, + agents: [{ id: "agent-1" }], + warnings: [], + }; +} + +async function waitForImportJobStatus(app: express.Express, statusUrl: string, status: string) { + for (let attempt = 0; attempt < 20; attempt += 1) { + const res = await request(app).get(statusUrl).set(cloudHeaders); + if (res.body.job?.status === status) { + return res; + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } + throw new Error(`Timed out waiting for import job to reach ${status}`); +} + +async function waitForCondition(condition: () => boolean, label: string) { + for (let attempt = 0; attempt < 20; attempt += 1) { + if (condition()) return; + await new Promise((resolve) => setTimeout(resolve, 10)); + } + throw new Error(`Timed out waiting for ${label}`); +} + describe.sequential("company portability routes", () => { beforeEach(() => { vi.clearAllMocks(); @@ -426,4 +478,116 @@ describe.sequential("company portability routes", () => { expect(res.body.error).toContain("Instance admin"); expect(mockCompanyPortabilityService.importBundle).not.toHaveBeenCalled(); }); + + it.sequential("accepts trusted Cloud async import jobs and reports success by job id", async () => { + let resolveImport: (value: ReturnType) => void = () => undefined; + const pendingImport = new Promise>((resolve) => { + resolveImport = resolve; + }); + mockCompanyPortabilityService.importBundle.mockReturnValueOnce(pendingImport); + const app = await createApp(cloudTenantActor()); + + const accepted = await request(app) + .post("/api/companies/import") + .set("x-paperclip-cloud-async-import", "1") + .set(cloudHeaders) + .send(importRequest); + + expect(accepted.status).toBe(202); + expect(accepted.body.job.status).toBe("running"); + expect(accepted.body.statusUrl).toMatch(/^\/api\/companies\/import\/jobs\/tenant-import-/); + expect(accepted.body.retryAfterMs).toBe(1000); + await waitForCondition(() => mockCompanyPortabilityService.importBundle.mock.calls.length === 1, "import job start"); + expect(mockCompanyPortabilityService.importBundle).toHaveBeenCalledWith(importRequest, "cloud-user-1"); + expect(mockLogActivity).not.toHaveBeenCalled(); + + resolveImport(createImportResult("updated")); + const succeeded = await waitForImportJobStatus(app, accepted.body.statusUrl, "succeeded"); + + expect(succeeded.status).toBe(200); + expect(succeeded.body.job.status).toBe("succeeded"); + expect(succeeded.body.job.result.companyId).toBe(companyId); + expect(succeeded.body.retryAfterMs).toBeUndefined(); + expect(mockLogActivity).toHaveBeenCalledWith(expect.anything(), expect.objectContaining({ + action: "company.imported", + companyId, + details: expect.objectContaining({ + agentCount: 1, + warningCount: 0, + companyAction: "updated", + }), + })); + + const nowSpy = vi.spyOn(Date, "now").mockReturnValue(Date.parse(succeeded.body.job.completedAt) + (5 * 60 * 1000) + 1); + try { + const expired = await request(app).get(accepted.body.statusUrl).set(cloudHeaders); + expect(expired.status).toBe(404); + expect(expired.body.error).toBe("Import job not found"); + } finally { + nowSpy.mockRestore(); + } + }); + + it.sequential("reports trusted Cloud async import job failures with the tenant error message", async () => { + mockCompanyPortabilityService.importBundle.mockRejectedValueOnce(new Error("tenant import exploded")); + const app = await createApp(cloudTenantActor()); + + const accepted = await request(app) + .post("/api/companies/import") + .set("x-paperclip-cloud-async-import", "1") + .set(cloudHeaders) + .send(importRequest); + + expect(accepted.status).toBe(202); + const failed = await waitForImportJobStatus(app, accepted.body.statusUrl, "failed"); + + expect(failed.status).toBe(200); + expect(failed.body.job.status).toBe("failed"); + expect(failed.body.job.error.message).toBe("tenant import exploded"); + expect(failed.body.retryAfterMs).toBeUndefined(); + expect(failed.body.message).toBe("tenant import exploded"); + expect(mockLogActivity).not.toHaveBeenCalled(); + }); + + it.sequential("accepts trusted Cloud async import jobs before validating the full import payload", async () => { + const app = await createApp(cloudTenantActor()); + + const accepted = await request(app) + .post("/api/companies/import") + .set("x-paperclip-cloud-async-import", "1") + .set(cloudHeaders) + .send({ target: { mode: "existing_company", companyId } }); + + expect(accepted.status).toBe(202); + expect(accepted.body.job.status).toBe("running"); + expect(mockCompanyPortabilityService.importBundle).not.toHaveBeenCalled(); + + const failed = await waitForImportJobStatus(app, accepted.body.statusUrl, "failed"); + + expect(failed.status).toBe(200); + expect(failed.body.job.status).toBe("failed"); + expect(failed.body.job.error.message).toEqual(expect.any(String)); + expect(mockCompanyPortabilityService.importBundle).not.toHaveBeenCalled(); + expect(mockLogActivity).not.toHaveBeenCalled(); + }); + + it.sequential("keeps global import apply synchronous when Cloud async opt-in is absent", async () => { + mockCompanyPortabilityService.importBundle.mockResolvedValueOnce(createImportResult("created")); + const app = await createApp(cloudTenantActor()); + + const res = await request(app) + .post("/api/companies/import") + .set(cloudHeaders) + .send(importRequest); + + expect(res.status).toBe(200); + expect(res.body.company.id).toBe(companyId); + expect(res.body.company.action).toBe("created"); + expect(res.body.job).toBeUndefined(); + expect(mockCompanyPortabilityService.importBundle).toHaveBeenCalledWith(importRequest, "cloud-user-1"); + expect(mockLogActivity).toHaveBeenCalledWith(expect.anything(), expect.objectContaining({ + action: "company.imported", + companyId, + })); + }); }); diff --git a/server/src/index.ts b/server/src/index.ts index d9c52d27..38caf44a 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -15,6 +15,7 @@ import { inspectMigrations, applyPendingMigrations, createEmbeddedPostgresLogBuffer, + prepareEmbeddedPostgresNativeRuntime, reconcilePendingMigrationHistory, formatDatabaseBackupResult, runDatabaseBackup, @@ -318,6 +319,7 @@ export async function startServer(): Promise { "Embedded PostgreSQL mode requires dependency `embedded-postgres`. Reinstall dependencies (without omitting required packages), or set DATABASE_URL for external Postgres.", ); } + await prepareEmbeddedPostgresNativeRuntime(); const dataDir = resolve(config.embeddedPostgresDataDir); const configuredPort = config.embeddedPostgresPort; diff --git a/server/src/routes/companies.ts b/server/src/routes/companies.ts index 2de26afb..3fe9664f 100644 --- a/server/src/routes/companies.ts +++ b/server/src/routes/companies.ts @@ -1,3 +1,4 @@ +import { randomUUID } from "node:crypto"; import { Router, type Request } from "express"; import type { Db } from "@paperclipai/db"; import { @@ -35,6 +36,8 @@ export function companyRoutes(db: Db, storage?: StorageService) { const access = accessService(db); const budgets = budgetService(db); const feedback = feedbackService(db); + const importJobs = new Map(); + const importJobTerminalRetentionMs = 5 * 60 * 1000; function parseBooleanQuery(value: unknown) { return value === true || value === "true" || value === "1"; @@ -177,27 +180,47 @@ export function companyRoutes(db: Db, storage?: StorageService) { res.json(preview); }); - router.post(COMPANY_IMPORT_ROUTE_PATH, validate(companyPortabilityImportSchema), async (req, res) => { + router.get("/import/jobs/:jobId", async (req, res) => { + assertCloudTenantCaller(req); + cleanupTerminalImportJobs(importJobs, importJobTerminalRetentionMs); + const job = importJobs.get(req.params.jobId as string); + if (!job || job.cloudTenantKey !== cloudTenantRequestKey(req)) { + res.status(404).json({ error: "Import job not found" }); + return; + } + res.json(importJobResponse(job)); + }); + + router.post(COMPANY_IMPORT_ROUTE_PATH, async (req, res) => { assertBoard(req); - assertImportTargetAccess(req, req.body.target); + const rawImportBody: unknown = req.body; const actor = getActorInfo(req); - const result = await portability.importBundle(req.body, req.actor.type === "board" ? req.actor.userId : null); - await logActivity(db, { - companyId: result.company.id, - actorType: actor.actorType, - actorId: actor.actorId, - action: "company.imported", - entityType: "company", - entityId: result.company.id, - agentId: actor.agentId, - runId: actor.runId, - details: { - include: req.body.include ?? null, - agentCount: result.agents.length, - warningCount: result.warnings.length, - companyAction: result.company.action, - }, - }); + const boardUserId = req.actor.type === "board" ? req.actor.userId : null; + if (req.header("x-paperclip-cloud-async-import") === "1") { + assertCloudTenantCaller(req); + cleanupTerminalImportJobs(importJobs, importJobTerminalRetentionMs); + const job = createImportJob(cloudTenantRequestKey(req)); + importJobs.set(job.id, job); + const operation = async () => { + const importBody = companyPortabilityImportSchema.parse(rawImportBody); + assertImportTargetAccess(req, importBody.target); + const activity = importedCompanyActivityContext(actor, importBody.include ?? null); + const result = await portability.importBundle(importBody, boardUserId); + await logImportedCompanyActivity(db, activity, result); + return result; + }; + res.status(202).json(importJobAcceptedResponse(job)); + setImmediate(() => { + void runImportJob(job, operation); + }); + return; + } + + const importBody = companyPortabilityImportSchema.parse(rawImportBody); + assertImportTargetAccess(req, importBody.target); + const activity = importedCompanyActivityContext(actor, importBody.include ?? null); + const result = await portability.importBundle(importBody, boardUserId); + await logImportedCompanyActivity(db, activity, result); res.json(result); }); @@ -419,3 +442,166 @@ export function companyRoutes(db: Db, storage?: StorageService) { return router; } + +type CompanyImportResult = { + company: { id: string; action: unknown }; + agents: unknown[]; + warnings: unknown[]; +}; + +interface ImportJobRecord { + id: string; + cloudTenantKey: string; + status: "running" | "succeeded" | "failed"; + createdAt: string; + updatedAt: string; + completedAt?: string; + error?: { message: string }; + result?: { + companyId: string; + agentCount: number; + warningCount: number; + companyAction: unknown; + }; +} + +interface ImportedCompanyActivityContext { + actorType: "user" | "agent"; + actorId: string; + agentId: string | null; + runId: string | null; + include: unknown; +} + +function assertCloudTenantCaller(req: Request) { + if (req.actor.source !== "cloud_tenant") { + throw forbidden("Trusted Cloud tenant access required"); + } +} + +function cloudTenantRequestKey(req: Request) { + return [ + req.actor.userId ?? "", + req.header("x-paperclip-cloud-stack-id")?.trim() ?? "", + req.header("x-paperclip-cloud-paperclip-company-id")?.trim() ?? "", + ].join(":"); +} + +function createImportJob(cloudTenantKey: string): ImportJobRecord { + const now = new Date().toISOString(); + return { + id: `tenant-import-${randomUUID()}`, + cloudTenantKey, + status: "running", + createdAt: now, + updatedAt: now, + }; +} + +async function runImportJob( + job: ImportJobRecord, + operation: () => Promise, +) { + try { + const result = await operation(); + const now = new Date().toISOString(); + job.status = "succeeded"; + job.updatedAt = now; + job.completedAt = now; + job.result = { + companyId: result.company.id, + agentCount: result.agents.length, + warningCount: result.warnings.length, + companyAction: result.company.action, + }; + } catch (error) { + const now = new Date().toISOString(); + job.status = "failed"; + job.updatedAt = now; + job.completedAt = now; + job.error = { message: errorMessage(error) }; + } +} + +function importedCompanyActivityContext( + actor: ReturnType, + include: unknown, +): ImportedCompanyActivityContext { + return { + actorType: actor.actorType, + actorId: actor.actorId, + agentId: actor.agentId, + runId: actor.runId, + include, + }; +} + +async function logImportedCompanyActivity( + db: Db, + activity: ImportedCompanyActivityContext, + result: CompanyImportResult, +) { + await logActivity(db, { + companyId: result.company.id, + actorType: activity.actorType, + actorId: activity.actorId, + action: "company.imported", + entityType: "company", + entityId: result.company.id, + agentId: activity.agentId, + runId: activity.runId, + details: { + include: activity.include, + agentCount: result.agents.length, + warningCount: result.warnings.length, + companyAction: result.company.action, + }, + }); +} + +function importJobAcceptedResponse(job: ImportJobRecord) { + return { + job: { + id: job.id, + status: job.status, + }, + statusUrl: `/api/companies/import/jobs/${encodeURIComponent(job.id)}`, + retryAfterMs: 1000, + }; +} + +function importJobResponse(job: ImportJobRecord) { + const isTerminal = job.status === "succeeded" || job.status === "failed"; + const response: Record = { + job: { + id: job.id, + status: job.status, + createdAt: job.createdAt, + updatedAt: job.updatedAt, + ...(job.completedAt ? { completedAt: job.completedAt } : {}), + ...(job.error ? { error: job.error } : {}), + ...(job.result ? { result: job.result } : {}), + }, + ...(isTerminal ? {} : { retryAfterMs: 1000 }), + }; + if (job.error?.message) { + response.error = job.error.message; + response.message = job.error.message; + response.reason = job.error.message; + } + return response; +} + +function cleanupTerminalImportJobs(importJobs: Map, terminalRetentionMs: number) { + const now = Date.now(); + for (const [jobId, job] of importJobs) { + if (job.status === "running" || !job.completedAt) continue; + if (now - Date.parse(job.completedAt) > terminalRetentionMs) { + importJobs.delete(jobId); + } + } +} + +function errorMessage(error: unknown) { + return error instanceof Error && error.message.trim() ? error.message : String(error); +}