diff --git a/cli/README.md b/cli/README.md index b23c32a6..8aa2cc16 100644 --- a/cli/README.md +++ b/cli/README.md @@ -226,6 +226,21 @@ By default, agents run on scheduled heartbeats and event-based triggers (task as
+## Paperclip Cloud Sync + +Cloud upstream sync is behind the `Cloud Sync` experimental setting. Enable it in Instance Settings before pushing. + +```bash +paperclipai cloud connect https://your-stack.paperclip.app +paperclipai cloud connect https://your-stack.paperclip.app --no-browser +paperclipai cloud push --company --dry-run +paperclipai cloud push --company +``` + +`cloud connect` authorizes the local instance against the target stack and stores the upstream token in the local instance secret store. The default path opens a browser for consent; `--no-browser` uses the device-code flow and prints the verification URL and user code. + +`cloud push --dry-run` exports the selected local company, sends a preview bundle to the connected Cloud stack, and exits with code `2` when conflicts need user resolution. A schema mismatch exits with code `3`. Running without `--dry-run` stages chunks idempotently, applies the run, and prints the final summary and recent progress events. + ## Development ```bash diff --git a/cli/src/__tests__/cloud.test.ts b/cli/src/__tests__/cloud.test.ts new file mode 100644 index 00000000..c66fd33f --- /dev/null +++ b/cli/src/__tests__/cloud.test.ts @@ -0,0 +1,243 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CompanyPortabilityExportResult } from "@paperclipai/shared"; +import { + assertDiscoveryCompatible, + buildBundleFromLocalCompany, + cloudCommandExitCodes, + connectCloud, + resolveDeviceCodeExpiresAt, +} from "../commands/client/cloud.js"; +import { + LocalUpstreamPushCoordinator, + normalizedContentHash, + type LocalUpstreamExportBundle, +} from "../commands/client/cloud-transfer.js"; +import { getCloudConnection } from "../commands/client/cloud-store.js"; + +const originalEnv = { ...process.env }; +const originalFetch = globalThis.fetch; + +describe("cloud CLI helpers", () => { + let tempHome: string; + + beforeEach(() => { + tempHome = fs.mkdtempSync(path.join(os.tmpdir(), "paperclip-cloud-cli-")); + process.env = { ...originalEnv, PAPERCLIP_HOME: tempHome }; + }); + + afterEach(() => { + process.env = { ...originalEnv }; + globalThis.fetch = originalFetch; + vi.restoreAllMocks(); + fs.rmSync(tempHome, { recursive: true, force: true }); + }); + + it("connects with the device-code flow and stores the resulting cloud connection", async () => { + globalThis.fetch = vi.fn(async (url, init) => { + const requestUrl = String(url); + if (requestUrl.endsWith("/.well-known/paperclip-upstream")) { + return jsonResponse(discovery()); + } + if (requestUrl.endsWith("/api/upstream-sync/device-code")) { + expect(JSON.parse(String(init?.body))).toMatchObject({ + stackId: "stack-1", + scopes: ["upstream_import:preview", "upstream_import:write", "upstream_import:read"], + }); + return jsonResponse({ + deviceCode: "device-1", + userCode: "ABCD-EFGH", + verificationUri: "https://cloud.example.test/api/upstream-sync/device-code/approve", + expiresAt: new Date(Date.now() + 60_000).toISOString(), + intervalSeconds: 0, + }); + } + if (requestUrl.endsWith("/api/upstream-sync/token")) { + return jsonResponse({ + accessToken: "upt_test", + scopes: ["upstream_import:preview"], + token: { + id: "token-1", + companyStackId: "stack-1", + targetOrigin: "https://cloud.example.test", + sourceInstanceId: "paperclip-local-default", + sourceInstanceFingerprint: "sha256:test", + scopes: ["upstream_import:preview"], + expiresAt: new Date(Date.now() + 60_000).toISOString(), + }, + }); + } + return jsonResponse({ error: "not_found" }, 404); + }) as typeof fetch; + + const connection = await connectCloud("https://cloud.example.test", { noBrowser: true, json: true }); + + expect(connection.accessToken).toBe("upt_test"); + expect(getCloudConnection("https://cloud.example.test")?.token.id).toBe("token-1"); + }); + + it("hard-blocks incompatible transfer schema versions with the stable schema exit code", () => { + expect(() => assertDiscoveryCompatible(discovery({ supportedSchemaMajor: 99 }))).toThrow(/schema mismatch/i); + expect(cloudCommandExitCodes.schemaMismatch).toBe(3); + }); + + it("falls back to a bounded device-code expiry when the cloud omits or malforms expiresAt", () => { + const now = Date.UTC(2026, 4, 22, 13, 0, 0); + const validExpiry = "2026-05-22T13:05:00.000Z"; + + expect(resolveDeviceCodeExpiresAt(validExpiry, now)).toBe(Date.parse(validExpiry)); + expect(resolveDeviceCodeExpiresAt(undefined, now)).toBe(now + 15 * 60_000); + expect(resolveDeviceCodeExpiresAt("not-a-date", now)).toBe(now + 15 * 60_000); + }); + + it("builds deterministic chunks with validated payload hashes", async () => { + const bundle = await buildTestBundle(); + + expect(bundle.chunks).toHaveLength(2); + expect(bundle.chunks[0]?.sha256).toBe(normalizedContentHash(bundle.chunks[0]?.payload)); + expect(bundle.manifest.chunks[0]?.manifestHash).toBe(bundle.manifest.manifestHash); + expect(bundle.manifest.idempotencyKey).toBe((await buildTestBundle()).manifest.idempotencyKey); + }); + + it("reuses the same manifest and chunk identity when an interrupted apply is retried", async () => { + const bundle = await buildTestBundle(); + const calls: Array<{ path: string; body: unknown }> = []; + const coordinator = new LocalUpstreamPushCoordinator({ + targetOrigin: "https://cloud.example.test", + paperclipCompanyId: "target-company-1", + fetch: async (url, init) => { + const parsed = new URL(String(url)); + const body = init?.body ? JSON.parse(String(init.body)) as unknown : {}; + calls.push({ path: parsed.pathname, body }); + if (parsed.pathname.endsWith("/runs")) return jsonResponse({ run: { id: "run-1" } }); + return jsonResponse({ run: { id: "run-1" }, summary: { create: 0, update: 0, adopt: 0, skip: 2, conflict: 0, staleMapping: 0 } }); + }, + }); + + await coordinator.apply(bundle); + await coordinator.apply(bundle); + + const runBodies = calls.filter((call) => call.path.endsWith("/runs")).map((call) => call.body as { manifest: { idempotencyKey: string } }); + const chunkBodies = calls.filter((call) => call.path.endsWith("/chunks")).map((call) => call.body as { chunkIndex: number; sha256: string }); + expect(runBodies).toHaveLength(2); + expect(runBodies[0]?.manifest.idempotencyKey).toBe(runBodies[1]?.manifest.idempotencyKey); + expect(chunkBodies[0]).toEqual(chunkBodies[2]); + expect(chunkBodies[1]).toEqual(chunkBodies[3]); + }); +}); + +async function buildTestBundle(): Promise { + return buildBundleFromLocalCompany({ + localCompanyId: "local-company-1", + connection: { + id: "conn-1", + remoteUrl: "https://cloud.example.test", + targetOrigin: "https://cloud.example.test", + targetHost: "cloud.example.test", + stackId: "stack-1", + targetCompanyId: "target-company-1", + accessToken: "upt_test", + token: { + id: "token-1", + companyStackId: "stack-1", + targetOrigin: "https://cloud.example.test", + sourceInstanceId: "paperclip-local-default", + sourceInstanceFingerprint: "sha256:test", + scopes: ["upstream_import:preview"], + expiresAt: new Date(Date.now() + 60_000).toISOString(), + }, + privateKeyPem: "unused", + sourcePublicKey: "unused", + sourceInstanceId: "paperclip-local-default", + sourceInstanceFingerprint: "sha256:test", + scopes: ["upstream_import:preview"], + createdAt: "2026-05-18T00:00:00.000Z", + updatedAt: "2026-05-18T00:00:00.000Z", + }, + discovery: discovery(), + localApi: { + post: async () => portabilityExport() as T, + }, + maxEntitiesPerChunk: 1, + mode: "apply", + }); +} + +function discovery(overrides: Partial<{ supportedSchemaMajor: number }> = {}) { + return { + schema: "paperclip-upstream-discovery-v1", + stack: { + id: "stack-1", + slug: "cloud-test", + displayName: "Cloud Test", + companyId: "target-company-1", + origin: "https://cloud.example.test", + }, + auth: { + deviceCode: { + deviceCodeUrl: "https://cloud.example.test/api/upstream-sync/device-code", + verificationUrl: "https://cloud.example.test/api/upstream-sync/device-code/approve", + tokenUrl: "https://cloud.example.test/api/upstream-sync/token", + }, + scopes: ["upstream_import:preview", "upstream_import:write", "upstream_import:read"], + }, + transfer: { + supportedSchemaMajor: overrides.supportedSchemaMajor ?? 1, + featureFlags: ["cloud_sync"], + }, + }; +} + +function portabilityExport(): CompanyPortabilityExportResult { + return { + rootPath: ".", + paperclipExtensionPath: ".paperclip.yaml", + manifest: { + schemaVersion: 1, + generatedAt: "2026-05-18T00:00:00.000Z", + source: { + companyId: "local-company-1", + companyName: "Local Company", + }, + includes: { + company: true, + agents: true, + projects: true, + issues: true, + skills: true, + }, + company: { + path: "company.json", + name: "Local Company", + description: null, + brandColor: null, + logoPath: null, + attachmentMaxBytes: null, + requireBoardApprovalForNewAgents: false, + feedbackDataSharingEnabled: false, + feedbackDataSharingConsentAt: null, + feedbackDataSharingConsentByUserId: null, + feedbackDataSharingTermsVersion: null, + }, + sidebar: null, + agents: [], + skills: [], + projects: [], + issues: [], + envInputs: [], + }, + files: { + "README.md": "Local Company", + }, + warnings: [], + }; +} + +function jsonResponse(body: unknown, status = 200): Response { + return new Response(JSON.stringify(body), { + status, + headers: { "Content-Type": "application/json" }, + }); +} diff --git a/cli/src/commands/client/cloud-store.ts b/cli/src/commands/client/cloud-store.ts new file mode 100644 index 00000000..fa63c713 --- /dev/null +++ b/cli/src/commands/client/cloud-store.ts @@ -0,0 +1,177 @@ +import fs from "node:fs"; +import path from "node:path"; +import { resolvePaperclipInstanceRoot } from "../../config/home.js"; + +export interface CloudConnectionTokenRecord { + id: string; + companyStackId: string; + targetOrigin: string; + sourceInstanceId: string; + sourceInstanceFingerprint: string; + scopes: string[]; + expiresAt: string; + [key: string]: unknown; +} + +export interface CloudConnection { + id: string; + remoteUrl: string; + targetOrigin: string; + targetHost: string; + stackId: string; + stackSlug?: string | null; + stackDisplayName?: string | null; + targetCompanyId: string; + accessToken: string; + token: CloudConnectionTokenRecord; + privateKeyPem: string; + sourcePublicKey: string; + sourceInstanceId: string; + sourceInstanceFingerprint: string; + scopes: string[]; + createdAt: string; + updatedAt: string; +} + +interface CloudConnectionStore { + version: 1; + connections: Record; + currentConnectionId?: string; +} + +function defaultStore(): CloudConnectionStore { + return { + version: 1, + connections: {}, + }; +} + +export function resolveCloudConnectionStorePath(): string { + return path.resolve(resolvePaperclipInstanceRoot(), "secrets", "cloud-upstream-connections.json"); +} + +export function readCloudConnectionStore(storePath = resolveCloudConnectionStorePath()): CloudConnectionStore { + if (!fs.existsSync(storePath)) return defaultStore(); + const raw = JSON.parse(fs.readFileSync(storePath, "utf8")) as Partial | null; + const connections: Record = {}; + if (raw?.connections && typeof raw.connections === "object") { + for (const [id, value] of Object.entries(raw.connections)) { + const normalized = normalizeConnection(value); + if (normalized) connections[id] = normalized; + } + } + const currentConnectionId = + typeof raw?.currentConnectionId === "string" && connections[raw.currentConnectionId] + ? raw.currentConnectionId + : Object.values(connections).sort((left, right) => right.updatedAt.localeCompare(left.updatedAt))[0]?.id; + return { + version: 1, + connections, + currentConnectionId, + }; +} + +export function writeCloudConnectionStore( + store: CloudConnectionStore, + storePath = resolveCloudConnectionStorePath(), +): void { + fs.mkdirSync(path.dirname(storePath), { recursive: true }); + fs.writeFileSync(storePath, `${JSON.stringify(store, null, 2)}\n`, { mode: 0o600 }); +} + +export function upsertCloudConnection( + connection: CloudConnection, + storePath = resolveCloudConnectionStorePath(), +): CloudConnection { + const store = readCloudConnectionStore(storePath); + const existing = store.connections[connection.id]; + const now = new Date().toISOString(); + const next = { + ...connection, + createdAt: existing?.createdAt ?? connection.createdAt ?? now, + updatedAt: now, + }; + store.connections[next.id] = next; + store.currentConnectionId = next.id; + writeCloudConnectionStore(store, storePath); + return next; +} + +export function getCloudConnection( + remoteUrlOrOrigin?: string, + storePath = resolveCloudConnectionStorePath(), +): CloudConnection | null { + const store = readCloudConnectionStore(storePath); + if (remoteUrlOrOrigin?.trim()) { + const needle = normalizeRemoteLookup(remoteUrlOrOrigin); + return Object.values(store.connections).find((connection) => + normalizeRemoteLookup(connection.remoteUrl) === needle || + normalizeRemoteLookup(connection.targetOrigin) === needle + ) ?? null; + } + return store.currentConnectionId ? store.connections[store.currentConnectionId] ?? null : null; +} + +function normalizeRemoteLookup(value: string): string { + try { + const url = new URL(value); + return url.origin.replace(/\/+$/u, ""); + } catch { + return value.trim().replace(/\/+$/u, ""); + } +} + +function normalizeConnection(value: unknown): CloudConnection | null { + if (typeof value !== "object" || value === null || Array.isArray(value)) return null; + const record = value as Record; + const id = stringValue(record.id); + const remoteUrl = stringValue(record.remoteUrl); + const targetOrigin = stringValue(record.targetOrigin); + const targetHost = stringValue(record.targetHost); + const stackId = stringValue(record.stackId); + const targetCompanyId = stringValue(record.targetCompanyId); + const accessToken = stringValue(record.accessToken); + const token = typeof record.token === "object" && record.token !== null && !Array.isArray(record.token) + ? record.token as CloudConnectionTokenRecord + : null; + const privateKeyPem = stringValue(record.privateKeyPem); + const sourcePublicKey = stringValue(record.sourcePublicKey); + const sourceInstanceId = stringValue(record.sourceInstanceId); + const sourceInstanceFingerprint = stringValue(record.sourceInstanceFingerprint); + const createdAt = stringValue(record.createdAt); + const updatedAt = stringValue(record.updatedAt); + if ( + !id || !remoteUrl || !targetOrigin || !targetHost || !stackId || !targetCompanyId || + !accessToken || !token || !privateKeyPem || !sourcePublicKey || !sourceInstanceId || + !sourceInstanceFingerprint || !createdAt || !updatedAt + ) { + return null; + } + return { + id, + remoteUrl, + targetOrigin, + targetHost, + stackId, + stackSlug: stringValue(record.stackSlug), + stackDisplayName: stringValue(record.stackDisplayName), + targetCompanyId, + accessToken, + token, + privateKeyPem, + sourcePublicKey, + sourceInstanceId, + sourceInstanceFingerprint, + scopes: stringArray(record.scopes), + createdAt, + updatedAt, + }; +} + +function stringValue(value: unknown): string | null { + return typeof value === "string" && value.trim().length > 0 ? value.trim() : null; +} + +function stringArray(value: unknown): string[] { + return Array.isArray(value) ? value.filter((entry): entry is string => typeof entry === "string") : []; +} diff --git a/cli/src/commands/client/cloud-transfer.ts b/cli/src/commands/client/cloud-transfer.ts new file mode 100644 index 00000000..9cd1ccbe --- /dev/null +++ b/cli/src/commands/client/cloud-transfer.ts @@ -0,0 +1,297 @@ +import { createHash } from "node:crypto"; + +export const upstreamTransferSchema = { + family: "paperclip-upstream-transfer", + version: "1.0.0", + major: 1, + minor: 0, +} as const; + +export type NormalizedSha256 = `sha256:${string}`; + +export interface SourceEntityKey { + sourceInstanceId: string; + sourceCompanyId: string; + sourceEntityType: string; + sourceEntityId: string; + sourceNaturalKey?: string; +} + +export interface UpstreamTransferWarning { + code: string; + severity: "info" | "warning" | "blocker"; + message: string; + entity?: SourceEntityKey; +} + +export interface UpstreamTransferEntityRecord { + key: SourceEntityKey; + contentHash: NormalizedSha256; + dependencies: SourceEntityKey[]; + warnings: UpstreamTransferWarning[]; +} + +export interface UpstreamTransferManifestSource { + sourceInstanceId: string; + sourceCompanyId: string; + sourceInstanceKeyFingerprint: string; + exporterVersion: string; + sourceSchemaVersion: string; +} + +export interface UpstreamTransferManifestTarget { + targetStackId: string; + targetCompanyId: string; + targetOrigin: string; + supportedSchemaMajor: number; +} + +export interface UpstreamTransferChunk { + chunkIndex: number; + totalChunks: number; + byteLength: number; + sha256: NormalizedSha256; + manifestHash: NormalizedSha256; +} + +export interface UpstreamTransferManifest { + schema: typeof upstreamTransferSchema; + source: UpstreamTransferManifestSource; + target: UpstreamTransferManifestTarget; + runId: string; + idempotencyKey: string; + generatedAt: string; + entityCount: number; + entities: UpstreamTransferEntityRecord[]; + chunks: UpstreamTransferChunk[]; + warnings: UpstreamTransferWarning[]; + featureFlags: string[]; + manifestHash: NormalizedSha256; +} + +export interface LocalUpstreamExportEntityInput { + key: SourceEntityKey; + body: Record; + dependencies?: SourceEntityKey[]; + warnings?: UpstreamTransferWarning[]; + conflictKeys?: string[]; +} + +export interface LocalUpstreamExportEntity { + record: UpstreamTransferEntityRecord; + body: Record; + conflictKeys?: string[]; +} + +export interface LocalUpstreamExportChunk { + chunkIndex: number; + totalChunks: number; + byteLength: number; + sha256: NormalizedSha256; + payload: { + entityKeys: SourceEntityKey[]; + }; +} + +export interface LocalUpstreamExportBundle { + manifest: UpstreamTransferManifest; + entities: LocalUpstreamExportEntity[]; + chunks: LocalUpstreamExportChunk[]; +} + +export interface BuildLocalUpstreamExportBundleInput { + source: UpstreamTransferManifestSource; + target: UpstreamTransferManifestTarget; + runId: string; + idempotencyKey: string; + entities: LocalUpstreamExportEntityInput[]; + warnings?: UpstreamTransferWarning[]; + featureFlags?: string[]; + maxEntitiesPerChunk?: number; +} + +export interface LocalUpstreamPushCoordinatorOptions { + targetOrigin: string; + paperclipCompanyId: string; + fetch?: typeof fetch; + headers?: (input: { method: string; path: string }) => HeadersInit | Promise; +} + +export class UpstreamImportRequestError extends Error { + readonly status: number; + readonly body: unknown; + + constructor(status: number, message: string, body: unknown) { + super(message); + this.status = status; + this.body = body; + } +} + +export class LocalUpstreamPushCoordinator { + readonly #targetOrigin: string; + readonly #paperclipCompanyId: string; + readonly #fetch: typeof fetch; + readonly #headers: NonNullable; + + constructor(options: LocalUpstreamPushCoordinatorOptions) { + this.#targetOrigin = options.targetOrigin.replace(/\/+$/u, ""); + this.#paperclipCompanyId = options.paperclipCompanyId; + this.#fetch = options.fetch ?? fetch; + this.#headers = options.headers ?? (() => ({})); + } + + async preview(bundle: LocalUpstreamExportBundle): Promise { + return this.post(`/api/companies/${encodeURIComponent(this.#paperclipCompanyId)}/upstream-imports/preview`, { + manifest: bundle.manifest, + entities: bundle.entities, + }); + } + + async apply(bundle: LocalUpstreamExportBundle): Promise { + const run = await this.post(`/api/companies/${encodeURIComponent(this.#paperclipCompanyId)}/upstream-imports/runs`, { + mode: "apply", + manifest: bundle.manifest, + entities: bundle.entities, + }) as { run?: { id?: unknown } }; + const runId = typeof run.run?.id === "string" ? run.run.id : undefined; + if (!runId) { + throw new Error("Remote upstream importer did not return a run id"); + } + + for (const chunk of bundle.chunks) { + await this.post(`/api/upstream-import-runs/${encodeURIComponent(runId)}/chunks`, chunk); + } + + return this.post(`/api/upstream-import-runs/${encodeURIComponent(runId)}/apply`, {}); + } + + async events(runId: string): Promise { + return this.get(`/api/upstream-import-runs/${encodeURIComponent(runId)}/events`); + } + + private async get(path: string): Promise { + const response = await this.#fetch(`${this.#targetOrigin}${path}`, { + method: "GET", + headers: await this.#headers({ method: "GET", path }), + }); + return parseCoordinatorResponse(response); + } + + private async post(path: string, body: unknown): Promise { + const response = await this.#fetch(`${this.#targetOrigin}${path}`, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...(await this.#headers({ method: "POST", path })), + }, + body: JSON.stringify(body), + }); + return parseCoordinatorResponse(response); + } +} + +export function buildLocalUpstreamExportBundle( + input: BuildLocalUpstreamExportBundleInput, +): LocalUpstreamExportBundle { + const entities = input.entities.map((entity) => ({ + record: { + key: entity.key, + contentHash: normalizedContentHash(entity.body), + dependencies: entity.dependencies ?? [], + warnings: entity.warnings ?? [], + }, + body: entity.body, + conflictKeys: entity.conflictKeys, + })); + const chunks = buildLocalChunks(entities, input.maxEntitiesPerChunk ?? 100); + const manifestWithoutHash = { + schema: upstreamTransferSchema, + source: input.source, + target: input.target, + runId: input.runId, + idempotencyKey: input.idempotencyKey, + generatedAt: new Date(0).toISOString(), + entityCount: entities.length, + entities: entities.map((entity) => entity.record), + chunks: chunks.map(({ payload: _payload, ...chunk }) => chunk), + warnings: input.warnings ?? [], + featureFlags: (input.featureFlags ?? ["cloud_sync"]).slice().sort(), + }; + const manifestHash = normalizedContentHash(manifestWithoutHash); + return { + manifest: { + ...manifestWithoutHash, + chunks: manifestWithoutHash.chunks.map((chunk) => ({ ...chunk, manifestHash })), + manifestHash, + }, + entities, + chunks, + }; +} + +export function normalizedContentHash(value: unknown): NormalizedSha256 { + return `sha256:${createHash("sha256").update(canonicalJson(value)).digest("hex")}`; +} + +export function canonicalJson(value: unknown): string { + return JSON.stringify(sortJson(value)); +} + +function buildLocalChunks( + entities: LocalUpstreamExportEntity[], + maxEntitiesPerChunk: number, +): LocalUpstreamExportChunk[] { + if (!Number.isInteger(maxEntitiesPerChunk) || maxEntitiesPerChunk < 1) { + throw new Error("maxEntitiesPerChunk must be a positive integer"); + } + if (entities.length === 0) return []; + + const groups: LocalUpstreamExportEntity[][] = []; + for (let index = 0; index < entities.length; index += maxEntitiesPerChunk) { + groups.push(entities.slice(index, index + maxEntitiesPerChunk)); + } + + return groups.map((group, index) => { + const payload = { + entityKeys: group.map((entity) => entity.record.key), + }; + return { + chunkIndex: index, + totalChunks: groups.length, + byteLength: Buffer.byteLength(canonicalJson(payload)), + sha256: normalizedContentHash(payload), + payload, + }; + }); +} + +function sortJson(value: unknown): unknown { + if (Array.isArray(value)) return value.map(sortJson); + if (typeof value !== "object" || value === null) return value; + return Object.fromEntries( + Object.entries(value as Record) + .sort(([left], [right]) => left.localeCompare(right)) + .map(([key, entry]) => [key, sortJson(entry)]), + ); +} + +async function parseCoordinatorResponse(response: Response): Promise { + const text = await response.text(); + const parsed = text.trim() ? safeParseJson(text) : {}; + if (!response.ok) { + const message = typeof parsed === "object" && parsed !== null && "error" in parsed + ? String((parsed as { error: unknown }).error) + : `Upstream importer request failed with ${response.status}`; + throw new UpstreamImportRequestError(response.status, message, parsed); + } + return parsed; +} + +function safeParseJson(text: string): unknown { + try { + return JSON.parse(text); + } catch { + return text; + } +} diff --git a/cli/src/commands/client/cloud.ts b/cli/src/commands/client/cloud.ts new file mode 100644 index 00000000..f87098a0 --- /dev/null +++ b/cli/src/commands/client/cloud.ts @@ -0,0 +1,721 @@ +import { createHash, generateKeyPairSync, randomBytes, randomUUID, sign } from "node:crypto"; +import { createServer, type Server } from "node:http"; +import { URL } from "node:url"; +import { Command } from "commander"; +import pc from "picocolors"; +import type { + CompanyPortabilityExportResult, + CompanyPortabilityFileEntry, + InstanceExperimentalSettings, +} from "@paperclipai/shared"; +import { openUrl } from "../../client/board-auth.js"; +import { resolvePaperclipInstanceId } from "../../config/home.js"; +import { + addCommonClientOptions, + handleCommandError, + printOutput, + resolveCommandContext, + type BaseClientOptions, +} from "./common.js"; +import { + buildLocalUpstreamExportBundle, + LocalUpstreamPushCoordinator, + normalizedContentHash, + upstreamTransferSchema, + UpstreamImportRequestError, + type LocalUpstreamExportBundle, + type LocalUpstreamExportEntityInput, + type SourceEntityKey, + type UpstreamTransferManifestSource, + type UpstreamTransferManifestTarget, + type UpstreamTransferWarning, +} from "./cloud-transfer.js"; +import { + getCloudConnection, + upsertCloudConnection, + type CloudConnection, + type CloudConnectionTokenRecord, +} from "./cloud-store.js"; + +const CLOUD_SYNC_CONFLICT_EXIT_CODE = 2; +const CLOUD_SYNC_SCHEMA_MISMATCH_EXIT_CODE = 3; +const CLOUD_SYNC_SCOPES = ["upstream_import:preview", "upstream_import:write", "upstream_import:read"]; +const DEVICE_CODE_FALLBACK_EXPIRES_MS = 15 * 60_000; + +interface CloudConnectOptions extends BaseClientOptions { + noBrowser?: boolean; +} + +interface CloudPushOptions extends BaseClientOptions { + company?: string; + remoteUrl?: string; + dryRun?: boolean; + maxEntitiesPerChunk?: number; +} + +interface UpstreamDiscovery { + schema: string; + stack: { + id: string; + slug?: string; + displayName?: string; + companyId: string; + origin: string; + }; + auth: { + pkce?: { + authorizeUrl: string; + tokenUrl: string; + codeChallengeMethod: string; + }; + deviceCode?: { + deviceCodeUrl: string; + verificationUrl: string; + tokenUrl: string; + }; + scopes?: string[]; + }; + transfer: { + supportedSchemaMajor: number; + featureFlags?: string[]; + }; +} + +interface TokenResponse { + accessToken: string; + token: CloudConnectionTokenRecord; + scopes?: string[]; + expiresAt?: string; +} + +class CloudAuthRequestError extends Error { + readonly status: number; + readonly body: unknown; + + constructor(status: number, message: string, body: unknown) { + super(message); + this.status = status; + this.body = body; + } +} + +export function registerCloudCommands(program: Command): void { + const cloud = program.command("cloud").description("Paperclip Cloud upstream sync commands"); + + addCommonClientOptions( + cloud + .command("connect") + .description("Authorize this local instance to push into a Paperclip Cloud stack") + .argument("", "Paperclip Cloud stack URL") + .option("--no-browser", "Use the device-code flow instead of opening a browser", false) + .action(async (remoteUrl: string, opts: CloudConnectOptions) => { + try { + await connectCloud(remoteUrl, opts); + } catch (err) { + handleCommandError(err); + } + }), + ); + + addCommonClientOptions( + cloud + .command("push") + .description("Preview or apply a local company push into the connected Paperclip Cloud stack") + .requiredOption("--company ", "Local company ID to export") + .option("--remote-url ", "Use a specific stored cloud connection") + .option("--dry-run", "Preview without applying", false) + .option("--max-entities-per-chunk ", "Chunk size for upstream uploads", (value) => Number(value), 100) + .action(async (opts: CloudPushOptions) => { + try { + await pushCloud(opts); + } catch (err) { + if (isSchemaMismatchError(err)) { + console.error(pc.red(err instanceof Error ? err.message : String(err))); + process.exitCode = CLOUD_SYNC_SCHEMA_MISMATCH_EXIT_CODE; + return; + } + handleCommandError(err); + } + }), + ); +} + +export async function connectCloud(remoteUrl: string, opts: CloudConnectOptions = {}): Promise { + const ctx = resolveCommandContext(opts); + const discovery = await discoverUpstream(remoteUrl); + assertDiscoveryCompatible(discovery); + const source = createSourceIdentity(); + const token = await authorizeConnection(discovery, source, { + noBrowser: Boolean(opts.noBrowser), + }); + const targetOrigin = discovery.stack.origin.replace(/\/+$/u, ""); + const targetHost = new URL(targetOrigin).host; + const now = new Date().toISOString(); + const connection = upsertCloudConnection({ + id: connectionId(targetOrigin), + remoteUrl, + targetOrigin, + targetHost, + stackId: discovery.stack.id, + stackSlug: discovery.stack.slug ?? null, + stackDisplayName: discovery.stack.displayName ?? null, + targetCompanyId: discovery.stack.companyId, + accessToken: token.accessToken, + token: token.token, + privateKeyPem: source.privateKeyPem, + sourcePublicKey: source.sourcePublicKey, + sourceInstanceId: source.sourceInstanceId, + sourceInstanceFingerprint: source.sourceInstanceFingerprint, + scopes: token.scopes ?? token.token.scopes ?? CLOUD_SYNC_SCOPES, + createdAt: now, + updatedAt: now, + }); + + if (ctx.json) { + printOutput(redactConnection(connection), { json: true }); + } else { + console.log(pc.bold("Connected to Paperclip Cloud")); + console.log(`stack=${connection.stackDisplayName ?? connection.stackSlug ?? connection.stackId}`); + console.log(`origin=${connection.targetOrigin}`); + console.log(`company=${connection.targetCompanyId}`); + } + return connection; +} + +export async function pushCloud(opts: CloudPushOptions): Promise { + const ctx = resolveCommandContext(opts, { requireCompany: false }); + const localCompanyId = requiredString(opts.company, "--company"); + await assertCloudSyncEnabled(ctx.api.get("/api/instance/settings/experimental")); + const connection = getCloudConnection(opts.remoteUrl); + if (!connection) { + throw new Error("No cloud connection found. Run `paperclipai cloud connect ` first."); + } + + const discovery = await discoverUpstream(connection.targetOrigin); + assertDiscoveryCompatible(discovery); + const bundle = await buildBundleFromLocalCompany({ + localCompanyId, + connection, + discovery, + localApi: ctx.api, + maxEntitiesPerChunk: opts.maxEntitiesPerChunk, + mode: opts.dryRun ? "preview" : "apply", + }); + const coordinator = new LocalUpstreamPushCoordinator({ + targetOrigin: connection.targetOrigin, + paperclipCompanyId: connection.targetCompanyId, + headers: ({ method, path }) => cloudProofHeaders(connection, method, path), + }); + + const result = opts.dryRun ? await coordinator.preview(bundle) : await coordinator.apply(bundle); + const runId = getRunId(result); + const events = !opts.dryRun && runId ? await coordinator.events(runId).catch(() => null) : null; + const summary = summarizeResult(result); + const conflictCount = summary.conflict + summary.staleMapping; + + if (ctx.json) { + printOutput({ result, events }, { json: true }); + } else { + console.log(pc.bold(opts.dryRun ? "Cloud Push Preview" : "Cloud Push Applied")); + console.log(`run=${runId ?? "-"}`); + console.log(`manifest=${bundle.manifest.manifestHash}`); + console.log( + `create=${summary.create} update=${summary.update} adopt=${summary.adopt} ` + + `skip=${summary.skip} conflict=${summary.conflict} staleMapping=${summary.staleMapping}`, + ); + printWarnings(result); + printConflicts(result); + printEvents(events); + } + + if (conflictCount > 0) { + process.exitCode = CLOUD_SYNC_CONFLICT_EXIT_CODE; + } + return result; +} + +export async function discoverUpstream(remoteUrl: string): Promise { + const base = new URL(remoteUrl); + const discoveryUrl = new URL("/.well-known/paperclip-upstream", base); + return requestCloudJson(discoveryUrl.toString(), { method: "GET" }); +} + +export function assertDiscoveryCompatible(discovery: UpstreamDiscovery): void { + if (discovery.schema !== "paperclip-upstream-discovery-v1") { + throw new Error("Remote URL is not a Paperclip Cloud upstream target."); + } + if (discovery.transfer.supportedSchemaMajor !== upstreamTransferSchema.major) { + throw new Error( + `Cloud upstream schema mismatch: local major ${upstreamTransferSchema.major}, remote supports ${discovery.transfer.supportedSchemaMajor}.`, + ); + } + if (!discovery.transfer.featureFlags?.includes("cloud_sync")) { + throw new Error("Remote Paperclip Cloud stack does not advertise the cloud_sync transfer flag."); + } +} + +export function resolveDeviceCodeExpiresAt(expiresAt: string | undefined, nowMs = Date.now()): number { + const parsed = typeof expiresAt === "string" ? Date.parse(expiresAt) : NaN; + return Number.isFinite(parsed) ? parsed : nowMs + DEVICE_CODE_FALLBACK_EXPIRES_MS; +} + +export async function buildBundleFromLocalCompany(input: { + localCompanyId: string; + connection: CloudConnection; + discovery: UpstreamDiscovery; + localApi: { + post(path: string, body?: unknown): Promise; + }; + maxEntitiesPerChunk?: number; + mode: "preview" | "apply"; +}): Promise { + const exported = await input.localApi.post( + `/api/companies/${input.localCompanyId}/export`, + { + include: { + company: true, + agents: true, + projects: true, + issues: true, + skills: true, + }, + expandReferencedSkills: true, + }, + ); + if (!exported) throw new Error("Local company export returned no data."); + + const sourceHash = normalizedContentHash({ + manifest: exported.manifest, + files: exported.files, + }); + const source: UpstreamTransferManifestSource = { + sourceInstanceId: input.connection.sourceInstanceId, + sourceCompanyId: input.localCompanyId, + sourceInstanceKeyFingerprint: input.connection.sourceInstanceFingerprint, + exporterVersion: "paperclipai-cli-cloud-v1", + sourceSchemaVersion: "paperclip-local-portability-v1", + }; + const target: UpstreamTransferManifestTarget = { + targetStackId: input.discovery.stack.id, + targetCompanyId: input.discovery.stack.companyId, + targetOrigin: input.discovery.stack.origin, + supportedSchemaMajor: input.discovery.transfer.supportedSchemaMajor, + }; + const entities = buildEntitiesFromPortableExport(input.localCompanyId, input.connection.sourceInstanceId, exported); + const idempotencyKey = [ + input.mode, + input.connection.sourceInstanceId, + input.localCompanyId, + input.discovery.stack.id, + sourceHash, + ].join(":"); + return buildLocalUpstreamExportBundle({ + source, + target, + runId: `local-${input.mode}-${shortHash(idempotencyKey)}`, + idempotencyKey, + entities, + warnings: exported.warnings.map((message): UpstreamTransferWarning => ({ + code: "local_company_export_warning", + severity: "warning", + message, + })), + featureFlags: ["cloud_sync"], + maxEntitiesPerChunk: input.maxEntitiesPerChunk, + }); +} + +async function authorizeConnection( + discovery: UpstreamDiscovery, + source: ReturnType, + opts: { noBrowser: boolean }, +): Promise { + if (!opts.noBrowser && canOpenBrowser() && discovery.auth.pkce) { + try { + return await authorizeWithBrowser(discovery, source); + } catch (error) { + console.error(pc.yellow(`Browser authorization failed; falling back to device-code flow. ${errorMessage(error)}`)); + } + } + if (!discovery.auth.deviceCode) { + throw new Error("Remote Paperclip Cloud stack does not support device-code authorization."); + } + return authorizeWithDeviceCode(discovery, source, { openBrowser: !opts.noBrowser && canOpenBrowser() }); +} + +async function authorizeWithBrowser( + discovery: UpstreamDiscovery, + source: ReturnType, +): Promise { + const pkce = discovery.auth.pkce; + if (!pkce) throw new Error("Remote did not advertise PKCE authorization."); + const callback = await startPkceCallbackServer(); + const verifier = randomBytes(32).toString("base64url"); + const challenge = createHash("sha256").update(verifier).digest("base64url"); + const state = randomUUID(); + const authorizeUrl = new URL(pkce.authorizeUrl); + authorizeUrl.searchParams.set("redirectUri", callback.redirectUri); + authorizeUrl.searchParams.set("state", state); + authorizeUrl.searchParams.set("codeChallenge", challenge); + authorizeUrl.searchParams.set("codeChallengeMethod", "S256"); + authorizeUrl.searchParams.set("sourceInstanceId", source.sourceInstanceId); + authorizeUrl.searchParams.set("sourceInstanceFingerprint", source.sourceInstanceFingerprint); + authorizeUrl.searchParams.set("sourcePublicKey", source.sourcePublicKey); + authorizeUrl.searchParams.set("scopes", CLOUD_SYNC_SCOPES.join(" ")); + + try { + console.error(`Open this URL to approve cloud sync:\n${authorizeUrl.toString()}`); + if (!openUrl(authorizeUrl.toString())) { + throw new Error("Could not open a browser."); + } + const code = await callback.waitForCode(state); + return requestCloudJson(pkce.tokenUrl, { + method: "POST", + body: JSON.stringify({ + grantType: "authorization_code", + code, + redirectUri: callback.redirectUri, + codeVerifier: verifier, + }), + }); + } finally { + await callback.close(); + } +} + +async function authorizeWithDeviceCode( + discovery: UpstreamDiscovery, + source: ReturnType, + opts: { openBrowser: boolean }, +): Promise { + const device = discovery.auth.deviceCode; + if (!device) throw new Error("Remote did not advertise device-code authorization."); + const response = await requestCloudJson<{ + deviceCode: string; + userCode: string; + verificationUri: string; + expiresAt?: string; + intervalSeconds?: number; + }>(device.deviceCodeUrl, { + method: "POST", + body: JSON.stringify({ + stackId: discovery.stack.id, + sourceInstanceId: source.sourceInstanceId, + sourceInstanceFingerprint: source.sourceInstanceFingerprint, + sourcePublicKey: source.sourcePublicKey, + scopes: CLOUD_SYNC_SCOPES, + }), + }); + console.error(pc.bold("Cloud device authorization required")); + console.error(`Open: ${response.verificationUri}`); + console.error(`Code: ${response.userCode}`); + if (opts.openBrowser) openUrl(response.verificationUri); + + const expiresAt = resolveDeviceCodeExpiresAt(response.expiresAt); + const intervalMs = Math.max(500, (response.intervalSeconds ?? 5) * 1000); + while (Date.now() < expiresAt) { + await sleep(intervalMs); + try { + return await requestCloudJson(device.tokenUrl, { + method: "POST", + body: JSON.stringify({ + grantType: "device_code", + deviceCode: response.deviceCode, + }), + }); + } catch (error) { + if (error instanceof CloudAuthRequestError && error.body && typeof error.body === "object") { + const code = (error.body as { error?: unknown }).error; + if (code === "authorization_pending") continue; + } + throw error; + } + } + throw new Error("Device-code authorization expired before it was approved."); +} + +function buildEntitiesFromPortableExport( + localCompanyId: string, + sourceInstanceId: string, + exported: CompanyPortabilityExportResult, +): LocalUpstreamExportEntityInput[] { + const companyKey: SourceEntityKey = { + sourceInstanceId, + sourceCompanyId: localCompanyId, + sourceEntityType: "company", + sourceEntityId: localCompanyId, + sourceNaturalKey: exported.manifest.company?.name ?? localCompanyId, + }; + const entities: LocalUpstreamExportEntityInput[] = [ + { + key: companyKey, + body: { + kind: "paperclip_company_portability_manifest", + manifest: exported.manifest, + rootPath: exported.rootPath, + paperclipExtensionPath: exported.paperclipExtensionPath, + fileCount: Object.keys(exported.files).length, + }, + conflictKeys: [`company:${companyKey.sourceNaturalKey ?? localCompanyId}`], + }, + ]; + + for (const [filePath, entry] of Object.entries(exported.files).sort(([left], [right]) => left.localeCompare(right))) { + entities.push({ + key: { + sourceInstanceId, + sourceCompanyId: localCompanyId, + sourceEntityType: "company_setting", + sourceEntityId: shortHash(filePath), + sourceNaturalKey: filePath, + }, + body: { + kind: "paperclip_portable_file", + path: filePath, + entry: normalizePortableFileEntry(entry), + }, + dependencies: [companyKey], + conflictKeys: [`portable_file:${filePath}`], + }); + } + return entities; +} + +function normalizePortableFileEntry(entry: CompanyPortabilityFileEntry): Record { + if (typeof entry === "string") { + return { encoding: "utf8", data: entry }; + } + return { ...entry }; +} + +async function assertCloudSyncEnabled(settingsPromise: Promise): Promise { + const settings = await settingsPromise; + if (settings?.enableCloudSync !== true) { + throw new Error( + "Cloud sync is disabled. Enable the cloud sync experimental setting before running `paperclipai cloud push`.", + ); + } +} + +function cloudProofHeaders(connection: CloudConnection, method: string, pathAndSearch: string): Record { + const timestamp = new Date().toISOString(); + const nonce = randomUUID(); + const payload = [ + method, + connection.targetHost.toLowerCase(), + pathAndSearch, + connection.token.id, + connection.sourceInstanceId, + timestamp, + nonce, + ].join("\n"); + return { + Authorization: `Bearer ${connection.accessToken}`, + "X-Paperclip-Upstream-Source-Instance-Id": connection.sourceInstanceId, + "X-Paperclip-Upstream-Proof-Timestamp": timestamp, + "X-Paperclip-Upstream-Proof-Nonce": nonce, + "X-Paperclip-Upstream-Proof-Signature": sign( + null, + Buffer.from(payload, "utf8"), + connection.privateKeyPem, + ).toString("base64url"), + }; +} + +async function requestCloudJson(url: string, init: RequestInit): Promise { + const headers = new Headers(init.headers); + headers.set("accept", "application/json"); + if (init.body !== undefined && !headers.has("content-type")) { + headers.set("content-type", "application/json"); + } + const response = await fetch(url, { ...init, headers }); + const text = await response.text(); + const parsed = text.trim() ? JSON.parse(text) as unknown : {}; + if (!response.ok) { + const message = typeof parsed === "object" && parsed !== null && "error" in parsed + ? String((parsed as { error: unknown }).error) + : `Cloud request failed with ${response.status}`; + throw new CloudAuthRequestError(response.status, message, parsed); + } + return parsed as T; +} + +function createSourceIdentity() { + const { publicKey, privateKey } = generateKeyPairSync("ed25519"); + const sourcePublicKey = publicKey.export({ type: "spki", format: "pem" }).toString(); + const sourceInstanceFingerprint = `sha256:${createHash("sha256") + .update(publicKey.export({ type: "spki", format: "der" })) + .digest("hex")}`; + return { + sourceInstanceId: `paperclip-local-${resolvePaperclipInstanceId()}`, + sourceInstanceFingerprint, + sourcePublicKey, + privateKeyPem: privateKey.export({ type: "pkcs8", format: "pem" }).toString(), + }; +} + +async function startPkceCallbackServer(): Promise<{ + redirectUri: string; + waitForCode: (state: string) => Promise; + close: () => Promise; +}> { + let resolveCode: ((code: string) => void) | null = null; + let rejectCode: ((error: Error) => void) | null = null; + let expectedState = ""; + const codePromise = new Promise((resolve, reject) => { + resolveCode = resolve; + rejectCode = reject; + }); + const server = createServer((req, res) => { + const url = new URL(req.url ?? "/", "http://127.0.0.1"); + const code = url.searchParams.get("code"); + const state = url.searchParams.get("state"); + if (!code || state !== expectedState) { + res.writeHead(400, { "Content-Type": "text/plain" }); + res.end("Paperclip Cloud authorization failed. You can close this tab."); + rejectCode?.(new Error("Authorization callback was missing a valid code or state.")); + return; + } + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end("Paperclip Cloud authorization complete. You can close this tab."); + resolveCode?.(code); + }); + await listenOnLoopback(server); + const address = server.address(); + if (typeof address !== "object" || !address?.port) { + throw new Error("Failed to start local authorization callback server."); + } + return { + redirectUri: `http://127.0.0.1:${address.port}/cloud/callback`, + waitForCode: (state: string) => { + expectedState = state; + return codePromise; + }, + close: () => closeServer(server), + }; +} + +function listenOnLoopback(server: Server): Promise { + return new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + server.off("error", reject); + resolve(); + }); + }); +} + +function closeServer(server: Server): Promise { + return new Promise((resolve, reject) => { + server.close((error) => error ? reject(error) : resolve()); + }); +} + +function canOpenBrowser(): boolean { + if (process.platform === "darwin" || process.platform === "win32") return true; + return Boolean(process.env.DISPLAY || process.env.WAYLAND_DISPLAY); +} + +function summarizeResult(result: unknown): { + create: number; + update: number; + adopt: number; + skip: number; + conflict: number; + staleMapping: number; +} { + const summary = asRecord(asRecord(result)?.summary); + return { + create: numberValue(summary?.create), + update: numberValue(summary?.update), + adopt: numberValue(summary?.adopt), + skip: numberValue(summary?.skip), + conflict: numberValue(summary?.conflict), + staleMapping: numberValue(summary?.staleMapping), + }; +} + +function printWarnings(result: unknown): void { + const warnings = Array.isArray(asRecord(result)?.warnings) ? asRecord(result)?.warnings as unknown[] : []; + for (const warning of warnings) { + const record = asRecord(warning); + console.log(pc.yellow(`warning=${record?.code ?? "warning"} ${record?.message ?? ""}`.trim())); + } +} + +function printConflicts(result: unknown): void { + const conflicts = Array.isArray(asRecord(result)?.conflicts) ? asRecord(result)?.conflicts as unknown[] : []; + for (const conflict of conflicts.slice(0, 10)) { + const record = asRecord(conflict); + console.log(pc.red(`conflict=${record?.conflictKind ?? "target_conflict"} target=${record?.targetEntityId ?? "-"}`)); + } + if (conflicts.length > 10) console.log(pc.red(`conflicts_truncated=${conflicts.length - 10}`)); +} + +function printEvents(events: unknown): void { + const rows = Array.isArray(asRecord(events)?.events) ? asRecord(events)?.events as unknown[] : []; + for (const row of rows.slice(-10)) { + const event = asRecord(row); + console.log(pc.dim(`event=${event?.action ?? "-"} target=${event?.targetEntityId ?? "-"}`)); + } +} + +function getRunId(result: unknown): string | null { + const run = asRecord(asRecord(result)?.run); + return typeof run?.id === "string" ? run.id : null; +} + +function redactConnection(connection: CloudConnection): Record { + return { + id: connection.id, + remoteUrl: connection.remoteUrl, + targetOrigin: connection.targetOrigin, + stackId: connection.stackId, + targetCompanyId: connection.targetCompanyId, + scopes: connection.scopes, + expiresAt: connection.token.expiresAt, + }; +} + +function connectionId(targetOrigin: string): string { + return `cloud-${shortHash(targetOrigin)}`; +} + +function shortHash(value: string): string { + return createHash("sha256").update(value).digest("hex").slice(0, 16); +} + +function requiredString(value: unknown, label: string): string { + if (typeof value === "string" && value.trim()) return value.trim(); + throw new Error(`${label} is required.`); +} + +function numberValue(value: unknown): number { + return typeof value === "number" && Number.isFinite(value) ? value : 0; +} + +function asRecord(value: unknown): Record | null { + return typeof value === "object" && value !== null && !Array.isArray(value) + ? value as Record + : null; +} + +function isSchemaMismatchError(error: unknown): boolean { + if (error instanceof UpstreamImportRequestError) { + return JSON.stringify(error.body).toLowerCase().includes("schema"); + } + return error instanceof Error && error.message.toLowerCase().includes("schema mismatch"); +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export const cloudCommandExitCodes = { + conflict: CLOUD_SYNC_CONFLICT_EXIT_CODE, + schemaMismatch: CLOUD_SYNC_SCHEMA_MISMATCH_EXIT_CODE, +} as const; diff --git a/cli/src/index.ts b/cli/src/index.ts index f1a2084a..b9fd1159 100644 --- a/cli/src/index.ts +++ b/cli/src/index.ts @@ -19,6 +19,7 @@ import { registerDashboardCommands } from "./commands/client/dashboard.js"; import { registerRoutineCommands } from "./commands/routines.js"; import { registerFeedbackCommands } from "./commands/client/feedback.js"; import { registerSecretCommands } from "./commands/client/secrets.js"; +import { registerCloudCommands } from "./commands/client/cloud.js"; import { applyDataDirOverride, type DataDirOptionLike } from "./config/data-dir.js"; import { loadPaperclipEnvFile } from "./config/env.js"; import { initTelemetryFromConfigFile, flushTelemetry } from "./telemetry.js"; @@ -149,6 +150,7 @@ registerDashboardCommands(program); registerRoutineCommands(program); registerFeedbackCommands(program); registerSecretCommands(program); +registerCloudCommands(program); registerWorktreeCommands(program); registerEnvLabCommands(program); registerPluginCommands(program); diff --git a/packages/db/src/migrations/0089_cloud_upstreams.sql b/packages/db/src/migrations/0089_cloud_upstreams.sql new file mode 100644 index 00000000..caf14522 --- /dev/null +++ b/packages/db/src/migrations/0089_cloud_upstreams.sql @@ -0,0 +1,71 @@ +CREATE TABLE IF NOT EXISTS "cloud_upstream_connections" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "company_id" uuid NOT NULL, + "remote_url" text NOT NULL, + "source_instance_id" text NOT NULL, + "source_instance_fingerprint" text NOT NULL, + "source_public_key" text NOT NULL, + "private_key_pem" text NOT NULL, + "token_status" text NOT NULL, + "scopes" text[] DEFAULT '{}' NOT NULL, + "authorized_global_user_id" text, + "access_token" text, + "token_id" text, + "token_expires_at" timestamp with time zone, + "target_stack_id" text NOT NULL, + "target_stack_slug" text, + "target_stack_display_name" text, + "target_company_id" text NOT NULL, + "target_origin" text NOT NULL, + "target_primary_host" text NOT NULL, + "target_product" text NOT NULL, + "target_schema_major" integer NOT NULL, + "target_max_chunk_bytes" integer NOT NULL, + "pending_state" text, + "pending_code_verifier" text, + "pending_redirect_uri" text, + "pending_token_url" text, + "last_run_id" uuid, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL +);--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "cloud_upstream_runs" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "connection_id" uuid NOT NULL, + "company_id" uuid NOT NULL, + "remote_run_id" text, + "status" text NOT NULL, + "active_step" text NOT NULL, + "progress_percent" integer DEFAULT 0 NOT NULL, + "dry_run" boolean DEFAULT false NOT NULL, + "retry_of_run_id" uuid, + "summary" jsonb DEFAULT '[]'::jsonb NOT NULL, + "warnings" jsonb DEFAULT '[]'::jsonb NOT NULL, + "conflicts" jsonb DEFAULT '[]'::jsonb NOT NULL, + "events" jsonb DEFAULT '[]'::jsonb NOT NULL, + "report" jsonb DEFAULT '{}'::jsonb NOT NULL, + "idempotency_key" text NOT NULL, + "manifest_hash" text NOT NULL, + "target_url" text, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + "completed_at" timestamp with time zone +);--> statement-breakpoint +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'cloud_upstream_connections_company_id_companies_id_fk') THEN + ALTER TABLE "cloud_upstream_connections" ADD CONSTRAINT "cloud_upstream_connections_company_id_companies_id_fk" FOREIGN KEY ("company_id") REFERENCES "public"."companies"("id") ON DELETE cascade ON UPDATE no action; + END IF; +END $$;--> statement-breakpoint +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'cloud_upstream_runs_connection_id_cloud_upstream_connections_id_fk') THEN + ALTER TABLE "cloud_upstream_runs" ADD CONSTRAINT "cloud_upstream_runs_connection_id_cloud_upstream_connections_id_fk" FOREIGN KEY ("connection_id") REFERENCES "public"."cloud_upstream_connections"("id") ON DELETE cascade ON UPDATE no action; + END IF; +END $$;--> statement-breakpoint +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'cloud_upstream_runs_company_id_companies_id_fk') THEN + ALTER TABLE "cloud_upstream_runs" ADD CONSTRAINT "cloud_upstream_runs_company_id_companies_id_fk" FOREIGN KEY ("company_id") REFERENCES "public"."companies"("id") ON DELETE cascade ON UPDATE no action; + END IF; +END $$;--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "cloud_upstream_connections_company_idx" ON "cloud_upstream_connections" USING btree ("company_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "cloud_upstream_runs_company_created_idx" ON "cloud_upstream_runs" USING btree ("company_id","created_at");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "cloud_upstream_runs_connection_idx" ON "cloud_upstream_runs" USING btree ("connection_id"); diff --git a/packages/db/src/migrations/meta/_journal.json b/packages/db/src/migrations/meta/_journal.json index 7aaa96d3..abaa1136 100644 --- a/packages/db/src/migrations/meta/_journal.json +++ b/packages/db/src/migrations/meta/_journal.json @@ -624,6 +624,13 @@ "when": 1779446400000, "tag": "0088_backfill_principal_access_compatibility", "breakpoints": true + }, + { + "idx": 89, + "version": "7", + "when": 1779129600000, + "tag": "0089_cloud_upstreams", + "breakpoints": true } ] } diff --git a/packages/db/src/schema/cloud_upstreams.ts b/packages/db/src/schema/cloud_upstreams.ts new file mode 100644 index 00000000..93b4341f --- /dev/null +++ b/packages/db/src/schema/cloud_upstreams.ts @@ -0,0 +1,75 @@ +import { boolean, index, integer, jsonb, pgTable, text, timestamp, uuid } from "drizzle-orm/pg-core"; +import { companies } from "./companies.js"; + +export const cloudUpstreamConnections = pgTable( + "cloud_upstream_connections", + { + id: uuid("id").primaryKey().defaultRandom(), + companyId: uuid("company_id").notNull().references(() => companies.id, { onDelete: "cascade" }), + remoteUrl: text("remote_url").notNull(), + sourceInstanceId: text("source_instance_id").notNull(), + sourceInstanceFingerprint: text("source_instance_fingerprint").notNull(), + sourcePublicKey: text("source_public_key").notNull(), + // Stored through the Cloud Upstream service as an encrypted credential envelope. + privateKeyPem: text("private_key_pem").notNull(), + tokenStatus: text("token_status").notNull(), + scopes: text("scopes").array().notNull().default([]), + authorizedGlobalUserId: text("authorized_global_user_id"), + // Stored through the Cloud Upstream service as an encrypted credential envelope. + accessToken: text("access_token"), + tokenId: text("token_id"), + tokenExpiresAt: timestamp("token_expires_at", { withTimezone: true }), + + targetStackId: text("target_stack_id").notNull(), + targetStackSlug: text("target_stack_slug"), + targetStackDisplayName: text("target_stack_display_name"), + targetCompanyId: text("target_company_id").notNull(), + targetOrigin: text("target_origin").notNull(), + targetPrimaryHost: text("target_primary_host").notNull(), + targetProduct: text("target_product").notNull(), + targetSchemaMajor: integer("target_schema_major").notNull(), + targetMaxChunkBytes: integer("target_max_chunk_bytes").notNull(), + + pendingState: text("pending_state"), + pendingCodeVerifier: text("pending_code_verifier"), + pendingRedirectUri: text("pending_redirect_uri"), + pendingTokenUrl: text("pending_token_url"), + + lastRunId: uuid("last_run_id"), + createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(), + updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(), + }, + (table) => [ + index("cloud_upstream_connections_company_idx").on(table.companyId), + ], +); + +export const cloudUpstreamRuns = pgTable( + "cloud_upstream_runs", + { + id: uuid("id").primaryKey().defaultRandom(), + connectionId: uuid("connection_id").notNull().references(() => cloudUpstreamConnections.id, { onDelete: "cascade" }), + companyId: uuid("company_id").notNull().references(() => companies.id, { onDelete: "cascade" }), + remoteRunId: text("remote_run_id"), + status: text("status").notNull(), + activeStep: text("active_step").notNull(), + progressPercent: integer("progress_percent").notNull().default(0), + dryRun: boolean("dry_run").notNull().default(false), + retryOfRunId: uuid("retry_of_run_id"), + summary: jsonb("summary").$type().notNull().default([]), + warnings: jsonb("warnings").$type().notNull().default([]), + conflicts: jsonb("conflicts").$type().notNull().default([]), + events: jsonb("events").$type().notNull().default([]), + report: jsonb("report").$type>().notNull().default({}), + idempotencyKey: text("idempotency_key").notNull(), + manifestHash: text("manifest_hash").notNull(), + targetUrl: text("target_url"), + createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(), + updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(), + completedAt: timestamp("completed_at", { withTimezone: true }), + }, + (table) => [ + index("cloud_upstream_runs_company_created_idx").on(table.companyId, table.createdAt), + index("cloud_upstream_runs_connection_idx").on(table.connectionId), + ], +); diff --git a/packages/db/src/schema/index.ts b/packages/db/src/schema/index.ts index fdf5f7cb..b7630a31 100644 --- a/packages/db/src/schema/index.ts +++ b/packages/db/src/schema/index.ts @@ -2,6 +2,7 @@ export { companies } from "./companies.js"; export { companyLogos } from "./company_logos.js"; export { authUsers, authSessions, authAccounts, authVerifications } from "./auth.js"; export { instanceSettings } from "./instance_settings.js"; +export { cloudUpstreamConnections, cloudUpstreamRuns } from "./cloud_upstreams.js"; export { instanceUserRoles } from "./instance_user_roles.js"; export { userSidebarPreferences } from "./user_sidebar_preferences.js"; export { agents } from "./agents.js"; diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 9afa40f4..7f605326 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -688,6 +688,22 @@ export { MAX_ISSUE_GRAPH_LIVENESS_AUTO_RECOVERY_LOOKBACK_HOURS, } from "./types/instance.js"; +export type { + CloudUpstreamConnectStartResponse, + CloudUpstreamActivationDecision, + CloudUpstreamActivationEntityType, + CloudUpstreamConnection, + CloudUpstreamConflict, + CloudUpstreamPreview, + CloudUpstreamRun, + CloudUpstreamRunEvent, + CloudUpstreamsState, + CloudUpstreamStep, + CloudUpstreamSummaryCount, + CloudUpstreamTarget, + CloudUpstreamWarning, +} from "./types/cloud-upstream.js"; + export { getClosedIsolatedExecutionWorkspaceMessage, isClosedIsolatedExecutionWorkspace, diff --git a/packages/shared/src/types/cloud-upstream.ts b/packages/shared/src/types/cloud-upstream.ts new file mode 100644 index 00000000..211fa25d --- /dev/null +++ b/packages/shared/src/types/cloud-upstream.ts @@ -0,0 +1,110 @@ +export type CloudUpstreamStep = "connect" | "scan" | "preview" | "push" | "verify" | "activate"; + +export type CloudUpstreamRunStatus = "previewed" | "running" | "succeeded" | "failed" | "cancelled"; + +export type CloudUpstreamActivationEntityType = "agents" | "routines" | "monitors"; + +export interface CloudUpstreamActivationDecision { + entityType: CloudUpstreamActivationEntityType; + count: number; + status: "paused" | "activated"; + activatedAt: string | null; +} + +export interface CloudUpstreamTarget { + stackId: string; + stackSlug: string | null; + stackDisplayName: string | null; + companyId: string; + primaryHost: string; + origin: string; + product: string; + schemaMajor: number; + maxChunkBytes: number; +} + +export interface CloudUpstreamConnection { + id: string; + companyId: string; + remoteUrl: string; + target: CloudUpstreamTarget; + tokenStatus: "pending" | "connected" | "expired" | "revoked"; + scopes: string[]; + authorizedGlobalUserId: string | null; + expiresAt: string | null; + createdAt: string; + updatedAt: string; + lastRunId: string | null; +} + +export interface CloudUpstreamSummaryCount { + key: string; + label: string; + count: number; +} + +export interface CloudUpstreamWarning { + code: string; + severity: "warning" | "blocker"; + title: string; + detail: string; +} + +export interface CloudUpstreamConflict { + id: string; + entityType: string; + sourceLabel: string; + targetLabel: string; + plannedAction: "create" | "update" | "skip" | "blocked"; + reason: string; +} + +export interface CloudUpstreamPreview { + connectionId: string; + sourceCompanyId: string; + target: CloudUpstreamTarget; + schemaCompatible: boolean; + summary: CloudUpstreamSummaryCount[]; + warnings: CloudUpstreamWarning[]; + conflicts: CloudUpstreamConflict[]; + generatedAt: string; +} + +export interface CloudUpstreamRunEvent { + id: string; + at: string; + phase: CloudUpstreamStep; + type: "created" | "updated" | "skipped" | "conflict" | "retrying" | "failed" | "completed"; + message: string; +} + +export interface CloudUpstreamRun { + id: string; + connectionId: string; + companyId: string; + status: CloudUpstreamRunStatus; + activeStep: CloudUpstreamStep; + progressPercent: number; + dryRun: boolean; + summary: CloudUpstreamSummaryCount[]; + warnings: CloudUpstreamWarning[]; + conflicts: CloudUpstreamConflict[]; + events: CloudUpstreamRunEvent[]; + targetUrl: string | null; + report: Record; + retryOfRunId: string | null; + createdAt: string; + updatedAt: string; + completedAt: string | null; +} + +export interface CloudUpstreamsState { + connections: CloudUpstreamConnection[]; + runs: CloudUpstreamRun[]; +} + +export interface CloudUpstreamConnectStartResponse { + pendingConnectionId: string; + authorizationUrl: string; + connection: CloudUpstreamConnection; +} diff --git a/packages/shared/src/types/instance.ts b/packages/shared/src/types/instance.ts index ee6a6553..3be7c6fd 100644 --- a/packages/shared/src/types/instance.ts +++ b/packages/shared/src/types/instance.ts @@ -29,6 +29,7 @@ export interface InstanceGeneralSettings { export interface InstanceExperimentalSettings { enableEnvironments: boolean; enableIsolatedWorkspaces: boolean; + enableCloudSync: boolean; autoRestartDevServerWhenIdle: boolean; enableIssueGraphLivenessAutoRecovery: boolean; issueGraphLivenessAutoRecoveryLookbackHours: number; diff --git a/packages/shared/src/validators/instance.ts b/packages/shared/src/validators/instance.ts index 3415539a..52da4d5f 100644 --- a/packages/shared/src/validators/instance.ts +++ b/packages/shared/src/validators/instance.ts @@ -38,6 +38,7 @@ export const patchInstanceGeneralSettingsSchema = instanceGeneralSettingsSchema. export const instanceExperimentalSettingsSchema = z.object({ enableEnvironments: z.boolean().default(false), enableIsolatedWorkspaces: z.boolean().default(false), + enableCloudSync: z.boolean().default(false), autoRestartDevServerWhenIdle: z.boolean().default(false), enableIssueGraphLivenessAutoRecovery: z.boolean().default(false), issueGraphLivenessAutoRecoveryLookbackHours: z diff --git a/server/src/__tests__/cleanup-removal-service.test.ts b/server/src/__tests__/cleanup-removal-service.test.ts index 25dc11d6..e65124cc 100644 --- a/server/src/__tests__/cleanup-removal-service.test.ts +++ b/server/src/__tests__/cleanup-removal-service.test.ts @@ -9,6 +9,7 @@ import { createDb, documents, documentRevisions, + heartbeatRunEvents, heartbeatRuns, issueComments, issueDocuments, @@ -42,6 +43,7 @@ describeEmbeddedPostgres("cleanup removal services", () => { }, 20_000); afterEach(async () => { + await db.delete(heartbeatRunEvents); await db.delete(activityLog); await db.delete(issueReadStates); await db.delete(issueComments); @@ -228,4 +230,32 @@ describeEmbeddedPostgres("cleanup removal services", () => { await expect(db.select().from(issueReadStates).where(eq(issueReadStates.companyId, companyId))).resolves.toHaveLength(0); await expect(db.select().from(activityLog).where(eq(activityLog.companyId, companyId))).resolves.toHaveLength(0); }); + + it("removes heartbeat events by run id before deleting company-owned runs", async () => { + const { agentId, companyId, runId } = await seedFixture(); + const otherCompanyId = randomUUID(); + + await db.insert(companies).values({ + id: otherCompanyId, + name: "Other Company", + issuePrefix: `O${otherCompanyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, + requireBoardApprovalForNewAgents: false, + }); + + await db.insert(heartbeatRunEvents).values({ + companyId: otherCompanyId, + runId, + agentId, + seq: 1, + eventType: "output", + message: "event with mismatched company scope", + }); + + const removed = await companyService(db).remove(companyId); + + expect(removed?.id).toBe(companyId); + await expect(db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, runId))).resolves.toHaveLength(0); + await expect(db.select().from(heartbeatRunEvents).where(eq(heartbeatRunEvents.runId, runId))).resolves.toHaveLength(0); + await expect(db.select().from(companies).where(eq(companies.id, otherCompanyId))).resolves.toHaveLength(1); + }); }); diff --git a/server/src/__tests__/cloud-upstreams.test.ts b/server/src/__tests__/cloud-upstreams.test.ts new file mode 100644 index 00000000..fce78dfd --- /dev/null +++ b/server/src/__tests__/cloud-upstreams.test.ts @@ -0,0 +1,334 @@ +import { generateKeyPairSync, randomUUID } from "node:crypto"; +import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest"; +import { companies, cloudUpstreamConnections, cloudUpstreamRuns, companySkills, createDb } from "@paperclipai/db"; + +import { HttpError } from "../errors.js"; +import { + cloudUpstreamRemoteFailureReport, + cloudUpstreamService, + reconcileCloudUpstreamRunsOnStartup, + sealCloudUpstreamCredential, + unsealCloudUpstreamCredential, +} from "../services/cloud-upstreams.js"; +import { + getEmbeddedPostgresTestSupport, + startEmbeddedPostgresTestDatabase, +} from "./helpers/embedded-postgres.js"; + +const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport(); +const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip; + +if (!embeddedPostgresSupport.supported) { + console.warn( + `Skipping embedded Postgres cloud upstream tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`, + ); +} + +describe("cloud upstream remote failures", () => { + it("preserves the cloud response body and message on run reports", () => { + const body = { + error: "bad_request", + message: "entities[42].body must be an object", + errors: [{ path: "entities[42].body" }], + }; + + expect(cloudUpstreamRemoteFailureReport(new HttpError(400, "bad_request", body))).toEqual({ + error: "bad_request", + errorMessage: "entities[42].body must be an object", + details: body, + }); + }); + + it("falls back to the thrown error message for non-remote failures", () => { + expect(cloudUpstreamRemoteFailureReport(new Error("network failed"))).toEqual({ + error: "network failed", + }); + }); +}); + +describe("cloud upstream credential storage", () => { + const previousMasterKey = process.env.PAPERCLIP_SECRETS_MASTER_KEY; + + afterEach(() => { + if (previousMasterKey === undefined) { + delete process.env.PAPERCLIP_SECRETS_MASTER_KEY; + } else { + process.env.PAPERCLIP_SECRETS_MASTER_KEY = previousMasterKey; + } + }); + + it("stores new credentials as encrypted envelopes and preserves legacy plaintext reads", async () => { + process.env.PAPERCLIP_SECRETS_MASTER_KEY = "12345678901234567890123456789012"; + const sealed = await sealCloudUpstreamCredential("cloud-access-token"); + + expect(sealed).toMatch(/^paperclip-cloud-credential:/); + expect(sealed).not.toContain("cloud-access-token"); + await expect(unsealCloudUpstreamCredential(sealed)).resolves.toBe("cloud-access-token"); + await expect(unsealCloudUpstreamCredential("legacy-plaintext-token")).resolves.toBe("legacy-plaintext-token"); + }); +}); + +describeEmbeddedPostgres("cloud upstream persistence", () => { + let db!: ReturnType; + let tempDb: Awaited> | null = null; + const previousMasterKey = process.env.PAPERCLIP_SECRETS_MASTER_KEY; + + beforeAll(async () => { + process.env.PAPERCLIP_SECRETS_MASTER_KEY = "12345678901234567890123456789012"; + tempDb = await startEmbeddedPostgresTestDatabase("paperclip-cloud-upstreams-"); + db = createDb(tempDb.connectionString); + }, 20_000); + + afterEach(async () => { + vi.restoreAllMocks(); + await db.delete(cloudUpstreamRuns); + await db.delete(cloudUpstreamConnections); + await db.delete(companySkills); + await db.delete(companies); + }); + + afterAll(async () => { + if (previousMasterKey === undefined) { + delete process.env.PAPERCLIP_SECRETS_MASTER_KEY; + } else { + process.env.PAPERCLIP_SECRETS_MASTER_KEY = previousMasterKey; + } + await tempDb?.cleanup(); + }); + + it("encrypts stored upstream credentials while keeping connection flows usable", async () => { + const companyId = randomUUID(); + await seedCompany(companyId); + const tokenUrl = "https://cloud.example.test/oauth/token"; + vi.spyOn(globalThis, "fetch").mockImplementation(async (input, init) => { + const url = String(input); + if (url.startsWith("https://cloud.example.test/.well-known/paperclip-upstream")) { + return jsonResponse({ + product: "Paperclip Cloud", + stack: { + id: "stack-1", + companyId: "cloud-company-1", + origin: "https://cloud.example.test", + primaryHost: "cloud.example.test", + }, + transfer: { + supportedSchemaMajor: 1, + maxChunkBytes: 8192, + }, + auth: { + scopes: ["upstream_import:write"], + pkce: { + authorizeUrl: "https://cloud.example.test/oauth/authorize", + tokenUrl, + }, + }, + }); + } + if (url === tokenUrl && init?.method === "POST") { + const payload = JSON.parse(String(init.body)); + expect(payload.codeVerifier).toEqual(expect.any(String)); + expect(payload.codeVerifier).not.toContain("paperclip-cloud-credential:"); + return jsonResponse({ + accessToken: "cloud-access-token", + token: { + id: "token-1", + expiresAt: "2026-05-22T13:00:00.000Z", + globalUserId: "user-1", + }, + }); + } + throw new Error(`Unexpected fetch: ${url}`); + }); + + const service = cloudUpstreamService(db, { instanceId: "test" }); + const started = await service.startConnect({ + companyId, + remoteUrl: "https://cloud.example.test", + redirectUri: "http://localhost:3100/callback", + }); + await service.finishConnect({ + pendingConnectionId: started.pendingConnectionId, + code: "auth-code", + state: new URL(started.authorizationUrl).searchParams.get("state") ?? "", + }); + + const [row] = await db.select().from(cloudUpstreamConnections); + expect(row.privateKeyPem).toMatch(/^paperclip-cloud-credential:/); + expect(row.privateKeyPem).not.toContain("BEGIN PRIVATE KEY"); + expect(row.accessToken).toMatch(/^paperclip-cloud-credential:/); + expect(row.accessToken).not.toContain("cloud-access-token"); + }); + + it("marks orphaned running runs failed during startup reconciliation", async () => { + const companyId = randomUUID(); + const connectionId = randomUUID(); + const runningRunId = randomUUID(); + const succeededRunId = randomUUID(); + const reconciledAt = new Date("2026-05-22T13:00:00.000Z"); + await seedCompany(companyId); + await db.insert(cloudUpstreamConnections).values({ + id: connectionId, + companyId, + remoteUrl: "https://cloud.example.test", + sourceInstanceId: "source-1", + sourceInstanceFingerprint: "sha256:test", + sourcePublicKey: "public-key", + privateKeyPem: "legacy-private-key", + tokenStatus: "connected", + scopes: ["upstream_import:write"], + authorizedGlobalUserId: "user-1", + accessToken: "legacy-token", + tokenId: "token-1", + targetStackId: "stack-1", + targetCompanyId: "cloud-company-1", + targetOrigin: "https://cloud.example.test", + targetPrimaryHost: "cloud.example.test", + targetProduct: "Paperclip Cloud", + targetSchemaMajor: 1, + targetMaxChunkBytes: 8192, + }); + await db.insert(cloudUpstreamRuns).values([ + cloudRunRow({ id: runningRunId, connectionId, companyId, status: "running" }), + cloudRunRow({ id: succeededRunId, connectionId, companyId, status: "succeeded", completedAt: reconciledAt }), + ]); + + await expect(reconcileCloudUpstreamRunsOnStartup(db, reconciledAt)).resolves.toEqual({ reconciled: 1 }); + + const rows = await db.select().from(cloudUpstreamRuns); + const running = rows.find((row) => row.id === runningRunId); + const succeeded = rows.find((row) => row.id === succeededRunId); + expect(running?.status).toBe("failed"); + expect(running?.completedAt?.toISOString()).toBe(reconciledAt.toISOString()); + expect(running?.events.at(-1)?.message).toContain("server startup"); + expect(running?.report).toMatchObject({ + error: "orphaned_running_run", + reconciledAt: reconciledAt.toISOString(), + }); + expect(succeeded?.status).toBe("succeeded"); + }); + + it("rejects a new run when the connection already has a running run", async () => { + const companyId = randomUUID(); + const connectionId = randomUUID(); + const runningRunId = randomUUID(); + await seedCompany(companyId); + await db.insert(cloudUpstreamConnections).values(cloudConnectionRow({ id: connectionId, companyId })); + await db.insert(cloudUpstreamRuns).values( + cloudRunRow({ id: runningRunId, connectionId, companyId, status: "running" }), + ); + + await expect(cloudUpstreamService(db).createRun({ connectionId, companyId })).rejects.toMatchObject({ + status: 409, + details: { runId: runningRunId }, + }); + }); + + it("preserves a cancelled run when an in-flight createRun tries to finish", async () => { + const companyId = randomUUID(); + const connectionId = randomUUID(); + await seedCompany(companyId); + await db.insert(cloudUpstreamConnections).values(cloudConnectionRow({ id: connectionId, companyId })); + + const service = cloudUpstreamService(db); + const remoteCalls: string[] = []; + globalThis.fetch = vi.fn(async (input) => { + const path = new URL(String(input)).pathname; + remoteCalls.push(path); + if (path.endsWith("/upstream-imports/runs")) { + return jsonResponse({ run: { id: "remote-run-1" } }); + } + if (path.endsWith("/chunks")) { + const run = await db.select().from(cloudUpstreamRuns).then((rows) => rows[0]); + expect(run?.status).toBe("running"); + await service.cancelRun(connectionId, run.id, companyId); + return jsonResponse({ ok: true }); + } + if (path.endsWith("/cancel")) { + return jsonResponse({ ok: true }); + } + if (path.endsWith("/apply")) { + return jsonResponse({ ok: true }); + } + if (path.endsWith("/events")) { + return jsonResponse({ events: [] }); + } + return jsonResponse({ error: "not_found" }, 404); + }) as typeof fetch; + + const result = await service.createRun({ connectionId, companyId }); + + expect(result.status).toBe("cancelled"); + expect(remoteCalls.some((path) => path.endsWith("/apply"))).toBe(false); + const rows = await db.select().from(cloudUpstreamRuns); + expect(rows).toHaveLength(1); + expect(rows[0]?.status).toBe("cancelled"); + }); + + async function seedCompany(companyId: string) { + await db.insert(companies).values({ + id: companyId, + name: "Paperclip", + issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, + requireBoardApprovalForNewAgents: false, + }); + } +}); + +function jsonResponse(body: unknown): Response { + return new Response(JSON.stringify(body), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); +} + +function cloudConnectionRow(input: { id: string; companyId: string }) { + const { privateKey } = generateKeyPairSync("ed25519"); + return { + id: input.id, + companyId: input.companyId, + remoteUrl: "https://cloud.example.test", + sourceInstanceId: "source-1", + sourceInstanceFingerprint: "sha256:test", + sourcePublicKey: "public-key", + privateKeyPem: privateKey.export({ type: "pkcs8", format: "pem" }).toString(), + tokenStatus: "connected", + scopes: ["upstream_import:write"], + authorizedGlobalUserId: "user-1", + accessToken: "legacy-token", + tokenId: "token-1", + targetStackId: "stack-1", + targetCompanyId: "cloud-company-1", + targetOrigin: "https://cloud.example.test", + targetPrimaryHost: "cloud.example.test", + targetProduct: "Paperclip Cloud", + targetSchemaMajor: 1, + targetMaxChunkBytes: 8192, + }; +} + +function cloudRunRow(input: { + id: string; + connectionId: string; + companyId: string; + status: string; + completedAt?: Date; +}) { + return { + id: input.id, + connectionId: input.connectionId, + companyId: input.companyId, + status: input.status, + activeStep: "push", + progressPercent: input.status === "running" ? 45 : 100, + dryRun: false, + summary: [], + warnings: [], + conflicts: [], + events: [], + report: {}, + idempotencyKey: `key-${input.id}`, + manifestHash: `sha256:${input.id.replace(/-/g, "")}`, + targetUrl: "https://cloud.example.test", + completedAt: input.completedAt, + }; +} diff --git a/server/src/__tests__/instance-settings-routes.test.ts b/server/src/__tests__/instance-settings-routes.test.ts index 41e52190..5d324cd4 100644 --- a/server/src/__tests__/instance-settings-routes.test.ts +++ b/server/src/__tests__/instance-settings-routes.test.ts @@ -64,6 +64,7 @@ describe("instance settings routes", () => { mockInstanceSettingsService.getExperimental.mockResolvedValue({ enableEnvironments: false, enableIsolatedWorkspaces: false, + enableCloudSync: false, autoRestartDevServerWhenIdle: false, enableIssueGraphLivenessAutoRecovery: true, issueGraphLivenessAutoRecoveryLookbackHours: 24, @@ -81,6 +82,7 @@ describe("instance settings routes", () => { experimental: { enableEnvironments: true, enableIsolatedWorkspaces: true, + enableCloudSync: true, autoRestartDevServerWhenIdle: false, enableIssueGraphLivenessAutoRecovery: true, issueGraphLivenessAutoRecoveryLookbackHours: 24, @@ -123,6 +125,7 @@ describe("instance settings routes", () => { expect(getRes.body).toEqual({ enableEnvironments: false, enableIsolatedWorkspaces: false, + enableCloudSync: false, autoRestartDevServerWhenIdle: false, enableIssueGraphLivenessAutoRecovery: true, issueGraphLivenessAutoRecoveryLookbackHours: 24, diff --git a/server/src/__tests__/instance-settings-service.test.ts b/server/src/__tests__/instance-settings-service.test.ts new file mode 100644 index 00000000..6669414d --- /dev/null +++ b/server/src/__tests__/instance-settings-service.test.ts @@ -0,0 +1,23 @@ +import { describe, expect, it } from "vitest"; +import { normalizeExperimentalSettings } from "../services/instance-settings.js"; + +describe("instance settings service", () => { + it("ignores retired experimental flags without resetting current settings", () => { + expect(normalizeExperimentalSettings({ + enableEnvironments: true, + enableIsolatedWorkspaces: true, + enableCloudSync: true, + autoRestartDevServerWhenIdle: true, + enableIssueGraphLivenessAutoRecovery: true, + issueGraphLivenessAutoRecoveryLookbackHours: 48, + enableNewestFirstIssueThread: true, + })).toEqual({ + enableEnvironments: true, + enableIsolatedWorkspaces: true, + enableCloudSync: true, + autoRestartDevServerWhenIdle: true, + enableIssueGraphLivenessAutoRecovery: true, + issueGraphLivenessAutoRecoveryLookbackHours: 48, + }); + }); +}); diff --git a/server/src/__tests__/server-startup-feedback-export.test.ts b/server/src/__tests__/server-startup-feedback-export.test.ts index f1023a91..3cdab7c4 100644 --- a/server/src/__tests__/server-startup-feedback-export.test.ts +++ b/server/src/__tests__/server-startup-feedback-export.test.ts @@ -166,6 +166,7 @@ vi.mock("../services/index.js", () => ({ }, })), })), + reconcileCloudUpstreamRunsOnStartup: vi.fn(async () => ({ reconciled: 0 })), reconcilePersistedRuntimeServicesOnStartup: vi.fn(async () => ({ reconciled: 0 })), routineService: vi.fn(() => ({ tickScheduledTriggers: vi.fn(async () => ({ triggered: 0 })), diff --git a/server/src/index.ts b/server/src/index.ts index bb89a46e..d9c52d27 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -33,6 +33,7 @@ import { backfillPrincipalAccessCompatibility, heartbeatService, instanceSettingsService, + reconcileCloudUpstreamRunsOnStartup, reconcilePersistedRuntimeServicesOnStartup, routineService, } from "./services/index.js"; @@ -699,6 +700,19 @@ export async function startServer(): Promise { .catch((err) => { logger.error({ err }, "startup reconciliation of persisted runtime services failed"); }); + + void reconcileCloudUpstreamRunsOnStartup(db as any) + .then((result) => { + if (result.reconciled > 0) { + logger.warn( + { reconciled: result.reconciled }, + "reconciled cloud upstream runs from a previous server process", + ); + } + }) + .catch((err) => { + logger.error({ err }, "startup reconciliation of cloud upstream runs failed"); + }); if (config.heartbeatSchedulerEnabled) { const heartbeat = heartbeatService(db as any, { pluginWorkerManager }); diff --git a/server/src/routes/cloud-upstreams.ts b/server/src/routes/cloud-upstreams.ts new file mode 100644 index 00000000..085afe47 --- /dev/null +++ b/server/src/routes/cloud-upstreams.ts @@ -0,0 +1,118 @@ +import { Router } from "express"; +import type { Db } from "@paperclipai/db"; +import { badRequest, notFound } from "../errors.js"; +import { assertBoardOrgAccess } from "./authz.js"; +import { cloudUpstreamService, instanceSettingsService } from "../services/index.js"; + +export function cloudUpstreamRoutes(db: Db, options: { instanceId?: string } = {}) { + const router = Router(); + const service = cloudUpstreamService(db, options); + const settings = instanceSettingsService(db); + + async function assertEnabled() { + const experimental = await settings.getExperimental(); + if (experimental.enableCloudSync !== true) { + throw notFound("Cloud sync is not enabled"); + } + } + + router.get("/cloud-upstreams", async (req, res) => { + assertBoardOrgAccess(req); + await assertEnabled(); + const companyId = stringQuery(req.query.companyId, "companyId"); + res.json(await service.list(companyId)); + }); + + router.post("/cloud-upstreams/connect/start", async (req, res) => { + assertBoardOrgAccess(req); + await assertEnabled(); + const companyId = stringBody(req.body, "companyId"); + const remoteUrl = stringBody(req.body, "remoteUrl"); + const redirectUri = stringBody(req.body, "redirectUri"); + res.json(await service.startConnect({ companyId, remoteUrl, redirectUri })); + }); + + router.post("/cloud-upstreams/connect/finish", async (req, res) => { + assertBoardOrgAccess(req); + await assertEnabled(); + res.json(await service.finishConnect({ + pendingConnectionId: stringBody(req.body, "pendingConnectionId"), + code: stringBody(req.body, "code"), + state: stringBody(req.body, "state"), + })); + }); + + router.post("/cloud-upstreams/:connectionId/push-runs/preview", async (req, res) => { + assertBoardOrgAccess(req); + await assertEnabled(); + res.json(await service.preview(req.params.connectionId, stringBody(req.body, "companyId"))); + }); + + router.post("/cloud-upstreams/:connectionId/push-runs", async (req, res) => { + assertBoardOrgAccess(req); + await assertEnabled(); + res.json(await service.createRun({ + connectionId: req.params.connectionId, + companyId: stringBody(req.body, "companyId"), + retryOfRunId: optionalString(req.body?.retryOfRunId), + })); + }); + + router.get("/cloud-upstreams/:connectionId/push-runs/:runId", async (req, res) => { + assertBoardOrgAccess(req); + await assertEnabled(); + res.json(await service.readRun(req.params.connectionId, req.params.runId, stringQuery(req.query.companyId, "companyId"))); + }); + + router.post("/cloud-upstreams/:connectionId/push-runs/:runId/cancel", async (req, res) => { + assertBoardOrgAccess(req); + await assertEnabled(); + res.json(await service.cancelRun(req.params.connectionId, req.params.runId, stringBody(req.body, "companyId"))); + }); + + router.post("/cloud-upstreams/:connectionId/push-runs/:runId/activation", async (req, res) => { + assertBoardOrgAccess(req); + await assertEnabled(); + res.json(await service.activateRunEntities({ + connectionId: req.params.connectionId, + runId: req.params.runId, + companyId: stringBody(req.body, "companyId"), + entityType: activationEntityTypeBody(req.body), + })); + }); + + return router; +} + +function stringQuery(value: unknown, label: string): string { + if (typeof value !== "string" || value.trim().length === 0) { + throw badRequest(`${label} is required`); + } + return value; +} + +function stringBody(body: unknown, key: string): string { + if (!body || typeof body !== "object" || Array.isArray(body)) { + throw badRequest(`${key} is required`); + } + const value = (body as Record)[key]; + if (typeof value !== "string" || value.trim().length === 0) { + throw badRequest(`${key} is required`); + } + return value; +} + +function optionalString(value: unknown): string | null { + return typeof value === "string" && value.length > 0 ? value : null; +} + +function activationEntityTypeBody(body: unknown): "agents" | "routines" | "monitors" { + if (!body || typeof body !== "object" || Array.isArray(body)) { + throw badRequest("entityType is required"); + } + const value = (body as Record).entityType; + if (value !== "agents" && value !== "routines" && value !== "monitors") { + throw badRequest("entityType must be agents, routines, or monitors"); + } + return value; +} diff --git a/server/src/routes/index.ts b/server/src/routes/index.ts index 983562e2..74987ed4 100644 --- a/server/src/routes/index.ts +++ b/server/src/routes/index.ts @@ -19,3 +19,4 @@ export { llmRoutes } from "./llms.js"; export { accessRoutes } from "./access.js"; export { instanceSettingsRoutes } from "./instance-settings.js"; export { instanceDatabaseBackupRoutes } from "./instance-database-backups.js"; +export { cloudUpstreamRoutes } from "./cloud-upstreams.js"; diff --git a/server/src/services/cloud-upstreams.ts b/server/src/services/cloud-upstreams.ts new file mode 100644 index 00000000..765dd432 --- /dev/null +++ b/server/src/services/cloud-upstreams.ts @@ -0,0 +1,1298 @@ +import crypto, { sign } from "node:crypto"; +import { and, count, desc, eq, sql } from "drizzle-orm"; +import type { + CloudUpstreamConnectStartResponse, + CloudUpstreamActivationDecision, + CloudUpstreamActivationEntityType, + CloudUpstreamConnection, + CloudUpstreamConflict, + CloudUpstreamPreview, + CloudUpstreamRun, + CloudUpstreamRunEvent, + CloudUpstreamsState, + CloudUpstreamSummaryCount, + CloudUpstreamTarget, + CloudUpstreamWarning, + CompanyPortabilityExportResult, + CompanyPortabilityFileEntry, +} from "@paperclipai/shared"; +import type { Db } from "@paperclipai/db"; +import { + agents, + cloudUpstreamConnections, + cloudUpstreamRuns, + companies, + goals, + issueComments, + issues, + projects, + routines, +} from "@paperclipai/db"; +import { badRequest, conflict, HttpError, notFound } from "../errors.js"; +import { companyPortabilityService } from "./company-portability.js"; +import { localEncryptedProvider } from "../secrets/local-encrypted-provider.js"; + +const DEFAULT_SCOPES = ["upstream_import:preview", "upstream_import:write", "upstream_import:read"]; +const TRANSFER_SCHEMA = { + family: "paperclip-upstream-transfer", + version: "1.0.0", + major: 1, + minor: 0, +} as const; +const DEFAULT_MAX_ENTITIES_PER_CHUNK = 100; +const DISCOVERY_FETCH_TIMEOUT_MS = 30_000; +const REMOTE_FETCH_TIMEOUT_MS = 120_000; +const CLOUD_CREDENTIAL_PREFIX = "paperclip-cloud-credential:"; + +type NormalizedSha256 = `sha256:${string}`; + +type SourceEntityKey = { + sourceInstanceId: string; + sourceCompanyId: string; + sourceEntityType: string; + sourceEntityId: string; + sourceNaturalKey?: string; +}; + +type UpstreamTransferWarning = { + code: string; + severity: "info" | "warning" | "blocker"; + message: string; + entity?: SourceEntityKey; +}; + +type UpstreamTransferEntityRecord = { + key: SourceEntityKey; + contentHash: NormalizedSha256; + dependencies: SourceEntityKey[]; + warnings: UpstreamTransferWarning[]; +}; + +type LocalUpstreamExportEntity = { + record: UpstreamTransferEntityRecord; + body: Record; + conflictKeys?: string[]; +}; + +type LocalUpstreamExportChunk = { + chunkIndex: number; + totalChunks: number; + byteLength: number; + sha256: NormalizedSha256; + payload: { + entityKeys: SourceEntityKey[]; + }; +}; + +type UpstreamTransferManifest = { + schema: typeof TRANSFER_SCHEMA; + source: { + sourceInstanceId: string; + sourceCompanyId: string; + sourceInstanceKeyFingerprint: string; + exporterVersion: string; + sourceSchemaVersion: string; + }; + target: { + targetStackId: string; + targetCompanyId: string; + targetOrigin: string; + supportedSchemaMajor: number; + }; + runId: string; + idempotencyKey: string; + generatedAt: string; + entityCount: number; + entities: UpstreamTransferEntityRecord[]; + chunks: Array & { manifestHash: NormalizedSha256 }>; + warnings: UpstreamTransferWarning[]; + featureFlags: string[]; + manifestHash: NormalizedSha256; +}; + +type LocalUpstreamExportBundle = { + manifest: UpstreamTransferManifest; + entities: LocalUpstreamExportEntity[]; + chunks: LocalUpstreamExportChunk[]; +}; + +type ConnectionRow = typeof cloudUpstreamConnections.$inferSelect; +type RunRow = typeof cloudUpstreamRuns.$inferSelect; + +export function cloudUpstreamService(db: Db, options: { instanceId?: string } = {}) { + const sourceInstanceId = `paperclip-local-${options.instanceId ?? "default"}`; + const portability = companyPortabilityService(db); + + return { + list: async (companyId: string): Promise => { + const [connectionRows, runRows] = await Promise.all([ + db + .select() + .from(cloudUpstreamConnections) + .where(eq(cloudUpstreamConnections.companyId, companyId)) + .orderBy(desc(cloudUpstreamConnections.updatedAt)), + db + .select() + .from(cloudUpstreamRuns) + .where(eq(cloudUpstreamRuns.companyId, companyId)) + .orderBy(desc(cloudUpstreamRuns.createdAt)) + .limit(50), + ]); + return { + connections: connectionRows.map(connectionFromRow), + runs: runRows.map(runFromRow), + }; + }, + + startConnect: async (input: { + companyId: string; + remoteUrl: string; + redirectUri: string; + }): Promise => { + await requireCompany(input.companyId); + const remoteUrl = input.remoteUrl.trim(); + if (!remoteUrl) throw badRequest("Remote URL is required"); + + const discovery = await fetchDiscovery(remoteUrl); + const target = targetFromDiscovery(discovery); + const connectionId = crypto.randomUUID(); + const state = crypto.randomBytes(24).toString("base64url"); + const codeVerifier = crypto.randomBytes(32).toString("base64url"); + const codeChallenge = crypto.createHash("sha256").update(codeVerifier, "utf8").digest("base64url"); + const { publicKey, privateKey } = crypto.generateKeyPairSync("ed25519"); + const sourcePublicKey = publicKey.export({ type: "spki", format: "pem" }).toString(); + const sourceInstanceFingerprint = `sha256:${crypto + .createHash("sha256") + .update(publicKey.export({ type: "spki", format: "der" })) + .digest("hex")}`; + + const [row] = await db.insert(cloudUpstreamConnections).values({ + id: connectionId, + companyId: input.companyId, + remoteUrl, + sourceInstanceId, + sourceInstanceFingerprint, + sourcePublicKey, + privateKeyPem: await sealCloudUpstreamCredential(privateKey.export({ type: "pkcs8", format: "pem" }).toString()), + tokenStatus: "pending", + scopes: scopesFromDiscovery(discovery), + targetStackId: target.stackId, + targetStackSlug: target.stackSlug, + targetStackDisplayName: target.stackDisplayName, + targetCompanyId: target.companyId, + targetOrigin: target.origin, + targetPrimaryHost: target.primaryHost, + targetProduct: target.product, + targetSchemaMajor: target.schemaMajor, + targetMaxChunkBytes: target.maxChunkBytes, + pendingState: state, + pendingCodeVerifier: await sealCloudUpstreamCredential(codeVerifier), + pendingRedirectUri: input.redirectUri, + pendingTokenUrl: tokenUrlFromDiscovery(discovery), + }).returning(); + if (!row) throw badRequest("Failed to create cloud upstream connection"); + + const authorizationUrl = new URL(consentUrlFromDiscovery(discovery)); + authorizationUrl.searchParams.set("stackId", target.stackId); + authorizationUrl.searchParams.set("redirectUri", input.redirectUri); + authorizationUrl.searchParams.set("state", state); + authorizationUrl.searchParams.set("codeChallenge", codeChallenge); + authorizationUrl.searchParams.set("codeChallengeMethod", "S256"); + authorizationUrl.searchParams.set("sourceInstanceId", sourceInstanceId); + authorizationUrl.searchParams.set("sourceInstanceFingerprint", sourceInstanceFingerprint); + authorizationUrl.searchParams.set("sourcePublicKey", sourcePublicKey); + authorizationUrl.searchParams.set("scopes", row.scopes.join(" ")); + + return { + pendingConnectionId: row.id, + authorizationUrl: authorizationUrl.toString(), + connection: connectionFromRow(row), + }; + }, + + finishConnect: async (input: { + pendingConnectionId: string; + code: string; + state: string; + }): Promise => { + const pending = await getConnectionRow(input.pendingConnectionId); + if (!pending.pendingState || !pending.pendingCodeVerifier || !pending.pendingRedirectUri || !pending.pendingTokenUrl) { + throw notFound("Pending cloud upstream connection was not found"); + } + if (input.state !== pending.pendingState) throw badRequest("Cloud upstream state did not match"); + const tokenResponse = await postJson>(pending.pendingTokenUrl, { + grantType: "authorization_code", + code: input.code, + redirectUri: pending.pendingRedirectUri, + codeVerifier: await unsealCloudUpstreamCredential(pending.pendingCodeVerifier), + }); + const accessToken = stringField(tokenResponse, "accessToken"); + const token = objectField(tokenResponse, "token"); + const expiresAt = optionalString(token.expiresAt) ?? optionalString(tokenResponse.expiresAt); + const [updated] = await db + .update(cloudUpstreamConnections) + .set({ + tokenStatus: "connected", + authorizedGlobalUserId: optionalString(token.globalUserId), + accessToken: await sealCloudUpstreamCredential(accessToken), + tokenId: optionalString(token.id), + tokenExpiresAt: expiresAt ? new Date(expiresAt) : null, + pendingState: null, + pendingCodeVerifier: null, + pendingRedirectUri: null, + pendingTokenUrl: null, + updatedAt: new Date(), + }) + .where(eq(cloudUpstreamConnections.id, pending.id)) + .returning(); + if (!updated) throw notFound("Cloud upstream connection was not found"); + return connectionFromRow(updated); + }, + + preview: async (connectionId: string, companyId: string): Promise => { + const connection = await getConnectionRow(connectionId, companyId); + const basePreview = await localPreview(connection); + if (!basePreview.schemaCompatible || connection.tokenStatus !== "connected") { + return basePreview; + } + + const bundle = await buildBundle(connection, "preview"); + const conflictKeysBySource: Record = {}; + for (const entity of bundle.entities) { + if (!entity.conflictKeys || entity.conflictKeys.length === 0) continue; + conflictKeysBySource[sourceEntityKeyString(entity.record.key)] = [...entity.conflictKeys]; + } + const remotePreview = await remotePost(connection, `/api/companies/${encodeURIComponent(connection.targetCompanyId)}/upstream-imports/preview`, { + manifest: bundle.manifest, + previewShape: "manifest_only", + conflictKeysBySource, + }); + return { + ...basePreview, + warnings: mergeWarnings(basePreview.warnings, warningsFromRemote(remotePreview)), + conflicts: conflictsFromRemote(remotePreview), + }; + }, + + createRun: async (input: { connectionId: string; companyId: string; retryOfRunId?: string | null }): Promise => { + const connection = await getConnectionRow(input.connectionId, input.companyId); + if (connection.tokenStatus !== "connected") { + throw badRequest("Cloud upstream connection is not connected"); + } + await assertNoRunningRun(input.connectionId, input.companyId, db); + const preview = await localPreview(connection); + if (!preview.schemaCompatible) { + throw badRequest("Cloud stack schema is not compatible with this local Paperclip version"); + } + + const bundle = await buildBundle(connection, "apply"); + const runId = crypto.randomUUID(); + const now = new Date(); + const initialEvents = [ + event(now.toISOString(), "connect", "completed", "Connected to the target Paperclip Cloud stack."), + event(now.toISOString(), "scan", "completed", "Scanned the local company inventory."), + event(now.toISOString(), "preview", "completed", "Generated the transfer manifest."), + ...(input.retryOfRunId + ? [event(now.toISOString(), "push", "retrying", `Retrying run ${input.retryOfRunId} with the same import ledger idempotency key.`)] + : []), + ]; + const created = await db.transaction(async (tx) => { + await tx.execute( + sql`select ${cloudUpstreamConnections.id} from ${cloudUpstreamConnections} where ${cloudUpstreamConnections.id} = ${connection.id} and ${cloudUpstreamConnections.companyId} = ${connection.companyId} for update`, + ); + await assertNoRunningRun(input.connectionId, input.companyId, tx); + const [row] = await tx.insert(cloudUpstreamRuns).values({ + id: runId, + connectionId: connection.id, + companyId: connection.companyId, + status: "running", + activeStep: "push", + progressPercent: 45, + dryRun: false, + retryOfRunId: input.retryOfRunId ?? null, + summary: preview.summary, + warnings: preview.warnings, + conflicts: preview.conflicts, + events: initialEvents, + report: {}, + idempotencyKey: bundle.manifest.idempotencyKey, + manifestHash: bundle.manifest.manifestHash, + targetUrl: connection.targetOrigin, + createdAt: now, + updatedAt: now, + }).returning(); + return row; + }); + if (!created) throw badRequest("Failed to create cloud upstream run"); + + try { + const remoteRun = await remotePost(connection, `/api/companies/${encodeURIComponent(connection.targetCompanyId)}/upstream-imports/runs`, { + mode: "apply", + manifest: bundle.manifest, + entities: bundle.entities, + }); + const remoteRunId = remoteRunIdFromResponse(remoteRun); + const pushedRun = await updateRunIfRunning(runId, { + remoteRunId, + activeStep: "push", + progressPercent: 60, + events: [ + ...initialEvents, + event(new Date().toISOString(), "push", "updated", "Created or resumed the cloud import ledger run."), + ], + }); + if (pushedRun.status !== "running") return pushedRun; + + for (const chunk of bundle.chunks) { + await remotePost(connection, `/api/upstream-import-runs/${encodeURIComponent(remoteRunId)}/chunks`, chunk); + } + const verifiedRun = await updateRunIfRunning(runId, { + activeStep: "verify", + progressPercent: 82, + events: [ + ...initialEvents, + event(new Date().toISOString(), "push", "completed", `Uploaded ${bundle.chunks.length} manifest chunk${bundle.chunks.length === 1 ? "" : "s"}.`), + ], + }); + if (verifiedRun.status !== "running") return verifiedRun; + + const applied = await remotePost(connection, `/api/upstream-import-runs/${encodeURIComponent(remoteRunId)}/apply`, {}); + const remoteEvents = await remoteGet(connection, `/api/upstream-import-runs/${encodeURIComponent(remoteRunId)}/events`).catch(() => null); + const completedAt = new Date(); + const finalEvents = [ + ...initialEvents, + event(completedAt.toISOString(), "push", "completed", "Pushed mapped objects without duplicate creation."), + event(completedAt.toISOString(), "verify", "completed", "Verified the cloud import ledger and generated a run report."), + event(completedAt.toISOString(), "activate", "completed", "Activation checklist is ready for manual unpause decisions."), + ...eventsFromRemote(remoteEvents), + ]; + const finalRun = await updateRunIfRunning(runId, { + remoteRunId, + status: "succeeded", + activeStep: "activate", + progressPercent: 100, + warnings: mergeWarnings(preview.warnings, warningsFromRemote(applied)), + conflicts: conflictsFromRemote(applied), + events: finalEvents, + report: { + runId, + remoteRunId, + target: targetFromConnectionRow(connection), + manifestHash: bundle.manifest.manifestHash, + idempotencyKey: bundle.manifest.idempotencyKey, + retryOfRunId: input.retryOfRunId ?? null, + result: applied, + events: remoteEvents, + }, + completedAt, + }); + if (finalRun.status === "succeeded") { + await db + .update(cloudUpstreamConnections) + .set({ lastRunId: finalRun.id, updatedAt: new Date() }) + .where(eq(cloudUpstreamConnections.id, connection.id)); + } + return finalRun; + } catch (error) { + const failedAt = new Date(); + const failure = cloudUpstreamRemoteFailureReport(error); + return updateRunIfRunning(runId, { + status: "failed", + activeStep: "push", + progressPercent: 100, + events: [ + ...initialEvents, + event(failedAt.toISOString(), "push", "failed", failure.errorMessage ?? failure.error), + ], + report: { + runId, + target: targetFromConnectionRow(connection), + manifestHash: bundle.manifest.manifestHash, + idempotencyKey: bundle.manifest.idempotencyKey, + retryOfRunId: input.retryOfRunId ?? null, + ...failure, + }, + completedAt: failedAt, + }); + } + }, + + readRun: async (connectionId: string, runId: string, companyId: string): Promise => { + const row = await getRunRow(connectionId, runId, companyId); + return runFromRow(row); + }, + + cancelRun: async (connectionId: string, runId: string, companyId: string): Promise => { + const row = await getRunRow(connectionId, runId, companyId); + if (row.status !== "running") return runFromRow(row); + const connection = await getConnectionRow(connectionId, companyId); + if (row.remoteRunId) { + await remotePost(connection, `/api/upstream-import-runs/${encodeURIComponent(row.remoteRunId)}/cancel`, {}).catch(() => null); + } + return updateRun(row.id, { + status: "cancelled", + activeStep: "push", + progressPercent: 100, + completedAt: new Date(), + events: [ + ...row.events, + event(new Date().toISOString(), "push", "failed", "Push cancelled locally before remote apply completed."), + ], + }); + }, + + activateRunEntities: async (input: { + connectionId: string; + runId: string; + companyId: string; + entityType: CloudUpstreamActivationEntityType; + }): Promise => { + const row = await getRunRow(input.connectionId, input.runId, input.companyId); + assertActivationEntityType(input.entityType); + if (row.status !== "succeeded") { + throw badRequest("Only succeeded cloud upstream runs can activate imported entities"); + } + + const activatedAt = new Date().toISOString(); + const count = summaryCount(row.summary, input.entityType); + const nextDecision: CloudUpstreamActivationDecision = { + entityType: input.entityType, + count, + status: "activated", + activatedAt, + }; + const report = asRecord(row.report); + const activationChecklist = activationChecklistFromReport(report); + const label = activationEntityLabel(input.entityType, count); + + return updateRun(row.id, { + report: { + ...report, + activationChecklist: { + ...activationChecklist, + [input.entityType]: nextDecision, + }, + }, + events: [ + ...row.events, + event(activatedAt, "activate", "completed", `Activated ${count} imported ${label}.`), + ], + }); + }, + }; + + async function requireCompany(companyId: string) { + const row = await db.select({ id: companies.id }).from(companies).where(eq(companies.id, companyId)).then((rows) => rows[0]); + if (!row) throw notFound("Company was not found"); + } + + async function getConnectionRow(connectionId: string, companyId?: string): Promise { + const row = await db + .select() + .from(cloudUpstreamConnections) + .where(companyId + ? and(eq(cloudUpstreamConnections.id, connectionId), eq(cloudUpstreamConnections.companyId, companyId)) + : eq(cloudUpstreamConnections.id, connectionId)) + .then((rows) => rows[0]); + if (!row) throw notFound("Cloud upstream connection was not found"); + return row; + } + + async function getRunRow(connectionId: string, runId: string, companyId: string): Promise { + const row = await db + .select() + .from(cloudUpstreamRuns) + .where(and( + eq(cloudUpstreamRuns.id, runId), + eq(cloudUpstreamRuns.connectionId, connectionId), + eq(cloudUpstreamRuns.companyId, companyId), + )) + .then((rows) => rows[0]); + if (!row) throw notFound("Cloud upstream run was not found"); + return row; + } + + async function assertNoRunningRun( + connectionId: string, + companyId: string, + database: Pick, + ) { + const [running] = await database + .select({ id: cloudUpstreamRuns.id }) + .from(cloudUpstreamRuns) + .where(and( + eq(cloudUpstreamRuns.connectionId, connectionId), + eq(cloudUpstreamRuns.companyId, companyId), + eq(cloudUpstreamRuns.status, "running"), + )) + .limit(1); + if (running) { + throw conflict("A cloud upstream run is already running for this connection", { runId: running.id }); + } + } + + async function updateRun(runId: string, patch: Partial): Promise { + const [updated] = await db + .update(cloudUpstreamRuns) + .set({ ...patch, updatedAt: new Date() }) + .where(eq(cloudUpstreamRuns.id, runId)) + .returning(); + if (!updated) throw notFound("Cloud upstream run was not found"); + return runFromRow(updated); + } + + async function updateRunIfRunning(runId: string, patch: Partial): Promise { + const [updated] = await db + .update(cloudUpstreamRuns) + .set({ ...patch, updatedAt: new Date() }) + .where(and(eq(cloudUpstreamRuns.id, runId), eq(cloudUpstreamRuns.status, "running"))) + .returning(); + if (updated) return runFromRow(updated); + + const [current] = await db + .select() + .from(cloudUpstreamRuns) + .where(eq(cloudUpstreamRuns.id, runId)) + .limit(1); + if (!current) throw notFound("Cloud upstream run was not found"); + return runFromRow(current); + } + + async function localPreview(connection: ConnectionRow): Promise { + return { + connectionId: connection.id, + sourceCompanyId: connection.companyId, + target: targetFromConnectionRow(connection), + schemaCompatible: connection.targetSchemaMajor === TRANSFER_SCHEMA.major, + summary: await buildSummary(connection.companyId), + warnings: buildWarnings(connection.targetSchemaMajor), + conflicts: [], + generatedAt: new Date().toISOString(), + }; + } + + async function buildSummary(companyId: string): Promise { + const [agentCount, projectCount, goalCount, issueCount, commentCount, routineCount] = await Promise.all([ + db.select({ count: count() }).from(agents).where(eq(agents.companyId, companyId)).then((rows) => rows[0]?.count ?? 0), + db.select({ count: count() }).from(projects).where(eq(projects.companyId, companyId)).then((rows) => rows[0]?.count ?? 0), + db.select({ count: count() }).from(goals).where(eq(goals.companyId, companyId)).then((rows) => rows[0]?.count ?? 0), + db.select({ count: count() }).from(issues).where(eq(issues.companyId, companyId)).then((rows) => rows[0]?.count ?? 0), + db.select({ count: count() }).from(issueComments).where(eq(issueComments.companyId, companyId)).then((rows) => rows[0]?.count ?? 0), + db.select({ count: count() }).from(routines).where(eq(routines.companyId, companyId)).then((rows) => rows[0]?.count ?? 0), + ]); + return [ + { key: "companies", label: "Companies", count: 1 }, + { key: "goals", label: "Goals", count: goalCount }, + { key: "projects", label: "Projects", count: projectCount }, + { key: "agents", label: "Agents", count: agentCount }, + { key: "issues", label: "Issues", count: issueCount }, + { key: "comments", label: "Comments", count: commentCount }, + { key: "routines", label: "Routines", count: routineCount }, + { key: "warnings", label: "Warnings", count: buildWarnings(TRANSFER_SCHEMA.major).length }, + ]; + } + + async function buildBundle(connection: ConnectionRow, mode: "preview" | "apply"): Promise { + const exported = await portability.exportBundle(connection.companyId, { + include: { + company: true, + agents: true, + projects: true, + issues: true, + skills: true, + }, + expandReferencedSkills: true, + }); + const sourceHash = normalizedContentHash({ + manifest: exported.manifest, + files: exported.files, + }); + const source = { + sourceInstanceId: connection.sourceInstanceId, + sourceCompanyId: connection.companyId, + sourceInstanceKeyFingerprint: connection.sourceInstanceFingerprint, + exporterVersion: "paperclip-local-cloud-ui-v1", + sourceSchemaVersion: TRANSFER_SCHEMA.version, + }; + const target = { + targetStackId: connection.targetStackId, + targetCompanyId: connection.targetCompanyId, + targetOrigin: connection.targetOrigin, + supportedSchemaMajor: connection.targetSchemaMajor, + }; + const idempotencyKey = [ + mode, + connection.sourceInstanceId, + connection.companyId, + connection.targetStackId, + sourceHash, + ].join(":"); + return buildLocalUpstreamExportBundle({ + source, + target, + runId: `local-${mode}-${shortHash(idempotencyKey)}`, + idempotencyKey, + entities: buildEntitiesFromPortableExport(connection.companyId, connection.sourceInstanceId, exported), + warnings: exported.warnings.map((message): UpstreamTransferWarning => ({ + code: "local_company_export_warning", + severity: "warning", + message, + })), + featureFlags: ["cloud_sync"], + maxEntitiesPerChunk: DEFAULT_MAX_ENTITIES_PER_CHUNK, + }); + } +} + +async function fetchDiscovery(remoteUrl: string): Promise> { + const parsed = new URL(remoteUrl); + if (parsed.protocol !== "https:" && parsed.hostname !== "localhost" && parsed.hostname !== "127.0.0.1") { + throw badRequest("Cloud upstream targets require HTTPS except localhost development"); + } + const stackId = firstPathSegment(parsed.pathname); + const discoveryUrl = new URL("/.well-known/paperclip-upstream", parsed.origin); + if (stackId) { + discoveryUrl.searchParams.set("stackId", stackId); + } + const response = await fetchWithTimeout(discoveryUrl, undefined, DISCOVERY_FETCH_TIMEOUT_MS); + if (!response.ok) { + throw badRequest(`Cloud upstream discovery failed: ${response.status}`); + } + return await response.json() as Record; +} + +export async function reconcileCloudUpstreamRunsOnStartup(db: Db, now = new Date()): Promise<{ reconciled: number }> { + const runningRows = await db + .select() + .from(cloudUpstreamRuns) + .where(eq(cloudUpstreamRuns.status, "running")); + if (runningRows.length === 0) return { reconciled: 0 }; + + for (const row of runningRows) { + const report = asRecord(row.report); + await db + .update(cloudUpstreamRuns) + .set({ + status: "failed", + activeStep: row.activeStep, + progressPercent: 100, + completedAt: now, + updatedAt: now, + events: [ + ...safeRunEvents(row.events), + event( + now.toISOString(), + cloudUpstreamStep(row.activeStep), + "failed", + "Marked failed on server startup because the previous process stopped while the cloud upstream run was in progress.", + ), + ], + report: { + ...report, + error: optionalString(report.error) ?? "orphaned_running_run", + errorMessage: optionalString(report.errorMessage) + ?? "The server restarted while this cloud upstream run was running, so Paperclip marked it failed instead of leaving it stuck.", + reconciledAt: now.toISOString(), + }, + }) + .where(eq(cloudUpstreamRuns.id, row.id)); + } + + return { reconciled: runningRows.length }; +} + +function firstPathSegment(pathname: string): string | null { + const segment = pathname.split("/").find(Boolean); + return segment && segment.toLowerCase() !== "dashboard" ? segment : null; +} + +function targetFromDiscovery(discovery: Record): CloudUpstreamTarget { + const stack = objectField(discovery, "stack"); + const transfer = objectField(discovery, "transfer"); + const schema = optionalObject(transfer.schema); + const origin = stringField(stack, "origin"); + return { + stackId: stringField(stack, "id"), + stackSlug: optionalString(stack.slug), + stackDisplayName: optionalString(stack.displayName), + companyId: stringField(stack, "companyId"), + primaryHost: optionalString(stack.primaryHost) ?? new URL(origin).host, + origin, + product: optionalString(discovery.product) ?? "Paperclip Cloud", + schemaMajor: optionalNumber(schema?.major) ?? numberField(transfer, "supportedSchemaMajor"), + maxChunkBytes: optionalNumber(transfer.maxChunkBytes) ?? 8 * 1024 * 1024, + }; +} + +function targetFromConnectionRow(row: ConnectionRow): CloudUpstreamTarget { + return { + stackId: row.targetStackId, + stackSlug: row.targetStackSlug, + stackDisplayName: row.targetStackDisplayName, + companyId: row.targetCompanyId, + primaryHost: row.targetPrimaryHost, + origin: row.targetOrigin, + product: row.targetProduct, + schemaMajor: row.targetSchemaMajor, + maxChunkBytes: row.targetMaxChunkBytes, + }; +} + +function connectionFromRow(row: ConnectionRow): CloudUpstreamConnection { + return { + id: row.id, + companyId: row.companyId, + remoteUrl: row.remoteUrl, + target: targetFromConnectionRow(row), + tokenStatus: cloudUpstreamTokenStatus(row.tokenStatus), + scopes: row.scopes, + authorizedGlobalUserId: row.authorizedGlobalUserId, + expiresAt: row.tokenExpiresAt?.toISOString() ?? null, + createdAt: row.createdAt.toISOString(), + updatedAt: row.updatedAt.toISOString(), + lastRunId: row.lastRunId, + }; +} + +function runFromRow(row: RunRow): CloudUpstreamRun { + return { + id: row.id, + connectionId: row.connectionId, + companyId: row.companyId, + status: cloudUpstreamRunStatus(row.status), + activeStep: cloudUpstreamStep(row.activeStep), + progressPercent: row.progressPercent, + dryRun: row.dryRun, + summary: row.summary, + warnings: row.warnings, + conflicts: row.conflicts, + events: row.events, + targetUrl: row.targetUrl, + report: row.report, + retryOfRunId: row.retryOfRunId, + createdAt: row.createdAt.toISOString(), + updatedAt: row.updatedAt.toISOString(), + completedAt: row.completedAt?.toISOString() ?? null, + }; +} + +function scopesFromDiscovery(discovery: Record): string[] { + const auth = objectField(discovery, "auth"); + const scopes = Array.isArray(auth.scopes) ? auth.scopes.map(String).filter(Boolean) : []; + return scopes.length > 0 ? scopes : [...DEFAULT_SCOPES]; +} + +function consentUrlFromDiscovery(discovery: Record): string { + const pkce = objectField(objectField(discovery, "auth"), "pkce"); + return optionalString(pkce.consentUrl) ?? stringField(pkce, "authorizeUrl"); +} + +function tokenUrlFromDiscovery(discovery: Record): string { + return stringField(objectField(objectField(discovery, "auth"), "pkce"), "tokenUrl"); +} + +function buildWarnings(schemaMajor: number): CloudUpstreamWarning[] { + const warnings: CloudUpstreamWarning[] = [ + { + code: "imported_automations_paused", + severity: "warning", + title: "Automations stay paused", + detail: "Imported agents, routines, and monitors require explicit activation after the push.", + }, + { + code: "unmatched_users_import_as_historical_authors", + severity: "warning", + title: "Unmatched users become historical authors", + detail: "Invite now remains a secondary action after the transfer is complete.", + }, + { + code: "secret_values_redacted", + severity: "warning", + title: "Secret values are not transferred", + detail: "The push carries secret requirements only. Configure cloud secrets before activating automations.", + }, + ]; + if (schemaMajor !== TRANSFER_SCHEMA.major) { + warnings.unshift({ + code: "schema_mismatch", + severity: "blocker", + title: "Cloud stack upgrade required", + detail: `This local build uses upstream schema ${TRANSFER_SCHEMA.major}, but the cloud stack reports schema ${schemaMajor}.`, + }); + } + return warnings; +} + +type LocalUpstreamExportEntityInput = { + key: SourceEntityKey; + body: Record; + dependencies?: SourceEntityKey[]; + warnings?: UpstreamTransferWarning[]; + conflictKeys?: string[]; +}; + +function buildEntitiesFromPortableExport( + localCompanyId: string, + sourceInstanceId: string, + exported: CompanyPortabilityExportResult, +): LocalUpstreamExportEntityInput[] { + const companyKey: SourceEntityKey = { + sourceInstanceId, + sourceCompanyId: localCompanyId, + sourceEntityType: "company", + sourceEntityId: localCompanyId, + sourceNaturalKey: exported.manifest.company?.name ?? localCompanyId, + }; + const entities: LocalUpstreamExportEntityInput[] = [ + { + key: companyKey, + body: { + kind: "paperclip_company_portability_manifest", + manifest: exported.manifest, + rootPath: exported.rootPath, + paperclipExtensionPath: exported.paperclipExtensionPath, + fileCount: Object.keys(exported.files).length, + }, + conflictKeys: [`company:${companyKey.sourceNaturalKey ?? localCompanyId}`], + }, + ]; + + for (const [filePath, entry] of Object.entries(exported.files).sort(([left], [right]) => left.localeCompare(right))) { + entities.push({ + key: { + sourceInstanceId, + sourceCompanyId: localCompanyId, + sourceEntityType: "company_setting", + sourceEntityId: shortHash(filePath), + sourceNaturalKey: filePath, + }, + body: { + kind: "paperclip_portable_file", + path: filePath, + entry: normalizePortableFileEntry(entry), + }, + dependencies: [companyKey], + conflictKeys: [`portable_file:${filePath}`], + }); + } + return entities; +} + +function normalizePortableFileEntry(entry: CompanyPortabilityFileEntry): Record { + if (typeof entry === "string") { + return { encoding: "utf8", data: entry }; + } + return { ...entry }; +} + +function buildLocalUpstreamExportBundle(input: { + source: UpstreamTransferManifest["source"]; + target: UpstreamTransferManifest["target"]; + runId: string; + idempotencyKey: string; + entities: LocalUpstreamExportEntityInput[]; + warnings?: UpstreamTransferWarning[]; + featureFlags?: string[]; + maxEntitiesPerChunk?: number; +}): LocalUpstreamExportBundle { + const entities = input.entities.map((entity) => ({ + record: { + key: entity.key, + contentHash: normalizedContentHash(entity.body), + dependencies: entity.dependencies ?? [], + warnings: entity.warnings ?? [], + }, + body: entity.body, + conflictKeys: entity.conflictKeys, + })); + const chunksWithoutManifestHash = buildLocalChunks(entities, input.maxEntitiesPerChunk ?? DEFAULT_MAX_ENTITIES_PER_CHUNK); + const manifestWithoutHash = { + schema: TRANSFER_SCHEMA, + source: input.source, + target: input.target, + runId: input.runId, + idempotencyKey: input.idempotencyKey, + generatedAt: new Date(0).toISOString(), + entityCount: entities.length, + entities: entities.map((entity) => entity.record), + chunks: chunksWithoutManifestHash.map(({ payload: _payload, ...chunk }) => chunk), + warnings: input.warnings ?? [], + featureFlags: (input.featureFlags ?? ["cloud_sync"]).slice().sort(), + }; + const manifestHash = normalizedContentHash(manifestWithoutHash); + return { + manifest: { + ...manifestWithoutHash, + chunks: chunksWithoutManifestHash.map(({ payload: _payload, ...chunk }) => ({ ...chunk, manifestHash })), + manifestHash, + }, + entities, + chunks: chunksWithoutManifestHash, + }; +} + +function buildLocalChunks(entities: LocalUpstreamExportEntity[], maxEntitiesPerChunk: number): LocalUpstreamExportChunk[] { + if (!Number.isInteger(maxEntitiesPerChunk) || maxEntitiesPerChunk < 1) { + throw new Error("maxEntitiesPerChunk must be a positive integer"); + } + if (entities.length === 0) return []; + + const groups: LocalUpstreamExportEntity[][] = []; + for (let index = 0; index < entities.length; index += maxEntitiesPerChunk) { + groups.push(entities.slice(index, index + maxEntitiesPerChunk)); + } + + return groups.map((group, index) => { + const payload = { + entityKeys: group.map((entity) => entity.record.key), + }; + return { + chunkIndex: index, + totalChunks: groups.length, + byteLength: Buffer.byteLength(canonicalJson(payload)), + sha256: normalizedContentHash(payload), + payload, + }; + }); +} + +async function remoteGet(connection: ConnectionRow, path: string): Promise { + const response = await fetchWithTimeout(`${connection.targetOrigin}${path}`, { + method: "GET", + headers: await proofHeaders(connection, "GET", path), + }, REMOTE_FETCH_TIMEOUT_MS); + return parseRemoteResponse(response); +} + +async function remotePost(connection: ConnectionRow, path: string, body: unknown): Promise { + const response = await fetchWithTimeout(`${connection.targetOrigin}${path}`, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...await proofHeaders(connection, "POST", path), + }, + body: JSON.stringify(body), + }, REMOTE_FETCH_TIMEOUT_MS); + return parseRemoteResponse(response); +} + +async function proofHeaders(connection: ConnectionRow, method: string, pathAndSearch: string): Promise> { + if (!connection.accessToken || !connection.tokenId) { + throw badRequest("Cloud upstream connection is missing an import token"); + } + const accessToken = await unsealCloudUpstreamCredential(connection.accessToken); + const privateKeyPem = await unsealCloudUpstreamCredential(connection.privateKeyPem); + const timestamp = new Date().toISOString(); + const nonce = crypto.randomUUID(); + const payload = [ + method, + new URL(connection.targetOrigin).host.toLowerCase(), + pathAndSearch, + connection.tokenId, + connection.sourceInstanceId, + timestamp, + nonce, + ].join("\n"); + return { + Authorization: `Bearer ${accessToken}`, + "X-Paperclip-Upstream-Source-Instance-Id": connection.sourceInstanceId, + "X-Paperclip-Upstream-Proof-Timestamp": timestamp, + "X-Paperclip-Upstream-Proof-Nonce": nonce, + "X-Paperclip-Upstream-Proof-Signature": sign( + null, + Buffer.from(payload, "utf8"), + privateKeyPem, + ).toString("base64url"), + }; +} + +async function parseRemoteResponse(response: Response): Promise { + const text = await response.text(); + const parsed = text.trim() ? safeParseJson(text) : {}; + if (!response.ok) { + const message = typeof parsed === "object" && parsed !== null && "error" in parsed + ? String((parsed as { error: unknown }).error) + : `Cloud upstream request failed: ${response.status}`; + throw badRequest(message, parsed); + } + return parsed; +} + +async function fetchWithTimeout(input: RequestInfo | URL, init: RequestInit | undefined, timeoutMs: number): Promise { + return fetch(input, { + ...init, + signal: AbortSignal.timeout(timeoutMs), + }); +} + +export async function sealCloudUpstreamCredential(value: string): Promise { + const prepared = await localEncryptedProvider.createSecret({ value }); + return `${CLOUD_CREDENTIAL_PREFIX}${JSON.stringify(prepared.material)}`; +} + +export async function unsealCloudUpstreamCredential(value: string): Promise { + if (!value.startsWith(CLOUD_CREDENTIAL_PREFIX)) return value; + const encoded = value.slice(CLOUD_CREDENTIAL_PREFIX.length); + const parsed = safeParseJson(encoded); + const material = optionalObject(parsed); + if (!material) { + throw badRequest("Invalid encrypted cloud upstream credential material"); + } + return localEncryptedProvider.resolveVersion({ + material, + externalRef: null, + }); +} + +export function cloudUpstreamRemoteFailureReport(error: unknown): { + error: string; + errorMessage?: string; + details?: unknown; +} { + const fallback = error instanceof Error ? error.message : String(error); + if (!(error instanceof HttpError)) { + return { error: fallback }; + } + const remote = remoteErrorBody(error.details); + return { + error: remote.error ?? error.message, + ...(remote.message ? { errorMessage: remote.message } : {}), + ...(error.details !== undefined ? { details: error.details } : {}), + }; +} + +function remoteErrorBody(details: unknown): { error?: string; message?: string } { + const record = optionalObject(details); + if (!record) return {}; + return { + error: optionalString(record.error) ?? undefined, + message: optionalString(record.message) ?? undefined, + }; +} + +async function postJson(url: string, body: unknown): Promise { + const response = await fetchWithTimeout(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body), + }, DISCOVERY_FETCH_TIMEOUT_MS); + const payload = await response.json().catch(() => null); + if (!response.ok) { + throw badRequest((payload as { error?: string } | null)?.error ?? `Cloud upstream request failed: ${response.status}`); + } + return payload as T; +} + +function remoteRunIdFromResponse(value: unknown): string { + const record = asRecord(value); + const run = asRecord(record.run); + const id = optionalString(run.id); + if (!id) throw badRequest("Remote upstream importer did not return a run id"); + return id; +} + +function warningsFromRemote(value: unknown): CloudUpstreamWarning[] { + const record = asRecord(value); + const warnings = Array.isArray(record.warnings) ? record.warnings : []; + return warnings.map((warning, index): CloudUpstreamWarning => { + const item = asRecord(warning); + const code = optionalString(item.code) ?? `remote_warning_${index}`; + const severity = item.severity === "blocker" ? "blocker" : "warning"; + const message = optionalString(item.message) ?? optionalString(item.detail) ?? "Remote importer warning."; + return { + code, + severity, + title: titleFromCode(code), + detail: message, + }; + }); +} + +function conflictsFromRemote(value: unknown): CloudUpstreamConflict[] { + const record = asRecord(value); + const conflicts = Array.isArray(record.conflicts) ? record.conflicts : []; + return conflicts.map((conflict, index): CloudUpstreamConflict => { + const item = asRecord(conflict); + const source = asRecord(item.source); + return { + id: optionalString(item.id) ?? `remote-conflict-${index}`, + entityType: optionalString(item.entityType) ?? optionalString(source.sourceEntityType) ?? "entity", + sourceLabel: optionalString(item.sourceLabel) ?? optionalString(source.sourceNaturalKey) ?? optionalString(source.sourceEntityId) ?? "Source entity", + targetLabel: optionalString(item.targetLabel) ?? optionalString(item.targetEntityId) ?? "Cloud entity", + plannedAction: "blocked", + reason: optionalString(item.reason) ?? "Remote importer reported a conflict.", + }; + }); +} + +function eventsFromRemote(value: unknown): CloudUpstreamRunEvent[] { + const record = asRecord(value); + const events = Array.isArray(record.events) ? record.events : []; + return events.slice(-25).map((remote, index) => { + const item = asRecord(remote); + const action = optionalString(item.action) ?? "updated"; + return event( + optionalString(item.createdAt) ?? new Date().toISOString(), + "verify", + action.includes("created") ? "created" : "updated", + `Cloud importer ${action.replace(/_/g, " ")}${index >= 0 ? "." : "."}`, + ); + }); +} + +function safeRunEvents(value: unknown): CloudUpstreamRunEvent[] { + return Array.isArray(value) ? value as CloudUpstreamRunEvent[] : []; +} + +function assertActivationEntityType(value: string): asserts value is CloudUpstreamActivationEntityType { + if (value !== "agents" && value !== "routines" && value !== "monitors") { + throw badRequest("entityType must be agents, routines, or monitors"); + } +} + +function summaryCount(summary: unknown, key: CloudUpstreamActivationEntityType): number { + if (!Array.isArray(summary)) return 0; + const item = summary.find((entry) => asRecord(entry).key === key); + const count = asRecord(item).count; + return typeof count === "number" && Number.isFinite(count) ? count : 0; +} + +function activationChecklistFromReport(report: Record): Record { + const value = asRecord(report.activationChecklist); + const decisions: Record = {}; + for (const [key, decision] of Object.entries(value)) { + if (key !== "agents" && key !== "routines" && key !== "monitors") continue; + const item = asRecord(decision); + decisions[key] = { + entityType: key, + count: typeof item.count === "number" && Number.isFinite(item.count) ? item.count : 0, + status: item.status === "activated" ? "activated" : "paused", + activatedAt: optionalString(item.activatedAt), + }; + } + return decisions; +} + +function activationEntityLabel(entityType: CloudUpstreamActivationEntityType, count: number): string { + const singular = entityType === "agents" ? "agent" : entityType === "routines" ? "routine" : "monitor"; + return `${singular}${count === 1 ? "" : "s"}`; +} + +function mergeWarnings(base: CloudUpstreamWarning[], extra: CloudUpstreamWarning[]): CloudUpstreamWarning[] { + const byCode = new Map(); + for (const warning of [...base, ...extra]) byCode.set(warning.code, warning); + return [...byCode.values()]; +} + +function event( + at: string, + phase: CloudUpstreamRunEvent["phase"], + type: CloudUpstreamRunEvent["type"], + message: string, +): CloudUpstreamRunEvent { + return { + id: crypto.randomUUID(), + at, + phase, + type, + message, + }; +} + +function normalizedContentHash(value: unknown): NormalizedSha256 { + return `sha256:${crypto.createHash("sha256").update(canonicalJson(value)).digest("hex")}`; +} + +function canonicalJson(value: unknown): string { + return JSON.stringify(sortJson(value)); +} + +function sortJson(value: unknown): unknown { + if (Array.isArray(value)) return value.map(sortJson); + if (typeof value !== "object" || value === null) return value; + return Object.fromEntries( + Object.entries(value as Record) + .sort(([left], [right]) => left.localeCompare(right)) + .map(([key, entry]) => [key, sortJson(entry)]), + ); +} + +function shortHash(value: string): string { + return crypto.createHash("sha256").update(value).digest("hex").slice(0, 12); +} + +function sourceEntityKeyString(key: SourceEntityKey): string { + return [key.sourceInstanceId, key.sourceCompanyId, key.sourceEntityType, key.sourceEntityId] + .map((part) => encodeURIComponent(part)) + .join("/"); +} + +function titleFromCode(code: string): string { + return code + .replace(/_/g, " ") + .replace(/\b\w/g, (letter) => letter.toUpperCase()); +} + +function objectField(value: Record, key: string): Record { + const field = value[key]; + if (!field || typeof field !== "object" || Array.isArray(field)) { + throw badRequest(`Cloud upstream discovery missing ${key}`); + } + return field as Record; +} + +function stringField(value: Record, key: string): string { + const field = value[key]; + if (typeof field !== "string" || field.length === 0) { + throw badRequest(`Cloud upstream discovery missing ${key}`); + } + return field; +} + +function numberField(value: Record, key: string): number { + const field = value[key]; + if (typeof field !== "number" || !Number.isFinite(field)) { + throw badRequest(`Cloud upstream discovery missing ${key}`); + } + return field; +} + +function optionalString(value: unknown): string | null { + return typeof value === "string" && value.length > 0 ? value : null; +} + +function optionalNumber(value: unknown): number | null { + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +function optionalObject(value: unknown): Record | null { + return value && typeof value === "object" && !Array.isArray(value) ? value as Record : null; +} + +function asRecord(value: unknown): Record { + return value && typeof value === "object" && !Array.isArray(value) ? value as Record : {}; +} + +function safeParseJson(text: string): unknown { + try { + return JSON.parse(text); + } catch { + return text; + } +} + +function cloudUpstreamTokenStatus(value: string): CloudUpstreamConnection["tokenStatus"] { + return value === "connected" || value === "expired" || value === "revoked" ? value : "pending"; +} + +function cloudUpstreamRunStatus(value: string): CloudUpstreamRun["status"] { + return value === "previewed" || value === "running" || value === "succeeded" || value === "failed" || value === "cancelled" + ? value + : "failed"; +} + +function cloudUpstreamStep(value: string): CloudUpstreamRun["activeStep"] { + return value === "connect" || value === "scan" || value === "preview" || value === "push" || value === "verify" || value === "activate" + ? value + : "push"; +} diff --git a/server/src/services/companies.ts b/server/src/services/companies.ts index 17905f11..8704b986 100644 --- a/server/src/services/companies.ts +++ b/server/src/services/companies.ts @@ -265,7 +265,17 @@ export function companyService(db: Db) { remove: (id: string) => db.transaction(async (tx) => { // Delete from child tables in dependency order + const companyRunIds = await tx + .select({ id: heartbeatRuns.id }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.companyId, id)); + await tx.delete(heartbeatRunEvents).where(eq(heartbeatRunEvents.companyId, id)); + if (companyRunIds.length > 0) { + await tx + .delete(heartbeatRunEvents) + .where(inArray(heartbeatRunEvents.runId, companyRunIds.map((run) => run.id))); + } await tx.delete(agentTaskSessions).where(eq(agentTaskSessions.companyId, id)); await tx.delete(activityLog).where(eq(activityLog.companyId, id)); await tx.delete(heartbeatRuns).where(eq(heartbeatRuns.companyId, id)); diff --git a/server/src/services/index.ts b/server/src/services/index.ts index a8570d2d..18857b7c 100644 --- a/server/src/services/index.ts +++ b/server/src/services/index.ts @@ -59,6 +59,7 @@ export type { } from "./authorization.js"; export { boardAuthService } from "./board-auth.js"; export { instanceSettingsService } from "./instance-settings.js"; +export { cloudUpstreamService, reconcileCloudUpstreamRunsOnStartup } from "./cloud-upstreams.js"; export { companyPortabilityService } from "./company-portability.js"; export { environmentService } from "./environments.js"; export { executionWorkspaceService } from "./execution-workspaces.js"; diff --git a/server/src/services/instance-settings.ts b/server/src/services/instance-settings.ts index c447a920..8b02f57c 100644 --- a/server/src/services/instance-settings.ts +++ b/server/src/services/instance-settings.ts @@ -15,9 +15,11 @@ import { import { eq } from "drizzle-orm"; const DEFAULT_SINGLETON_KEY = "default"; +const instanceGeneralSettingsStorageSchema = instanceGeneralSettingsSchema.strip(); +const instanceExperimentalSettingsStorageSchema = instanceExperimentalSettingsSchema.strip(); function normalizeGeneralSettings(raw: unknown): InstanceGeneralSettings { - const parsed = instanceGeneralSettingsSchema.safeParse(raw ?? {}); + const parsed = instanceGeneralSettingsStorageSchema.safeParse(raw ?? {}); if (parsed.success) { return { censorUsernameInLogs: parsed.data.censorUsernameInLogs ?? false, @@ -35,12 +37,13 @@ function normalizeGeneralSettings(raw: unknown): InstanceGeneralSettings { }; } -function normalizeExperimentalSettings(raw: unknown): InstanceExperimentalSettings { - const parsed = instanceExperimentalSettingsSchema.safeParse(raw ?? {}); +export function normalizeExperimentalSettings(raw: unknown): InstanceExperimentalSettings { + const parsed = instanceExperimentalSettingsStorageSchema.safeParse(raw ?? {}); if (parsed.success) { return { enableEnvironments: parsed.data.enableEnvironments ?? false, enableIsolatedWorkspaces: parsed.data.enableIsolatedWorkspaces ?? false, + enableCloudSync: parsed.data.enableCloudSync ?? false, autoRestartDevServerWhenIdle: parsed.data.autoRestartDevServerWhenIdle ?? false, enableIssueGraphLivenessAutoRecovery: parsed.data.enableIssueGraphLivenessAutoRecovery ?? false, issueGraphLivenessAutoRecoveryLookbackHours: @@ -51,6 +54,7 @@ function normalizeExperimentalSettings(raw: unknown): InstanceExperimentalSettin return { enableEnvironments: false, enableIsolatedWorkspaces: false, + enableCloudSync: false, autoRestartDevServerWhenIdle: false, enableIssueGraphLivenessAutoRecovery: false, issueGraphLivenessAutoRecoveryLookbackHours: diff --git a/ui/src/App.tsx b/ui/src/App.tsx index abc17fc2..905804a9 100644 --- a/ui/src/App.tsx +++ b/ui/src/App.tsx @@ -32,6 +32,8 @@ import { CompanySettings } from "./pages/CompanySettings"; import { CompanyEnvironments } from "./pages/CompanyEnvironments"; import { CompanySettingsPluginPage } from "./pages/CompanySettingsPluginPage"; import { CompanyAccess, CompanyAccessLegacyRoute } from "./pages/CompanyAccess"; +import { CloudUpstream } from "./pages/CloudUpstream"; +import { CloudUpstreamUxLab } from "./pages/CloudUpstreamUxLab"; import { CompanyInvites } from "./pages/CompanyInvites"; import { CompanySkills } from "./pages/CompanySkills"; import { Secrets } from "./pages/Secrets"; @@ -72,6 +74,7 @@ function boardRoutes() { } /> } /> } /> + } /> } /> } /> } /> @@ -279,6 +282,7 @@ export function App() { } /> } /> } /> + } /> }> } /> diff --git a/ui/src/api/cloudUpstreams.ts b/ui/src/api/cloudUpstreams.ts new file mode 100644 index 00000000..48ddff5c --- /dev/null +++ b/ui/src/api/cloudUpstreams.ts @@ -0,0 +1,40 @@ +import type { + CloudUpstreamActivationEntityType, + CloudUpstreamConnectStartResponse, + CloudUpstreamConnection, + CloudUpstreamPreview, + CloudUpstreamRun, + CloudUpstreamsState, +} from "@paperclipai/shared"; +import { api } from "./client"; + +export const cloudUpstreamsApi = { + list: (companyId: string) => + api.get(`/cloud-upstreams?companyId=${encodeURIComponent(companyId)}`), + startConnect: (input: { companyId: string; remoteUrl: string; redirectUri: string }) => + api.post("/cloud-upstreams/connect/start", input), + finishConnect: (input: { pendingConnectionId: string; code: string; state: string }) => + api.post("/cloud-upstreams/connect/finish", input), + preview: (connectionId: string, input: { companyId: string }) => + api.post(`/cloud-upstreams/${encodeURIComponent(connectionId)}/push-runs/preview`, input), + createRun: (connectionId: string, input: { companyId: string; retryOfRunId?: string | null }) => + api.post(`/cloud-upstreams/${encodeURIComponent(connectionId)}/push-runs`, input ?? {}), + getRun: (connectionId: string, runId: string, companyId: string) => + api.get( + `/cloud-upstreams/${encodeURIComponent(connectionId)}/push-runs/${encodeURIComponent(runId)}?companyId=${encodeURIComponent(companyId)}`, + ), + cancelRun: (connectionId: string, runId: string, input: { companyId: string }) => + api.post( + `/cloud-upstreams/${encodeURIComponent(connectionId)}/push-runs/${encodeURIComponent(runId)}/cancel`, + input, + ), + activateEntities: ( + connectionId: string, + runId: string, + input: { companyId: string; entityType: CloudUpstreamActivationEntityType }, + ) => + api.post( + `/cloud-upstreams/${encodeURIComponent(connectionId)}/push-runs/${encodeURIComponent(runId)}/activation`, + input, + ), +}; diff --git a/ui/src/components/CompanySettingsSidebar.test.tsx b/ui/src/components/CompanySettingsSidebar.test.tsx index 6514cd4a..cdff2d8b 100644 --- a/ui/src/components/CompanySettingsSidebar.test.tsx +++ b/ui/src/components/CompanySettingsSidebar.test.tsx @@ -11,6 +11,9 @@ const mockSidebarBadgesApi = vi.hoisted(() => ({ get: vi.fn(), })); const mockUsePluginSlots = vi.hoisted(() => vi.fn()); +const mockInstanceSettingsApi = vi.hoisted(() => ({ + getExperimental: vi.fn(), +})); vi.mock("@/lib/router", () => ({ Link: ({ @@ -66,6 +69,10 @@ vi.mock("@/plugins/slots", () => ({ usePluginSlots: mockUsePluginSlots, })); +vi.mock("@/api/instanceSettings", () => ({ + instanceSettingsApi: mockInstanceSettingsApi, +})); + // eslint-disable-next-line @typescript-eslint/no-explicit-any (globalThis as any).IS_REACT_ACT_ENVIRONMENT = true; @@ -93,6 +100,9 @@ describe("CompanySettingsSidebar", () => { isLoading: false, errorMessage: null, }); + mockInstanceSettingsApi.getExperimental.mockResolvedValue({ + enableCloudSync: false, + }); }); afterEach(() => { @@ -121,6 +131,7 @@ describe("CompanySettingsSidebar", () => { expect(container.textContent).toContain("General"); expect(container.textContent).toContain("Environments"); expect(container.textContent).toContain("Members"); + expect(container.textContent).not.toContain("Cloud upstream"); expect(container.textContent).toContain("Invites"); expect(container.textContent).toContain("Secrets"); expect(sidebarNavItemMock).toHaveBeenCalledWith( @@ -210,4 +221,36 @@ describe("CompanySettingsSidebar", () => { root.unmount(); }); }); + + it("shows cloud upstream only when cloud sync is enabled", async () => { + mockInstanceSettingsApi.getExperimental.mockResolvedValue({ + enableCloudSync: true, + }); + const root = createRoot(container); + const queryClient = new QueryClient({ + defaultOptions: { queries: { retry: false } }, + }); + + await act(async () => { + root.render( + + + , + ); + }); + await flushReact(); + + expect(container.textContent).toContain("Cloud upstream"); + expect(sidebarNavItemMock).toHaveBeenCalledWith( + expect.objectContaining({ + to: "/company/settings/cloud-upstream", + label: "Cloud upstream", + end: true, + }), + ); + + await act(async () => { + root.unmount(); + }); + }); }); diff --git a/ui/src/components/CompanySettingsSidebar.tsx b/ui/src/components/CompanySettingsSidebar.tsx index ed84e9ba..ad05f4b1 100644 --- a/ui/src/components/CompanySettingsSidebar.tsx +++ b/ui/src/components/CompanySettingsSidebar.tsx @@ -1,6 +1,7 @@ import { useQuery } from "@tanstack/react-query"; -import { ChevronLeft, KeyRound, MailPlus, MonitorCog, Puzzle, Settings, SlidersHorizontal, Users } from "lucide-react"; +import { ChevronLeft, CloudUpload, KeyRound, MailPlus, MonitorCog, Puzzle, Settings, SlidersHorizontal, Users } from "lucide-react"; import { sidebarBadgesApi } from "@/api/sidebarBadges"; +import { instanceSettingsApi } from "@/api/instanceSettings"; import { ApiError } from "@/api/client"; import { Link } from "@/lib/router"; import { queryKeys } from "@/lib/queryKeys"; @@ -35,6 +36,11 @@ export function CompanySettingsSidebar() { retry: false, refetchInterval: 15_000, }); + const { data: experimentalSettings } = useQuery({ + queryKey: queryKeys.instance.experimentalSettings, + queryFn: () => instanceSettingsApi.getExperimental(), + }); + const showCloudUpstream = experimentalSettings?.enableCloudSync === true; return (