From 92afa0fb674f3369e69040003901451a54e659d9 Mon Sep 17 00:00:00 2001 From: "Pawla Abdul (Bot)" Date: Sun, 12 Apr 2026 15:54:09 +0000 Subject: [PATCH] Fix adapter plugin conflicts caused by concurrent install/reload/delete operations Add an async mutex (promise-chain FIFO queue) to serialise all adapter mutation routes (install, reinstall, reload, delete) so that concurrent requests cannot race on npm, the adapter-plugins.json store, or the in-memory adapter registry. Also switch adapter-plugin-store file writes to atomic write-tmp-then-rename to prevent partial/corrupted reads from concurrent processes. Includes the packageName.trim() fix for whitespace-induced npm failures. 9 new tests covering mutex serialisation, error recovery, FIFO ordering, and atomic store operations. Co-Authored-By: Paperclip --- .../adapter-plugin-concurrency.test.ts | 177 ++++++++ server/src/routes/adapters.ts | 420 ++++++++++-------- server/src/services/adapter-plugin-store.ts | 11 +- 3 files changed, 410 insertions(+), 198 deletions(-) create mode 100644 server/src/__tests__/adapter-plugin-concurrency.test.ts diff --git a/server/src/__tests__/adapter-plugin-concurrency.test.ts b/server/src/__tests__/adapter-plugin-concurrency.test.ts new file mode 100644 index 00000000..2ad25f30 --- /dev/null +++ b/server/src/__tests__/adapter-plugin-concurrency.test.ts @@ -0,0 +1,177 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { withAdapterMutex } from "../routes/adapters.js"; + +// --------------------------------------------------------------------------- +// withAdapterMutex — serialisation tests +// --------------------------------------------------------------------------- + +describe("withAdapterMutex", () => { + it("serialises concurrent calls so they do not overlap", async () => { + const log: string[] = []; + + const taskA = withAdapterMutex(async () => { + log.push("A-start"); + await new Promise((r) => setTimeout(r, 50)); + log.push("A-end"); + return "a"; + }); + + const taskB = withAdapterMutex(async () => { + log.push("B-start"); + await new Promise((r) => setTimeout(r, 10)); + log.push("B-end"); + return "b"; + }); + + const [resultA, resultB] = await Promise.all([taskA, taskB]); + + expect(resultA).toBe("a"); + expect(resultB).toBe("b"); + + // A must fully complete before B starts (FIFO serialisation) + expect(log).toEqual(["A-start", "A-end", "B-start", "B-end"]); + }); + + it("a failed task does not block subsequent tasks", async () => { + const failing = withAdapterMutex(async () => { + throw new Error("boom"); + }); + + await expect(failing).rejects.toThrow("boom"); + + const ok = await withAdapterMutex(async () => "ok"); + expect(ok).toBe("ok"); + }); + + it("preserves FIFO order for many concurrent callers", async () => { + const order: number[] = []; + + const tasks = Array.from({ length: 5 }, (_, i) => + withAdapterMutex(async () => { + order.push(i); + }), + ); + + await Promise.all(tasks); + expect(order).toEqual([0, 1, 2, 3, 4]); + }); + + it("returns the value from the inner function", async () => { + const result = await withAdapterMutex(async () => ({ answer: 42 })); + expect(result).toEqual({ answer: 42 }); + }); + + it("propagates errors from the inner function", async () => { + await expect( + withAdapterMutex(async () => { + throw new TypeError("bad input"); + }), + ).rejects.toThrow("bad input"); + }); +}); + +// --------------------------------------------------------------------------- +// adapter-plugin-store — atomic write tests +// --------------------------------------------------------------------------- + +describe("adapter-plugin-store atomic writes", () => { + // We test the store in an isolated temp directory by stubbing HOME + const originalHome = process.env.HOME; + let tmpDir: string; + + beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "adapter-store-test-")); + process.env.HOME = tmpDir; + // Force the store module to pick up the new HOME by resetting its cache. + // We re-import it fresh for each test. + }); + + afterEach(() => { + process.env.HOME = originalHome; + fs.rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("writeStore creates the file atomically via rename", async () => { + // Dynamically import the store so it uses the stubbed HOME + vi.resetModules(); + const store = await import("../services/adapter-plugin-store.js"); + + const record = { + packageName: "test-adapter", + type: "test_type", + installedAt: new Date().toISOString(), + }; + + store.addAdapterPlugin(record); + + // The store file should exist and contain the record + const storePath = path.join(tmpDir, ".paperclip", "adapter-plugins.json"); + expect(fs.existsSync(storePath)).toBe(true); + + const contents = JSON.parse(fs.readFileSync(storePath, "utf-8")); + expect(contents).toHaveLength(1); + expect(contents[0].packageName).toBe("test-adapter"); + + // No lingering temp files + const dir = path.dirname(storePath); + const tmpFiles = fs.readdirSync(dir).filter((f) => f.endsWith(".tmp")); + expect(tmpFiles).toHaveLength(0); + }); + + it("addAdapterPlugin is idempotent for the same type", async () => { + vi.resetModules(); + const store = await import("../services/adapter-plugin-store.js"); + + const record1 = { + packageName: "pkg-a", + type: "my_adapter", + version: "1.0.0", + installedAt: new Date().toISOString(), + }; + const record2 = { + packageName: "pkg-a", + type: "my_adapter", + version: "2.0.0", + installedAt: new Date().toISOString(), + }; + + store.addAdapterPlugin(record1); + store.addAdapterPlugin(record2); + + const plugins = store.listAdapterPlugins(); + expect(plugins).toHaveLength(1); + expect(plugins[0].version).toBe("2.0.0"); + }); + + it("removeAdapterPlugin removes the correct record", async () => { + vi.resetModules(); + const store = await import("../services/adapter-plugin-store.js"); + + store.addAdapterPlugin({ + packageName: "a", + type: "type_a", + installedAt: new Date().toISOString(), + }); + store.addAdapterPlugin({ + packageName: "b", + type: "type_b", + installedAt: new Date().toISOString(), + }); + + expect(store.listAdapterPlugins()).toHaveLength(2); + + const removed = store.removeAdapterPlugin("type_a"); + expect(removed).toBe(true); + expect(store.listAdapterPlugins()).toHaveLength(1); + expect(store.listAdapterPlugins()[0].type).toBe("type_b"); + }); + + it("removeAdapterPlugin returns false for unknown type", async () => { + vi.resetModules(); + const store = await import("../services/adapter-plugin-store.js"); + expect(store.removeAdapterPlugin("nonexistent")).toBe(false); + }); +}); diff --git a/server/src/routes/adapters.ts b/server/src/routes/adapters.ts index 27e32a06..a6882b8f 100644 --- a/server/src/routes/adapters.ts +++ b/server/src/routes/adapters.ts @@ -46,6 +46,26 @@ import { BUILTIN_ADAPTER_TYPES } from "../adapters/builtin-adapter-types.js"; const execFileAsync = promisify(execFile); +// --------------------------------------------------------------------------- +// Concurrency control — serialise all install / reinstall / reload / delete +// operations so that concurrent requests cannot race on npm, the plugin store +// JSON file, or the in-memory adapter registry. +// --------------------------------------------------------------------------- + +let mutexQueue: Promise = Promise.resolve(); + +/** + * Enqueue `fn` behind any in-flight adapter mutation. Only one mutation runs + * at a time; the rest wait in FIFO order. + */ +export function withAdapterMutex(fn: () => Promise): Promise { + const ticket = mutexQueue.then(fn, fn); // always run `fn` after the queue settles + // Swallow rejections on the queue itself so a failed operation doesn't + // permanently block subsequent ones. + mutexQueue = ticket.then(() => {}, () => {}); + return ticket; +} + // --------------------------------------------------------------------------- // Request / Response types // --------------------------------------------------------------------------- @@ -211,7 +231,7 @@ export function adapterRoutes() { // Strip version suffix if the UI sends "pkg@1.2.3" instead of separating it // e.g. "@henkey/hermes-paperclip-adapter@0.3.0" → packageName + version - let canonicalName = packageName; + let canonicalName = packageName.trim(); let explicitVersion = version; const versionSuffix = packageName.match(/@(\d+\.\d+\.\d+.*)$/); if (versionSuffix) { @@ -224,103 +244,105 @@ export function adapterRoutes() { } } - try { - let installedVersion: string | undefined; - let moduleLocalPath: string | undefined; + await withAdapterMutex(async () => { + try { + let installedVersion: string | undefined; + let moduleLocalPath: string | undefined; - if (!isLocalPath) { - // npm install into the managed directory - const pluginsDir = getAdapterPluginsDir(); - const spec = explicitVersion ? `${canonicalName}@${explicitVersion}` : canonicalName; + if (!isLocalPath) { + // npm install into the managed directory + const pluginsDir = getAdapterPluginsDir(); + const spec = explicitVersion ? `${canonicalName}@${explicitVersion}` : canonicalName; - logger.info({ spec, pluginsDir }, "Installing adapter package via npm"); + logger.info({ spec, pluginsDir }, "Installing adapter package via npm"); - await execFileAsync("npm", ["install", "--no-save", spec], { - cwd: pluginsDir, - timeout: 120_000, - }); + await execFileAsync("npm", ["install", "--no-save", spec], { + cwd: pluginsDir, + timeout: 120_000, + }); - // Read installed version from package.json - try { - const pkgJsonPath = path.join(pluginsDir, "node_modules", canonicalName, "package.json"); - const pkgContent = await import("node:fs/promises"); - const pkgRaw = await pkgContent.readFile(pkgJsonPath, "utf-8"); - const pkg = JSON.parse(pkgRaw); - const v = pkg.version; - installedVersion = - typeof v === "string" && v.trim().length > 0 ? v.trim() : explicitVersion; - } catch { - installedVersion = explicitVersion; - } - } else { - // Local path — normalize (e.g., Windows → WSL) and use the resolved path - moduleLocalPath = path.resolve(await normalizeLocalPath(packageName)); - try { - const pkgRaw = await readFile(path.join(moduleLocalPath, "package.json"), "utf-8"); - const v = JSON.parse(pkgRaw).version; - if (typeof v === "string" && v.trim().length > 0) { - installedVersion = v.trim(); + // Read installed version from package.json + try { + const pkgJsonPath = path.join(pluginsDir, "node_modules", canonicalName, "package.json"); + const pkgContent = await import("node:fs/promises"); + const pkgRaw = await pkgContent.readFile(pkgJsonPath, "utf-8"); + const pkg = JSON.parse(pkgRaw); + const v = pkg.version; + installedVersion = + typeof v === "string" && v.trim().length > 0 ? v.trim() : explicitVersion; + } catch { + installedVersion = explicitVersion; } - } catch { - // leave installedVersion undefined if package.json is missing + } else { + // Local path — normalize (e.g., Windows → WSL) and use the resolved path + moduleLocalPath = path.resolve(await normalizeLocalPath(packageName)); + try { + const pkgRaw = await readFile(path.join(moduleLocalPath, "package.json"), "utf-8"); + const v = JSON.parse(pkgRaw).version; + if (typeof v === "string" && v.trim().length > 0) { + installedVersion = v.trim(); + } + } catch { + // leave installedVersion undefined if package.json is missing + } + } + + // Load and register the adapter (use canonicalName for path resolution) + const adapterModule = await loadExternalAdapterPackage(canonicalName, moduleLocalPath); + + // Check if this type conflicts with a built-in adapter + if (BUILTIN_ADAPTER_TYPES.has(adapterModule.type)) { + res.status(409).json({ + error: `Adapter type "${adapterModule.type}" is a built-in adapter and cannot be overwritten.`, + }); + return; + } + + // Check if already registered (indicates a reinstall/update) + const existing = findServerAdapter(adapterModule.type); + const isReinstall = existing !== null; + if (existing) { + unregisterServerAdapter(adapterModule.type); + logger.info({ type: adapterModule.type }, "Unregistered existing adapter for replacement"); + } + + // Register the new adapter + registerWithSessionManagement(adapterModule); + + // Persist the record (use canonicalName without version suffix) + const record: AdapterPluginRecord = { + packageName: canonicalName, + localPath: moduleLocalPath, + version: installedVersion ?? explicitVersion, + type: adapterModule.type, + installedAt: new Date().toISOString(), + }; + addAdapterPlugin(record); + + logger.info( + { type: adapterModule.type, packageName: canonicalName }, + "External adapter installed and registered", + ); + + res.status(201).json({ + type: adapterModule.type, + packageName: canonicalName, + version: installedVersion ?? explicitVersion, + installedAt: record.installedAt, + requiresRestart: isReinstall, + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + logger.error({ err, packageName }, "Failed to install external adapter"); + + // Distinguish npm errors from load errors + if (message.includes("npm") || message.includes("ERR!")) { + res.status(500).json({ error: `npm install failed: ${message}` }); + } else { + res.status(500).json({ error: `Failed to install adapter: ${message}` }); } } - - // Load and register the adapter (use canonicalName for path resolution) - const adapterModule = await loadExternalAdapterPackage(canonicalName, moduleLocalPath); - - // Check if this type conflicts with a built-in adapter - if (BUILTIN_ADAPTER_TYPES.has(adapterModule.type)) { - res.status(409).json({ - error: `Adapter type "${adapterModule.type}" is a built-in adapter and cannot be overwritten.`, - }); - return; - } - - // Check if already registered (indicates a reinstall/update) - const existing = findServerAdapter(adapterModule.type); - const isReinstall = existing !== null; - if (existing) { - unregisterServerAdapter(adapterModule.type); - logger.info({ type: adapterModule.type }, "Unregistered existing adapter for replacement"); - } - - // Register the new adapter - registerWithSessionManagement(adapterModule); - - // Persist the record (use canonicalName without version suffix) - const record: AdapterPluginRecord = { - packageName: canonicalName, - localPath: moduleLocalPath, - version: installedVersion ?? explicitVersion, - type: adapterModule.type, - installedAt: new Date().toISOString(), - }; - addAdapterPlugin(record); - - logger.info( - { type: adapterModule.type, packageName: canonicalName }, - "External adapter installed and registered", - ); - - res.status(201).json({ - type: adapterModule.type, - packageName: canonicalName, - version: installedVersion ?? explicitVersion, - installedAt: record.installedAt, - requiresRestart: isReinstall, - }); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - logger.error({ err, packageName }, "Failed to install external adapter"); - - // Distinguish npm errors from load errors - if (message.includes("npm") || message.includes("ERR!")) { - res.status(500).json({ error: `npm install failed: ${message}` }); - } else { - res.status(500).json({ error: `Failed to install adapter: ${message}` }); - } - } + }); }); /** @@ -412,53 +434,55 @@ export function adapterRoutes() { return; } - // Check that the adapter exists in the registry - const existing = findServerAdapter(adapterType); - if (!existing) { - res.status(404).json({ - error: `Adapter "${adapterType}" is not registered.`, - }); - return; - } - - // Check that it's an external adapter - const externalRecord = getAdapterPluginByType(adapterType); - if (!externalRecord) { - res.status(404).json({ - error: `Adapter "${adapterType}" is not an externally installed adapter.`, - }); - return; - } - - // If installed via npm (has packageName but no localPath), run npm uninstall - if (externalRecord.packageName && !externalRecord.localPath) { - try { - const pluginsDir = getAdapterPluginsDir(); - await execFileAsync("npm", ["uninstall", externalRecord.packageName], { - cwd: pluginsDir, - timeout: 60_000, + await withAdapterMutex(async () => { + // Check that the adapter exists in the registry + const existing = findServerAdapter(adapterType); + if (!existing) { + res.status(404).json({ + error: `Adapter "${adapterType}" is not registered.`, }); - logger.info( - { type: adapterType, packageName: externalRecord.packageName }, - "npm uninstall completed for external adapter", - ); - } catch (err) { - logger.warn( - { err, type: adapterType, packageName: externalRecord.packageName }, - "npm uninstall failed for external adapter; continuing with unregister", - ); + return; } - } - // Unregister from the runtime registry - unregisterServerAdapter(adapterType); + // Check that it's an external adapter + const externalRecord = getAdapterPluginByType(adapterType); + if (!externalRecord) { + res.status(404).json({ + error: `Adapter "${adapterType}" is not an externally installed adapter.`, + }); + return; + } - // Remove from the persistent store - removeAdapterPlugin(adapterType); + // If installed via npm (has packageName but no localPath), run npm uninstall + if (externalRecord.packageName && !externalRecord.localPath) { + try { + const pluginsDir = getAdapterPluginsDir(); + await execFileAsync("npm", ["uninstall", externalRecord.packageName], { + cwd: pluginsDir, + timeout: 60_000, + }); + logger.info( + { type: adapterType, packageName: externalRecord.packageName }, + "npm uninstall completed for external adapter", + ); + } catch (err) { + logger.warn( + { err, type: adapterType, packageName: externalRecord.packageName }, + "npm uninstall failed for external adapter; continuing with unregister", + ); + } + } - logger.info({ type: adapterType }, "External adapter unregistered and removed"); + // Unregister from the runtime registry + unregisterServerAdapter(adapterType); - res.json({ type: adapterType, removed: true }); + // Remove from the persistent store + removeAdapterPlugin(adapterType); + + logger.info({ type: adapterType }, "External adapter unregistered and removed"); + + res.json({ type: adapterType, removed: true }); + }); }); /** @@ -480,39 +504,41 @@ export function adapterRoutes() { return; } - // Reload the adapter module (busts ESM cache, re-imports) - try { - const newModule = await reloadExternalAdapter(type); + await withAdapterMutex(async () => { + // Reload the adapter module (busts ESM cache, re-imports) + try { + const newModule = await reloadExternalAdapter(type); - // Not found in the external adapter store - if (!newModule) { - res.status(404).json({ error: `Adapter "${type}" is not an externally installed adapter.` }); - return; - } - - // Swap in the reloaded module - unregisterServerAdapter(type); - registerWithSessionManagement(newModule); - configSchemaCache.delete(type); - - // Sync store.version from package.json (store may be missing version for local installs). - const record = getAdapterPluginByType(type); - let newVersion: string | undefined; - if (record) { - newVersion = readAdapterPackageVersionFromDisk(record); - if (newVersion) { - addAdapterPlugin({ ...record, version: newVersion }); + // Not found in the external adapter store + if (!newModule) { + res.status(404).json({ error: `Adapter "${type}" is not an externally installed adapter.` }); + return; } + + // Swap in the reloaded module + unregisterServerAdapter(type); + registerWithSessionManagement(newModule); + configSchemaCache.delete(type); + + // Sync store.version from package.json (store may be missing version for local installs). + const record = getAdapterPluginByType(type); + let newVersion: string | undefined; + if (record) { + newVersion = readAdapterPackageVersionFromDisk(record); + if (newVersion) { + addAdapterPlugin({ ...record, version: newVersion }); + } + } + + logger.info({ type, version: newVersion }, "External adapter reloaded at runtime"); + + res.json({ type, version: newVersion, reloaded: true }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + logger.error({ err, type }, "Failed to reload external adapter"); + res.status(500).json({ error: `Failed to reload adapter: ${message}` }); } - - logger.info({ type, version: newVersion }, "External adapter reloaded at runtime"); - - res.json({ type, version: newVersion, reloaded: true }); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - logger.error({ err, type }, "Failed to reload external adapter"); - res.status(500).json({ error: `Failed to reload adapter: ${message}` }); - } + }); }); // ── POST /api/adapters/:type/reinstall ────────────────────────────────── @@ -542,45 +568,47 @@ export function adapterRoutes() { return; } - try { - const pluginsDir = getAdapterPluginsDir(); + await withAdapterMutex(async () => { + try { + const pluginsDir = getAdapterPluginsDir(); - logger.info({ type, packageName: record.packageName }, "Reinstalling adapter package via npm"); + logger.info({ type, packageName: record.packageName }, "Reinstalling adapter package via npm"); - await execFileAsync("npm", ["install", "--no-save", record.packageName], { - cwd: pluginsDir, - timeout: 120_000, - }); + await execFileAsync("npm", ["install", "--no-save", record.packageName.trim()], { + cwd: pluginsDir, + timeout: 120_000, + }); - // Reload the freshly installed adapter - const newModule = await reloadExternalAdapter(type); - if (!newModule) { - res.status(500).json({ error: "npm install succeeded but adapter reload failed." }); - return; - } - - unregisterServerAdapter(type); - registerWithSessionManagement(newModule); - configSchemaCache.delete(type); - - // Sync store version from disk - let newVersion: string | undefined; - const updatedRecord = getAdapterPluginByType(type); - if (updatedRecord) { - newVersion = readAdapterPackageVersionFromDisk(updatedRecord); - if (newVersion) { - addAdapterPlugin({ ...updatedRecord, version: newVersion }); + // Reload the freshly installed adapter + const newModule = await reloadExternalAdapter(type); + if (!newModule) { + res.status(500).json({ error: "npm install succeeded but adapter reload failed." }); + return; } + + unregisterServerAdapter(type); + registerWithSessionManagement(newModule); + configSchemaCache.delete(type); + + // Sync store version from disk + let newVersion: string | undefined; + const updatedRecord = getAdapterPluginByType(type); + if (updatedRecord) { + newVersion = readAdapterPackageVersionFromDisk(updatedRecord); + if (newVersion) { + addAdapterPlugin({ ...updatedRecord, version: newVersion }); + } + } + + logger.info({ type, version: newVersion }, "Adapter reinstalled from npm"); + + res.json({ type, version: newVersion, reinstalled: true }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + logger.error({ err, type }, "Failed to reinstall adapter"); + res.status(500).json({ error: `Reinstall failed: ${message}` }); } - - logger.info({ type, version: newVersion }, "Adapter reinstalled from npm"); - - res.json({ type, version: newVersion, reinstalled: true }); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - logger.error({ err, type }, "Failed to reinstall adapter"); - res.status(500).json({ error: `Reinstall failed: ${message}` }); - } + }); }); // ── GET /api/adapters/:type/config-schema ──────────────────────────────── diff --git a/server/src/services/adapter-plugin-store.ts b/server/src/services/adapter-plugin-store.ts index 8c26abe8..3df1ba7d 100644 --- a/server/src/services/adapter-plugin-store.ts +++ b/server/src/services/adapter-plugin-store.ts @@ -86,7 +86,12 @@ function readStore(): AdapterPluginRecord[] { function writeStore(records: AdapterPluginRecord[]): void { ensureDirs(); - fs.writeFileSync(ADAPTER_PLUGINS_STORE_PATH, JSON.stringify(records, null, 2), "utf-8"); + // Atomic write: write to a temp file in the same directory then rename. + // rename() is atomic on POSIX when source and target are on the same + // filesystem, preventing partial/corrupted reads from concurrent processes. + const tmpPath = `${ADAPTER_PLUGINS_STORE_PATH}.${process.pid}.tmp`; + fs.writeFileSync(tmpPath, JSON.stringify(records, null, 2), "utf-8"); + fs.renameSync(tmpPath, ADAPTER_PLUGINS_STORE_PATH); storeCache = records; } @@ -106,7 +111,9 @@ function readSettings(): AdapterSettings { function writeSettings(settings: AdapterSettings): void { ensureDirs(); - fs.writeFileSync(ADAPTER_SETTINGS_PATH, JSON.stringify(settings, null, 2), "utf-8"); + const tmpPath = `${ADAPTER_SETTINGS_PATH}.${process.pid}.tmp`; + fs.writeFileSync(tmpPath, JSON.stringify(settings, null, 2), "utf-8"); + fs.renameSync(tmpPath, ADAPTER_SETTINGS_PATH); settingsCache = settings; }