Fix adapter plugin conflicts caused by concurrent install/reload/delete operations

Add an async mutex (promise-chain FIFO queue) to serialise all adapter
mutation routes (install, reinstall, reload, delete) so that concurrent
requests cannot race on npm, the adapter-plugins.json store, or the
in-memory adapter registry.

Also switch adapter-plugin-store file writes to atomic write-tmp-then-rename
to prevent partial/corrupted reads from concurrent processes.

Includes the packageName.trim() fix for whitespace-induced npm failures.

9 new tests covering mutex serialisation, error recovery, FIFO ordering,
and atomic store operations.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
2026-04-12 15:54:09 +00:00
parent 9c7a17e16c
commit 92afa0fb67
3 changed files with 410 additions and 198 deletions
@@ -0,0 +1,177 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { withAdapterMutex } from "../routes/adapters.js";
// ---------------------------------------------------------------------------
// withAdapterMutex — serialisation tests
// ---------------------------------------------------------------------------
describe("withAdapterMutex", () => {
it("serialises concurrent calls so they do not overlap", async () => {
const log: string[] = [];
const taskA = withAdapterMutex(async () => {
log.push("A-start");
await new Promise((r) => setTimeout(r, 50));
log.push("A-end");
return "a";
});
const taskB = withAdapterMutex(async () => {
log.push("B-start");
await new Promise((r) => setTimeout(r, 10));
log.push("B-end");
return "b";
});
const [resultA, resultB] = await Promise.all([taskA, taskB]);
expect(resultA).toBe("a");
expect(resultB).toBe("b");
// A must fully complete before B starts (FIFO serialisation)
expect(log).toEqual(["A-start", "A-end", "B-start", "B-end"]);
});
it("a failed task does not block subsequent tasks", async () => {
const failing = withAdapterMutex(async () => {
throw new Error("boom");
});
await expect(failing).rejects.toThrow("boom");
const ok = await withAdapterMutex(async () => "ok");
expect(ok).toBe("ok");
});
it("preserves FIFO order for many concurrent callers", async () => {
const order: number[] = [];
const tasks = Array.from({ length: 5 }, (_, i) =>
withAdapterMutex(async () => {
order.push(i);
}),
);
await Promise.all(tasks);
expect(order).toEqual([0, 1, 2, 3, 4]);
});
it("returns the value from the inner function", async () => {
const result = await withAdapterMutex(async () => ({ answer: 42 }));
expect(result).toEqual({ answer: 42 });
});
it("propagates errors from the inner function", async () => {
await expect(
withAdapterMutex(async () => {
throw new TypeError("bad input");
}),
).rejects.toThrow("bad input");
});
});
// ---------------------------------------------------------------------------
// adapter-plugin-store — atomic write tests
// ---------------------------------------------------------------------------
describe("adapter-plugin-store atomic writes", () => {
// We test the store in an isolated temp directory by stubbing HOME
const originalHome = process.env.HOME;
let tmpDir: string;
beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "adapter-store-test-"));
process.env.HOME = tmpDir;
// Force the store module to pick up the new HOME by resetting its cache.
// We re-import it fresh for each test.
});
afterEach(() => {
process.env.HOME = originalHome;
fs.rmSync(tmpDir, { recursive: true, force: true });
});
it("writeStore creates the file atomically via rename", async () => {
// Dynamically import the store so it uses the stubbed HOME
vi.resetModules();
const store = await import("../services/adapter-plugin-store.js");
const record = {
packageName: "test-adapter",
type: "test_type",
installedAt: new Date().toISOString(),
};
store.addAdapterPlugin(record);
// The store file should exist and contain the record
const storePath = path.join(tmpDir, ".paperclip", "adapter-plugins.json");
expect(fs.existsSync(storePath)).toBe(true);
const contents = JSON.parse(fs.readFileSync(storePath, "utf-8"));
expect(contents).toHaveLength(1);
expect(contents[0].packageName).toBe("test-adapter");
// No lingering temp files
const dir = path.dirname(storePath);
const tmpFiles = fs.readdirSync(dir).filter((f) => f.endsWith(".tmp"));
expect(tmpFiles).toHaveLength(0);
});
it("addAdapterPlugin is idempotent for the same type", async () => {
vi.resetModules();
const store = await import("../services/adapter-plugin-store.js");
const record1 = {
packageName: "pkg-a",
type: "my_adapter",
version: "1.0.0",
installedAt: new Date().toISOString(),
};
const record2 = {
packageName: "pkg-a",
type: "my_adapter",
version: "2.0.0",
installedAt: new Date().toISOString(),
};
store.addAdapterPlugin(record1);
store.addAdapterPlugin(record2);
const plugins = store.listAdapterPlugins();
expect(plugins).toHaveLength(1);
expect(plugins[0].version).toBe("2.0.0");
});
it("removeAdapterPlugin removes the correct record", async () => {
vi.resetModules();
const store = await import("../services/adapter-plugin-store.js");
store.addAdapterPlugin({
packageName: "a",
type: "type_a",
installedAt: new Date().toISOString(),
});
store.addAdapterPlugin({
packageName: "b",
type: "type_b",
installedAt: new Date().toISOString(),
});
expect(store.listAdapterPlugins()).toHaveLength(2);
const removed = store.removeAdapterPlugin("type_a");
expect(removed).toBe(true);
expect(store.listAdapterPlugins()).toHaveLength(1);
expect(store.listAdapterPlugins()[0].type).toBe("type_b");
});
it("removeAdapterPlugin returns false for unknown type", async () => {
vi.resetModules();
const store = await import("../services/adapter-plugin-store.js");
expect(store.removeAdapterPlugin("nonexistent")).toBe(false);
});
});
+224 -196
View File
@@ -46,6 +46,26 @@ import { BUILTIN_ADAPTER_TYPES } from "../adapters/builtin-adapter-types.js";
const execFileAsync = promisify(execFile);
// ---------------------------------------------------------------------------
// Concurrency control — serialise all install / reinstall / reload / delete
// operations so that concurrent requests cannot race on npm, the plugin store
// JSON file, or the in-memory adapter registry.
// ---------------------------------------------------------------------------
let mutexQueue: Promise<void> = Promise.resolve();
/**
* Enqueue `fn` behind any in-flight adapter mutation. Only one mutation runs
* at a time; the rest wait in FIFO order.
*/
export function withAdapterMutex<T>(fn: () => Promise<T>): Promise<T> {
const ticket = mutexQueue.then(fn, fn); // always run `fn` after the queue settles
// Swallow rejections on the queue itself so a failed operation doesn't
// permanently block subsequent ones.
mutexQueue = ticket.then(() => {}, () => {});
return ticket;
}
// ---------------------------------------------------------------------------
// Request / Response types
// ---------------------------------------------------------------------------
@@ -211,7 +231,7 @@ export function adapterRoutes() {
// Strip version suffix if the UI sends "pkg@1.2.3" instead of separating it
// e.g. "@henkey/hermes-paperclip-adapter@0.3.0" → packageName + version
let canonicalName = packageName;
let canonicalName = packageName.trim();
let explicitVersion = version;
const versionSuffix = packageName.match(/@(\d+\.\d+\.\d+.*)$/);
if (versionSuffix) {
@@ -224,103 +244,105 @@ export function adapterRoutes() {
}
}
try {
let installedVersion: string | undefined;
let moduleLocalPath: string | undefined;
await withAdapterMutex(async () => {
try {
let installedVersion: string | undefined;
let moduleLocalPath: string | undefined;
if (!isLocalPath) {
// npm install into the managed directory
const pluginsDir = getAdapterPluginsDir();
const spec = explicitVersion ? `${canonicalName}@${explicitVersion}` : canonicalName;
if (!isLocalPath) {
// npm install into the managed directory
const pluginsDir = getAdapterPluginsDir();
const spec = explicitVersion ? `${canonicalName}@${explicitVersion}` : canonicalName;
logger.info({ spec, pluginsDir }, "Installing adapter package via npm");
logger.info({ spec, pluginsDir }, "Installing adapter package via npm");
await execFileAsync("npm", ["install", "--no-save", spec], {
cwd: pluginsDir,
timeout: 120_000,
});
await execFileAsync("npm", ["install", "--no-save", spec], {
cwd: pluginsDir,
timeout: 120_000,
});
// Read installed version from package.json
try {
const pkgJsonPath = path.join(pluginsDir, "node_modules", canonicalName, "package.json");
const pkgContent = await import("node:fs/promises");
const pkgRaw = await pkgContent.readFile(pkgJsonPath, "utf-8");
const pkg = JSON.parse(pkgRaw);
const v = pkg.version;
installedVersion =
typeof v === "string" && v.trim().length > 0 ? v.trim() : explicitVersion;
} catch {
installedVersion = explicitVersion;
}
} else {
// Local path — normalize (e.g., Windows → WSL) and use the resolved path
moduleLocalPath = path.resolve(await normalizeLocalPath(packageName));
try {
const pkgRaw = await readFile(path.join(moduleLocalPath, "package.json"), "utf-8");
const v = JSON.parse(pkgRaw).version;
if (typeof v === "string" && v.trim().length > 0) {
installedVersion = v.trim();
// Read installed version from package.json
try {
const pkgJsonPath = path.join(pluginsDir, "node_modules", canonicalName, "package.json");
const pkgContent = await import("node:fs/promises");
const pkgRaw = await pkgContent.readFile(pkgJsonPath, "utf-8");
const pkg = JSON.parse(pkgRaw);
const v = pkg.version;
installedVersion =
typeof v === "string" && v.trim().length > 0 ? v.trim() : explicitVersion;
} catch {
installedVersion = explicitVersion;
}
} catch {
// leave installedVersion undefined if package.json is missing
} else {
// Local path — normalize (e.g., Windows → WSL) and use the resolved path
moduleLocalPath = path.resolve(await normalizeLocalPath(packageName));
try {
const pkgRaw = await readFile(path.join(moduleLocalPath, "package.json"), "utf-8");
const v = JSON.parse(pkgRaw).version;
if (typeof v === "string" && v.trim().length > 0) {
installedVersion = v.trim();
}
} catch {
// leave installedVersion undefined if package.json is missing
}
}
// Load and register the adapter (use canonicalName for path resolution)
const adapterModule = await loadExternalAdapterPackage(canonicalName, moduleLocalPath);
// Check if this type conflicts with a built-in adapter
if (BUILTIN_ADAPTER_TYPES.has(adapterModule.type)) {
res.status(409).json({
error: `Adapter type "${adapterModule.type}" is a built-in adapter and cannot be overwritten.`,
});
return;
}
// Check if already registered (indicates a reinstall/update)
const existing = findServerAdapter(adapterModule.type);
const isReinstall = existing !== null;
if (existing) {
unregisterServerAdapter(adapterModule.type);
logger.info({ type: adapterModule.type }, "Unregistered existing adapter for replacement");
}
// Register the new adapter
registerWithSessionManagement(adapterModule);
// Persist the record (use canonicalName without version suffix)
const record: AdapterPluginRecord = {
packageName: canonicalName,
localPath: moduleLocalPath,
version: installedVersion ?? explicitVersion,
type: adapterModule.type,
installedAt: new Date().toISOString(),
};
addAdapterPlugin(record);
logger.info(
{ type: adapterModule.type, packageName: canonicalName },
"External adapter installed and registered",
);
res.status(201).json({
type: adapterModule.type,
packageName: canonicalName,
version: installedVersion ?? explicitVersion,
installedAt: record.installedAt,
requiresRestart: isReinstall,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
logger.error({ err, packageName }, "Failed to install external adapter");
// Distinguish npm errors from load errors
if (message.includes("npm") || message.includes("ERR!")) {
res.status(500).json({ error: `npm install failed: ${message}` });
} else {
res.status(500).json({ error: `Failed to install adapter: ${message}` });
}
}
// Load and register the adapter (use canonicalName for path resolution)
const adapterModule = await loadExternalAdapterPackage(canonicalName, moduleLocalPath);
// Check if this type conflicts with a built-in adapter
if (BUILTIN_ADAPTER_TYPES.has(adapterModule.type)) {
res.status(409).json({
error: `Adapter type "${adapterModule.type}" is a built-in adapter and cannot be overwritten.`,
});
return;
}
// Check if already registered (indicates a reinstall/update)
const existing = findServerAdapter(adapterModule.type);
const isReinstall = existing !== null;
if (existing) {
unregisterServerAdapter(adapterModule.type);
logger.info({ type: adapterModule.type }, "Unregistered existing adapter for replacement");
}
// Register the new adapter
registerWithSessionManagement(adapterModule);
// Persist the record (use canonicalName without version suffix)
const record: AdapterPluginRecord = {
packageName: canonicalName,
localPath: moduleLocalPath,
version: installedVersion ?? explicitVersion,
type: adapterModule.type,
installedAt: new Date().toISOString(),
};
addAdapterPlugin(record);
logger.info(
{ type: adapterModule.type, packageName: canonicalName },
"External adapter installed and registered",
);
res.status(201).json({
type: adapterModule.type,
packageName: canonicalName,
version: installedVersion ?? explicitVersion,
installedAt: record.installedAt,
requiresRestart: isReinstall,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
logger.error({ err, packageName }, "Failed to install external adapter");
// Distinguish npm errors from load errors
if (message.includes("npm") || message.includes("ERR!")) {
res.status(500).json({ error: `npm install failed: ${message}` });
} else {
res.status(500).json({ error: `Failed to install adapter: ${message}` });
}
}
});
});
/**
@@ -412,53 +434,55 @@ export function adapterRoutes() {
return;
}
// Check that the adapter exists in the registry
const existing = findServerAdapter(adapterType);
if (!existing) {
res.status(404).json({
error: `Adapter "${adapterType}" is not registered.`,
});
return;
}
// Check that it's an external adapter
const externalRecord = getAdapterPluginByType(adapterType);
if (!externalRecord) {
res.status(404).json({
error: `Adapter "${adapterType}" is not an externally installed adapter.`,
});
return;
}
// If installed via npm (has packageName but no localPath), run npm uninstall
if (externalRecord.packageName && !externalRecord.localPath) {
try {
const pluginsDir = getAdapterPluginsDir();
await execFileAsync("npm", ["uninstall", externalRecord.packageName], {
cwd: pluginsDir,
timeout: 60_000,
await withAdapterMutex(async () => {
// Check that the adapter exists in the registry
const existing = findServerAdapter(adapterType);
if (!existing) {
res.status(404).json({
error: `Adapter "${adapterType}" is not registered.`,
});
logger.info(
{ type: adapterType, packageName: externalRecord.packageName },
"npm uninstall completed for external adapter",
);
} catch (err) {
logger.warn(
{ err, type: adapterType, packageName: externalRecord.packageName },
"npm uninstall failed for external adapter; continuing with unregister",
);
return;
}
}
// Unregister from the runtime registry
unregisterServerAdapter(adapterType);
// Check that it's an external adapter
const externalRecord = getAdapterPluginByType(adapterType);
if (!externalRecord) {
res.status(404).json({
error: `Adapter "${adapterType}" is not an externally installed adapter.`,
});
return;
}
// Remove from the persistent store
removeAdapterPlugin(adapterType);
// If installed via npm (has packageName but no localPath), run npm uninstall
if (externalRecord.packageName && !externalRecord.localPath) {
try {
const pluginsDir = getAdapterPluginsDir();
await execFileAsync("npm", ["uninstall", externalRecord.packageName], {
cwd: pluginsDir,
timeout: 60_000,
});
logger.info(
{ type: adapterType, packageName: externalRecord.packageName },
"npm uninstall completed for external adapter",
);
} catch (err) {
logger.warn(
{ err, type: adapterType, packageName: externalRecord.packageName },
"npm uninstall failed for external adapter; continuing with unregister",
);
}
}
logger.info({ type: adapterType }, "External adapter unregistered and removed");
// Unregister from the runtime registry
unregisterServerAdapter(adapterType);
res.json({ type: adapterType, removed: true });
// Remove from the persistent store
removeAdapterPlugin(adapterType);
logger.info({ type: adapterType }, "External adapter unregistered and removed");
res.json({ type: adapterType, removed: true });
});
});
/**
@@ -480,39 +504,41 @@ export function adapterRoutes() {
return;
}
// Reload the adapter module (busts ESM cache, re-imports)
try {
const newModule = await reloadExternalAdapter(type);
await withAdapterMutex(async () => {
// Reload the adapter module (busts ESM cache, re-imports)
try {
const newModule = await reloadExternalAdapter(type);
// Not found in the external adapter store
if (!newModule) {
res.status(404).json({ error: `Adapter "${type}" is not an externally installed adapter.` });
return;
}
// Swap in the reloaded module
unregisterServerAdapter(type);
registerWithSessionManagement(newModule);
configSchemaCache.delete(type);
// Sync store.version from package.json (store may be missing version for local installs).
const record = getAdapterPluginByType(type);
let newVersion: string | undefined;
if (record) {
newVersion = readAdapterPackageVersionFromDisk(record);
if (newVersion) {
addAdapterPlugin({ ...record, version: newVersion });
// Not found in the external adapter store
if (!newModule) {
res.status(404).json({ error: `Adapter "${type}" is not an externally installed adapter.` });
return;
}
// Swap in the reloaded module
unregisterServerAdapter(type);
registerWithSessionManagement(newModule);
configSchemaCache.delete(type);
// Sync store.version from package.json (store may be missing version for local installs).
const record = getAdapterPluginByType(type);
let newVersion: string | undefined;
if (record) {
newVersion = readAdapterPackageVersionFromDisk(record);
if (newVersion) {
addAdapterPlugin({ ...record, version: newVersion });
}
}
logger.info({ type, version: newVersion }, "External adapter reloaded at runtime");
res.json({ type, version: newVersion, reloaded: true });
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
logger.error({ err, type }, "Failed to reload external adapter");
res.status(500).json({ error: `Failed to reload adapter: ${message}` });
}
logger.info({ type, version: newVersion }, "External adapter reloaded at runtime");
res.json({ type, version: newVersion, reloaded: true });
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
logger.error({ err, type }, "Failed to reload external adapter");
res.status(500).json({ error: `Failed to reload adapter: ${message}` });
}
});
});
// ── POST /api/adapters/:type/reinstall ──────────────────────────────────
@@ -542,45 +568,47 @@ export function adapterRoutes() {
return;
}
try {
const pluginsDir = getAdapterPluginsDir();
await withAdapterMutex(async () => {
try {
const pluginsDir = getAdapterPluginsDir();
logger.info({ type, packageName: record.packageName }, "Reinstalling adapter package via npm");
logger.info({ type, packageName: record.packageName }, "Reinstalling adapter package via npm");
await execFileAsync("npm", ["install", "--no-save", record.packageName], {
cwd: pluginsDir,
timeout: 120_000,
});
await execFileAsync("npm", ["install", "--no-save", record.packageName.trim()], {
cwd: pluginsDir,
timeout: 120_000,
});
// Reload the freshly installed adapter
const newModule = await reloadExternalAdapter(type);
if (!newModule) {
res.status(500).json({ error: "npm install succeeded but adapter reload failed." });
return;
}
unregisterServerAdapter(type);
registerWithSessionManagement(newModule);
configSchemaCache.delete(type);
// Sync store version from disk
let newVersion: string | undefined;
const updatedRecord = getAdapterPluginByType(type);
if (updatedRecord) {
newVersion = readAdapterPackageVersionFromDisk(updatedRecord);
if (newVersion) {
addAdapterPlugin({ ...updatedRecord, version: newVersion });
// Reload the freshly installed adapter
const newModule = await reloadExternalAdapter(type);
if (!newModule) {
res.status(500).json({ error: "npm install succeeded but adapter reload failed." });
return;
}
unregisterServerAdapter(type);
registerWithSessionManagement(newModule);
configSchemaCache.delete(type);
// Sync store version from disk
let newVersion: string | undefined;
const updatedRecord = getAdapterPluginByType(type);
if (updatedRecord) {
newVersion = readAdapterPackageVersionFromDisk(updatedRecord);
if (newVersion) {
addAdapterPlugin({ ...updatedRecord, version: newVersion });
}
}
logger.info({ type, version: newVersion }, "Adapter reinstalled from npm");
res.json({ type, version: newVersion, reinstalled: true });
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
logger.error({ err, type }, "Failed to reinstall adapter");
res.status(500).json({ error: `Reinstall failed: ${message}` });
}
logger.info({ type, version: newVersion }, "Adapter reinstalled from npm");
res.json({ type, version: newVersion, reinstalled: true });
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
logger.error({ err, type }, "Failed to reinstall adapter");
res.status(500).json({ error: `Reinstall failed: ${message}` });
}
});
});
// ── GET /api/adapters/:type/config-schema ────────────────────────────────
+9 -2
View File
@@ -86,7 +86,12 @@ function readStore(): AdapterPluginRecord[] {
function writeStore(records: AdapterPluginRecord[]): void {
ensureDirs();
fs.writeFileSync(ADAPTER_PLUGINS_STORE_PATH, JSON.stringify(records, null, 2), "utf-8");
// Atomic write: write to a temp file in the same directory then rename.
// rename() is atomic on POSIX when source and target are on the same
// filesystem, preventing partial/corrupted reads from concurrent processes.
const tmpPath = `${ADAPTER_PLUGINS_STORE_PATH}.${process.pid}.tmp`;
fs.writeFileSync(tmpPath, JSON.stringify(records, null, 2), "utf-8");
fs.renameSync(tmpPath, ADAPTER_PLUGINS_STORE_PATH);
storeCache = records;
}
@@ -106,7 +111,9 @@ function readSettings(): AdapterSettings {
function writeSettings(settings: AdapterSettings): void {
ensureDirs();
fs.writeFileSync(ADAPTER_SETTINGS_PATH, JSON.stringify(settings, null, 2), "utf-8");
const tmpPath = `${ADAPTER_SETTINGS_PATH}.${process.pid}.tmp`;
fs.writeFileSync(tmpPath, JSON.stringify(settings, null, 2), "utf-8");
fs.renameSync(tmpPath, ADAPTER_SETTINGS_PATH);
settingsCache = settings;
}