import { createHash } from "node:crypto"; import { readdir, readFile } from "node:fs/promises"; import path from "node:path"; import { and, eq, sql } from "drizzle-orm"; import type { SQL } from "drizzle-orm"; import type { Db } from "@paperclipai/db"; import { pluginDatabaseNamespaces, pluginMigrations, plugins, } from "@paperclipai/db"; import type { PaperclipPluginManifestV1, PluginDatabaseCoreReadTable, PluginMigrationRecord, } from "@paperclipai/shared"; const IDENTIFIER_RE = /^[A-Za-z_][A-Za-z0-9_]*$/; const MAX_POSTGRES_IDENTIFIER_LENGTH = 63; type SqlRef = { schema: string; table: string; keyword: string }; export type PluginDatabaseRuntimeResult> = { rows?: T[]; rowCount?: number; }; export function derivePluginDatabaseNamespace( pluginKey: string, namespaceSlug?: string, ): string { const hash = createHash("sha256").update(pluginKey).digest("hex").slice(0, 10); const slug = (namespaceSlug ?? pluginKey) .toLowerCase() .replace(/[^a-z0-9_]+/g, "_") .replace(/^_+|_+$/g, "") .replace(/_+/g, "_") .slice(0, 36) || "plugin"; const namespace = `plugin_${slug}_${hash}`; return namespace.slice(0, MAX_POSTGRES_IDENTIFIER_LENGTH); } function assertIdentifier(value: string, label = "identifier"): string { if (!IDENTIFIER_RE.test(value)) { throw new Error(`Unsafe SQL ${label}: ${value}`); } return value; } function quoteIdentifier(value: string): string { return `"${assertIdentifier(value).replaceAll("\"", "\"\"")}"`; } function splitSqlStatements(input: string): string[] { const statements: string[] = []; let start = 0; let quote: "'" | "\"" | null = null; let lineComment = false; let blockComment = false; for (let i = 0; i < input.length; i += 1) { const char = input[i]!; const next = input[i + 1]; if (lineComment) { if (char === "\n") lineComment = false; continue; } if (blockComment) { if (char === "*" && next === "/") { blockComment = false; i += 1; } continue; } if (quote) { if (char === quote) { if (next === quote) { i += 1; } else { quote = null; } } continue; } if (char === "-" && next === "-") { lineComment = true; i += 1; continue; } if (char === "/" && next === "*") { blockComment = true; i += 1; continue; } if (char === "'" || char === "\"") { quote = char; continue; } if (char === ";") { const statement = input.slice(start, i).trim(); if (statement) statements.push(statement); start = i + 1; } } const trailing = input.slice(start).trim(); if (trailing) statements.push(trailing); return statements; } function stripSqlForKeywordScan(input: string): string { return input .replace(/'([^']|'')*'/g, "''") .replace(/"([^"]|"")*"/g, "\"\"") .replace(/--.*$/gm, "") .replace(/\/\*[\s\S]*?\*\//g, ""); } function normaliseSql(input: string): string { return stripSqlForKeywordScan(input).replace(/\s+/g, " ").trim().toLowerCase(); } function extractQualifiedRefs(statement: string): SqlRef[] { const refs: SqlRef[] = []; const patterns = [ /\b(from|join|references|into|update)\s+"?([A-Za-z_][A-Za-z0-9_]*)"?\."?([A-Za-z_][A-Za-z0-9_]*)"?/gi, /\b(alter\s+table|create\s+table|create\s+view|drop\s+table|truncate\s+table)\s+(?:if\s+(?:not\s+)?exists\s+)?"?([A-Za-z_][A-Za-z0-9_]*)"?\."?([A-Za-z_][A-Za-z0-9_]*)"?/gi, ]; for (const pattern of patterns) { for (const match of statement.matchAll(pattern)) { refs.push({ keyword: match[1]!.toLowerCase(), schema: match[2]!, table: match[3]! }); } } return refs; } function assertAllowedPublicRead( ref: SqlRef, allowedCoreReadTables: ReadonlySet, ): void { if (ref.schema !== "public") return; if (!allowedCoreReadTables.has(ref.table)) { throw new Error(`Plugin SQL references public.${ref.table}, which is not whitelisted`); } if (!["from", "join", "references"].includes(ref.keyword)) { throw new Error(`Plugin SQL cannot mutate or define objects in public.${ref.table}`); } } function assertNoBannedSql(statement: string): void { const normalized = normaliseSql(statement); const banned = [ /\bcreate\s+extension\b/, /\bcreate\s+(?:event\s+)?trigger\b/, /\bcreate\s+(?:or\s+replace\s+)?function\b/, /\bcreate\s+language\b/, /\bgrant\b/, /\brevoke\b/, /\bsecurity\s+definer\b/, /\bcopy\b/, /\bcall\b/, /\bdo\s+(?:\$\$|language\b)/, ]; const matched = banned.find((pattern) => pattern.test(normalized)); if (matched) { throw new Error(`Plugin SQL contains a disallowed statement or clause: ${matched.source}`); } } export function validatePluginMigrationStatement( statement: string, namespace: string, coreReadTables: readonly PluginDatabaseCoreReadTable[] = [], ): void { assertIdentifier(namespace, "namespace"); assertNoBannedSql(statement); const normalized = normaliseSql(statement); if (/^\s*(drop|truncate)\b/.test(normalized)) { throw new Error("Destructive plugin migrations are not allowed in Phase 1"); } const ddlAllowed = /^(create|alter|comment)\b/.test(normalized); if (!ddlAllowed) { throw new Error("Plugin migrations may contain DDL statements only"); } const refs = extractQualifiedRefs(statement); if (refs.length === 0 && !normalized.startsWith("comment ")) { throw new Error("Plugin migration objects must use fully qualified schema names"); } const allowedCoreReadTables = new Set(coreReadTables); for (const ref of refs) { if (ref.schema === namespace) continue; if (ref.schema === "public") { assertAllowedPublicRead(ref, allowedCoreReadTables); continue; } throw new Error(`Plugin SQL references schema "${ref.schema}" outside namespace "${namespace}"`); } } export function validatePluginRuntimeQuery( query: string, namespace: string, coreReadTables: readonly PluginDatabaseCoreReadTable[] = [], ): void { const statements = splitSqlStatements(query); if (statements.length !== 1) { throw new Error("Plugin runtime SQL must contain exactly one statement"); } const statement = statements[0]!; assertNoBannedSql(statement); const normalized = normaliseSql(statement); if (!normalized.startsWith("select ") && !normalized.startsWith("with ")) { throw new Error("ctx.db.query only allows SELECT statements"); } if (/\b(insert|update|delete|alter|create|drop|truncate)\b/.test(normalized)) { throw new Error("ctx.db.query cannot contain mutation or DDL keywords"); } const allowedCoreReadTables = new Set(coreReadTables); for (const ref of extractQualifiedRefs(statement)) { if (ref.schema === namespace) continue; if (ref.schema === "public") { assertAllowedPublicRead(ref, allowedCoreReadTables); continue; } throw new Error(`ctx.db.query cannot read schema "${ref.schema}"`); } } export function validatePluginRuntimeExecute(query: string, namespace: string): void { const statements = splitSqlStatements(query); if (statements.length !== 1) { throw new Error("Plugin runtime SQL must contain exactly one statement"); } const statement = statements[0]!; assertNoBannedSql(statement); const normalized = normaliseSql(statement); if (!/^(insert\s+into|update|delete\s+from)\b/.test(normalized)) { throw new Error("ctx.db.execute only allows INSERT, UPDATE, or DELETE"); } if (/\b(alter|create|drop|truncate)\b/.test(normalized)) { throw new Error("ctx.db.execute cannot contain DDL keywords"); } const refs = extractQualifiedRefs(statement); const target = refs.find((ref) => ["into", "update", "from"].includes(ref.keyword)); if (!target || target.schema !== namespace) { throw new Error(`ctx.db.execute target must be inside plugin namespace "${namespace}"`); } for (const ref of refs) { if (ref.schema !== namespace) { throw new Error("ctx.db.execute cannot reference public or other non-plugin schemas"); } } } function bindSql(statement: string, params: readonly unknown[] = []): SQL { // Safe only after callers run the plugin SQL validators above. if (params.length === 0) return sql.raw(statement); const chunks: SQL[] = []; let cursor = 0; const placeholderPattern = /\$(\d+)/g; const seen = new Set(); for (const match of statement.matchAll(placeholderPattern)) { const index = Number(match[1]); if (!Number.isInteger(index) || index < 1 || index > params.length) { throw new Error(`SQL placeholder $${match[1]} has no matching parameter`); } chunks.push(sql.raw(statement.slice(cursor, match.index))); chunks.push(sql`${params[index - 1]}`); seen.add(index); cursor = match.index! + match[0].length; } chunks.push(sql.raw(statement.slice(cursor))); if (seen.size !== params.length) { throw new Error("Every ctx.db parameter must be referenced by a $n placeholder"); } return sql.join(chunks, sql.raw("")); } async function listSqlMigrationFiles(migrationsDir: string): Promise { const entries = await readdir(migrationsDir, { withFileTypes: true }); return entries .filter((entry) => entry.isFile() && entry.name.endsWith(".sql")) .map((entry) => entry.name) .sort((a, b) => a.localeCompare(b)); } function resolveMigrationsDir(packageRoot: string, migrationsDir: string): string { const resolvedRoot = path.resolve(packageRoot); const resolvedDir = path.resolve(resolvedRoot, migrationsDir); const relative = path.relative(resolvedRoot, resolvedDir); if (relative.startsWith("..") || path.isAbsolute(relative)) { throw new Error(`Plugin migrationsDir escapes package root: ${migrationsDir}`); } return resolvedDir; } type PluginDatabaseClient = Pick; type PluginDatabaseRootClient = PluginDatabaseClient & Partial>; export interface ApplyPluginMigrationsOptions { /** * Persist failed migration ledger rows. Fresh install uses false because the * caller owns a larger transaction and must roll back the plugin row and * namespace together. */ persistFailure?: boolean; } export function pluginDatabaseService(db: PluginDatabaseRootClient) { async function getPluginRecord(pluginId: string) { const rows = await db.select().from(plugins).where(eq(plugins.id, pluginId)).limit(1); const plugin = rows[0]; if (!plugin) throw new Error(`Plugin not found: ${pluginId}`); return plugin; } async function ensureNamespaceWithClient( client: PluginDatabaseClient, pluginId: string, manifest: PaperclipPluginManifestV1, ) { if (!manifest.database) return null; const namespaceName = derivePluginDatabaseNamespace( manifest.id, manifest.database.namespaceSlug, ); await client.execute(sql.raw(`CREATE SCHEMA IF NOT EXISTS ${quoteIdentifier(namespaceName)}`)); const rows = await client .insert(pluginDatabaseNamespaces) .values({ pluginId, pluginKey: manifest.id, namespaceName, namespaceMode: "schema", status: "active", }) .onConflictDoUpdate({ target: pluginDatabaseNamespaces.pluginId, set: { pluginKey: manifest.id, namespaceName, namespaceMode: "schema", status: "active", updatedAt: new Date(), }, }) .returning(); return rows[0] ?? null; } async function ensureNamespace(pluginId: string, manifest: PaperclipPluginManifestV1) { return ensureNamespaceWithClient(db, pluginId, manifest); } async function getNamespace(pluginId: string) { const rows = await db .select() .from(pluginDatabaseNamespaces) .where(eq(pluginDatabaseNamespaces.pluginId, pluginId)) .limit(1); return rows[0] ?? null; } async function getRuntimeNamespace(pluginId: string) { const namespace = await getNamespace(pluginId); if (!namespace || namespace.status !== "active") { throw new Error("Plugin database namespace is not active"); } return namespace.namespaceName; } async function recordMigrationFailure(client: PluginDatabaseClient, input: { pluginId: string; pluginKey: string; namespaceName: string; migrationKey: string; checksum: string; pluginVersion: string; error: unknown; }): Promise { const message = input.error instanceof Error ? input.error.message : String(input.error); await client .insert(pluginMigrations) .values({ pluginId: input.pluginId, pluginKey: input.pluginKey, namespaceName: input.namespaceName, migrationKey: input.migrationKey, checksum: input.checksum, pluginVersion: input.pluginVersion, status: "failed", errorMessage: message, }) .onConflictDoUpdate({ target: [pluginMigrations.pluginId, pluginMigrations.migrationKey], set: { checksum: input.checksum, pluginVersion: input.pluginVersion, status: "failed", errorMessage: message, startedAt: new Date(), appliedAt: null, }, }); await client .update(pluginDatabaseNamespaces) .set({ status: "migration_failed", updatedAt: new Date() }) .where(eq(pluginDatabaseNamespaces.pluginId, input.pluginId)); } return { ensureNamespace, async applyMigrations( pluginId: string, manifest: PaperclipPluginManifestV1, packageRoot: string, options: ApplyPluginMigrationsOptions = {}, ) { if (!manifest.database) return null; const namespace = await ensureNamespace(pluginId, manifest); if (!namespace) return null; const migrationDir = resolveMigrationsDir(packageRoot, manifest.database.migrationsDir); const migrationFiles = await listSqlMigrationFiles(migrationDir); const coreReadTables = manifest.database.coreReadTables ?? []; const lockKey = Number.parseInt(createHash("sha256").update(pluginId).digest("hex").slice(0, 12), 16); const persistFailure = options.persistFailure ?? true; const applyWithClient = async (client: PluginDatabaseClient) => { await client.execute(sql`SELECT pg_advisory_xact_lock(${lockKey})`); for (const migrationKey of migrationFiles) { const content = await readFile(path.join(migrationDir, migrationKey), "utf8"); const checksum = createHash("sha256").update(content).digest("hex"); const existingRows = await client .select() .from(pluginMigrations) .where(and(eq(pluginMigrations.pluginId, pluginId), eq(pluginMigrations.migrationKey, migrationKey))) .limit(1); const existing = existingRows[0] as PluginMigrationRecord | undefined; if (existing?.status === "applied") { if (existing.checksum !== checksum) { throw new Error(`Plugin migration checksum mismatch for ${migrationKey}`); } continue; } const statements = splitSqlStatements(content); try { if (statements.length === 0) { throw new Error(`Plugin migration ${migrationKey} is empty`); } for (const statement of statements) { validatePluginMigrationStatement(statement, namespace.namespaceName, coreReadTables); await client.execute(sql.raw(statement)); } await client .insert(pluginMigrations) .values({ pluginId, pluginKey: manifest.id, namespaceName: namespace.namespaceName, migrationKey, checksum, pluginVersion: manifest.version, status: "applied", appliedAt: new Date(), }) .onConflictDoUpdate({ target: [pluginMigrations.pluginId, pluginMigrations.migrationKey], set: { checksum, pluginVersion: manifest.version, status: "applied", errorMessage: null, startedAt: new Date(), appliedAt: new Date(), }, }); } catch (error) { if (persistFailure) { await recordMigrationFailure(db, { pluginId, pluginKey: manifest.id, namespaceName: namespace.namespaceName, migrationKey, checksum, pluginVersion: manifest.version, error, }); } throw error; } } }; if (typeof db.transaction === "function") { await db.transaction(async (tx) => applyWithClient(tx as PluginDatabaseClient)); } else { await applyWithClient(db); } return namespace; }, getRuntimeNamespace, async query>(pluginId: string, statement: string, params?: unknown[]): Promise { const plugin = await getPluginRecord(pluginId); const namespace = await getRuntimeNamespace(pluginId); validatePluginRuntimeQuery(statement, namespace, plugin.manifestJson.database?.coreReadTables ?? []); const result = await db.execute(bindSql(statement, params)); return Array.from(result as Iterable); }, async execute(pluginId: string, statement: string, params?: unknown[]): Promise<{ rowCount: number }> { const namespace = await getRuntimeNamespace(pluginId); validatePluginRuntimeExecute(statement, namespace); const result = await db.execute(bindSql(statement, params)); return { rowCount: Number((result as { count?: number | string }).count ?? 0) }; }, }; }