Files
paperclip/server/src/services/plugin-worker-manager.ts
T
Dotta ece8a51e22 [codex] Bundle local branch fixes from PAP-10032 (#6604)
## Thinking Path

> - Paperclip orchestrates AI agents for zero-human companies.
> - This branch accumulated multiple already-tested control-plane,
adapter runtime, invite, workspace, plugin, and UI quality fixes on the
primary Paperclip checkout.
> - `origin/master` advanced while those commits were still local, so
the branch needed to be preserved and reconciled before review.
> - Splitting the branch commit-by-commit against the new base produced
overlapping conflicts with recently merged upstream PRs.
> - This pull request keeps the remaining branch as one standalone PR
because the final diff is 38 files after removing screenshot artifacts,
under Greptile's 100-file cap, and can be merged independently after
review.
> - The benefit is that none of the local work is lost, the branch is
now based on current `origin/master`, and reviewers can evaluate the
reconciled changes in one place.

## What Changed

- Merged the local accumulated branch with current `origin/master` and
resolved the invite-flow overlaps from the newer upstream companies
query helper.
- Preserved the local fixes for invite existing-member behavior, invite
link copy fallback, reusable workspace selection, worktree auth, static
SPA fallback, markdown wrapping, plugin slot registration, cloud
upstream UX/server polish, project sorting, and related tests.
- Removed screenshot artifacts from the PR per review request.
- Kept the PR under the requested file limit: 38 files changed, with no
`pnpm-lock.yaml` or `.github/workflows/*` changes.

## Verification

- `NODE_ENV=test pnpm exec vitest run
ui/src/pages/CompanyInvites.test.tsx ui/src/pages/InviteLanding.test.tsx
ui/src/pages/Projects.test.tsx ui/src/plugins/slots.test.ts
ui/src/components/MarkdownBody.test.tsx
server/src/__tests__/invite-accept-existing-member.test.ts
server/src/__tests__/static-index-html.test.ts
server/src/__tests__/execution-workspaces-service.test.ts
server/src/__tests__/better-auth.test.ts
server/src/__tests__/worktree-config.test.ts`
- `NODE_ENV=test pnpm --filter @paperclipai/ui typecheck`
- `NODE_ENV=test pnpm --filter @paperclipai/server typecheck`
- Confirmed `git diff --name-only origin/master...HEAD | wc -l` is `38`.
- Confirmed no PR diff entries match `pnpm-lock.yaml`,
`.github/workflows/*`, or `screenshots/*`.

## Risks

- Medium review risk because this is a bundled rescue PR rather than
several narrow feature PRs.
- Invite flow and company cache behavior overlapped with newer upstream
changes; the merge resolution intentionally keeps the shared
`companiesListQueryOptions` helper while preserving local
existing-member invite behavior.
- Visual review evidence is no longer attached in-repo because
screenshots were removed from this PR per review request.

> For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and
discuss it in `#dev` before opening the PR. Feature PRs that overlap
with planned core work may need to be redirected — check the roadmap
first. See `CONTRIBUTING.md`.

## Model Used

- OpenAI Codex, GPT-5-based coding agent, with repository tool access,
terminal execution, and git/GitHub CLI operations.

## Checklist

- [x] I have included a thinking path that traces from project context
to this change
- [x] I have specified the model used (with version and capability
details)
- [x] I have checked ROADMAP.md and confirmed this PR does not duplicate
planned core work
- [x] I have run tests locally and they pass
- [x] I have added or updated tests where applicable
- [x] UI screenshots were intentionally removed from this PR per review
request
- [x] I have updated relevant documentation to reflect my changes
- [x] I have considered and documented any risks above
- [x] I will address all Greptile and reviewer comments before
requesting merge

---------

Co-authored-by: Paperclip <noreply@paperclip.ing>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: CodexCoder <codexcoder@paperclip.local>
2026-05-25 07:25:26 -05:00

1481 lines
45 KiB
TypeScript

/**
* PluginWorkerManager — spawns and manages out-of-process plugin worker child
* processes, routes JSON-RPC 2.0 calls over stdio, and handles lifecycle
* management including crash recovery with exponential backoff.
*
* Each installed plugin gets one dedicated worker process. The host sends
* JSON-RPC requests over the child's stdin and reads responses from stdout.
* Worker stderr is captured and forwarded to the host logger.
*
* Process Model (from PLUGIN_SPEC.md §12):
* - One worker process per installed plugin
* - Failure isolation: plugin crashes do not affect the host
* - Graceful shutdown: 10-second drain, then SIGTERM, then SIGKILL
* - Automatic restart with exponential backoff on unexpected exits
*
* @see PLUGIN_SPEC.md §12 — Process Model
* @see PLUGIN_SPEC.md §12.5 — Graceful Shutdown Policy
* @see PLUGIN_SPEC.md §13 — Host-Worker Protocol
*/
import { fork, type ChildProcess } from "node:child_process";
import { randomUUID } from "node:crypto";
import { EventEmitter } from "node:events";
import { createInterface, type Interface as ReadlineInterface } from "node:readline";
import type { PaperclipPluginManifestV1 } from "@paperclipai/shared";
import {
JSONRPC_VERSION,
JSONRPC_ERROR_CODES,
PLUGIN_RPC_ERROR_CODES,
createRequest,
createErrorResponse,
parseMessage,
serializeMessage,
isJsonRpcResponse,
isJsonRpcRequest,
isJsonRpcNotification,
isJsonRpcSuccessResponse,
JsonRpcParseError,
JsonRpcCallError,
} from "@paperclipai/plugin-sdk";
import type {
JsonRpcId,
PluginInvocationContext,
PluginInvocationScope,
JsonRpcResponse,
JsonRpcRequest,
JsonRpcNotification,
WorkerHostCallContext,
HostToWorkerMethodName,
HostToWorkerMethods,
WorkerToHostMethodName,
WorkerToHostMethods,
InitializeParams,
} from "@paperclipai/plugin-sdk";
import { logger } from "../middleware/logger.js";
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** Default timeout for RPC calls in milliseconds. */
const DEFAULT_RPC_TIMEOUT_MS = 30_000;
/** Hard upper bound for any RPC timeout (15 minutes). Prevents unbounded waits. */
const MAX_RPC_TIMEOUT_MS = 15 * 60 * 1_000;
/** Timeout for the initialize RPC call. */
const INITIALIZE_TIMEOUT_MS = 15_000;
/** Timeout for the shutdown RPC call before escalating to SIGTERM. */
const SHUTDOWN_DRAIN_MS = 10_000;
/** Time to wait after SIGTERM before sending SIGKILL. */
const SIGTERM_GRACE_MS = 5_000;
/** Minimum backoff delay for crash recovery (1 second). */
const MIN_BACKOFF_MS = 1_000;
/** Maximum backoff delay for crash recovery (5 minutes). */
const MAX_BACKOFF_MS = 5 * 60 * 1_000;
/** Backoff multiplier on each consecutive crash. */
const BACKOFF_MULTIPLIER = 2;
/** Maximum number of consecutive crashes before giving up on auto-restart. */
const MAX_CONSECUTIVE_CRASHES = 10;
/** Time window in which crashes are considered consecutive (10 minutes). */
const CRASH_WINDOW_MS = 10 * 60 * 1_000;
/** Maximum number of stderr characters retained for worker failure context. */
const MAX_STDERR_EXCERPT_CHARS = 8_000;
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/**
* Status of a managed worker process.
*/
export type WorkerStatus =
| "stopped"
| "starting"
| "running"
| "stopping"
| "crashed"
| "backoff";
/**
* Worker-to-host method handler. The host registers these to service calls
* that the plugin worker makes back to the host (e.g. state.get, events.emit).
*/
export type WorkerToHostHandler<M extends WorkerToHostMethodName> = (
params: WorkerToHostMethods[M][0],
context?: WorkerHostCallContext,
) => Promise<WorkerToHostMethods[M][1]>;
/**
* A map of all worker-to-host method handlers provided by the host.
*/
export type WorkerToHostHandlers = {
[M in WorkerToHostMethodName]?: WorkerToHostHandler<M>;
};
/**
* Events emitted by a PluginWorkerHandle.
*/
export interface WorkerHandleEvents {
/** Worker process started and is ready (initialize succeeded). */
"ready": { pluginId: string };
/** Worker process exited. */
"exit": { pluginId: string; code: number | null; signal: NodeJS.Signals | null };
/** Worker process crashed unexpectedly. */
"crash": { pluginId: string; code: number | null; signal: NodeJS.Signals | null; willRestart: boolean };
/** Worker process errored (e.g. spawn failure). */
"error": { pluginId: string; error: Error };
/** Worker status changed. */
"status": { pluginId: string; status: WorkerStatus; previousStatus: WorkerStatus };
}
type WorkerHandleEventName = keyof WorkerHandleEvents;
export function appendStderrExcerpt(current: string, chunk: string): string {
const next = current ? `${current}\n${chunk}` : chunk;
return next.length <= MAX_STDERR_EXCERPT_CHARS
? next
: next.slice(-MAX_STDERR_EXCERPT_CHARS);
}
export function formatWorkerFailureMessage(message: string, stderrExcerpt: string): string {
const excerpt = stderrExcerpt.trim();
if (!excerpt) return message;
if (message.includes(excerpt)) return message;
return `${message}\n\nWorker stderr:\n${excerpt}`;
}
/**
* Options for starting a worker process.
*/
export interface WorkerStartOptions {
/** Absolute path to the plugin worker entrypoint (CJS bundle). */
entrypointPath: string;
/** Plugin manifest. */
manifest: PaperclipPluginManifestV1;
/** Resolved plugin configuration. */
config: Record<string, unknown>;
/** Host instance information for the initialize call. */
instanceInfo: {
instanceId: string;
hostVersion: string;
};
/** Host API version. */
apiVersion: number;
/** Host-derived plugin database namespace, when declared. */
databaseNamespace?: string | null;
/** Handlers for worker→host RPC calls. */
hostHandlers: WorkerToHostHandlers;
/** Default timeout for RPC calls (ms). Defaults to 30s. */
rpcTimeoutMs?: number;
/** Whether to auto-restart on crash. Defaults to true. */
autoRestart?: boolean;
/** Node.js execArgv passed to the child process. */
execArgv?: string[];
/** Environment variables passed to the child process. */
env?: Record<string, string>;
/**
* Callback for stream notifications from the worker (streams.open/emit/close).
* The host wires this to the PluginStreamBus to fan out events to SSE clients.
*/
onStreamNotification?: (method: string, params: Record<string, unknown>) => void;
}
/**
* A pending RPC call waiting for a response from the worker.
*/
interface PendingRequest {
/** The request ID. */
id: JsonRpcId;
/** Method name (for logging). */
method: string;
/** Resolve the promise with the response. */
resolve: (response: JsonRpcResponse) => void;
/** Timeout timer handle. */
timer: ReturnType<typeof setTimeout>;
/** Timestamp when the request was sent. */
sentAt: number;
/** Active host-owned invocation id attached to this host→worker call. */
invocationId?: string;
}
interface ActiveInvocation {
scope: PluginInvocationScope;
timer?: ReturnType<typeof setTimeout>;
}
// ---------------------------------------------------------------------------
// PluginWorkerHandle — manages a single worker process
// ---------------------------------------------------------------------------
/**
* Handle for a single plugin worker process.
*
* Callers use `start()` to spawn the worker, `call()` to send RPC requests,
* and `stop()` to gracefully shut down. The handle manages crash recovery
* with exponential backoff automatically when `autoRestart` is enabled.
*/
export interface PluginWorkerHandle {
/** The plugin ID this worker serves. */
readonly pluginId: string;
/** Current worker status. */
readonly status: WorkerStatus;
/** Start the worker process. Resolves when initialize completes. */
start(): Promise<void>;
/**
* Stop the worker process gracefully.
*
* Sends a `shutdown` RPC call, waits up to 10 seconds for the worker to
* exit, then escalates to SIGTERM, and finally SIGKILL if needed.
*/
stop(): Promise<void>;
/**
* Restart the worker process (stop + start).
*/
restart(): Promise<void>;
/**
* Send a typed host→worker RPC call.
*
* @param method - The RPC method name
* @param params - Method parameters
* @param timeoutMs - Optional per-call timeout override
* @returns The method result
* @throws {JsonRpcCallError} if the worker returns an error response
* @throws {Error} if the worker is not running or the call times out
*/
call<M extends HostToWorkerMethodName>(
method: M,
params: HostToWorkerMethods[M][0],
timeoutMs?: number,
): Promise<HostToWorkerMethods[M][1]>;
/**
* Send a fire-and-forget notification to the worker (no response expected).
*/
notify(method: string, params: unknown): void;
/** Subscribe to worker events. */
on<K extends WorkerHandleEventName>(
event: K,
listener: (payload: WorkerHandleEvents[K]) => void,
): void;
/** Unsubscribe from worker events. */
off<K extends WorkerHandleEventName>(
event: K,
listener: (payload: WorkerHandleEvents[K]) => void,
): void;
/** Optional methods the worker reported during initialization. */
readonly supportedMethods: string[];
/** Get diagnostic info about the worker. */
diagnostics(): WorkerDiagnostics;
}
/**
* Diagnostic information about a worker process.
*/
export interface WorkerDiagnostics {
pluginId: string;
status: WorkerStatus;
pid: number | null;
uptime: number | null;
consecutiveCrashes: number;
totalCrashes: number;
pendingRequests: number;
lastCrashAt: number | null;
nextRestartAt: number | null;
}
// ---------------------------------------------------------------------------
// PluginWorkerManager — manages all plugin workers
// ---------------------------------------------------------------------------
/**
* The top-level manager that holds all plugin worker handles.
*
* Provides a registry of workers keyed by plugin ID, with convenience methods
* for starting/stopping all workers and routing RPC calls.
*/
export interface PluginWorkerManager {
/**
* Register and start a worker for a plugin.
*
* @returns The worker handle
* @throws if a worker is already registered for this plugin
*/
startWorker(pluginId: string, options: WorkerStartOptions): Promise<PluginWorkerHandle>;
/**
* Stop and unregister a specific plugin worker.
*/
stopWorker(pluginId: string): Promise<void>;
/**
* Get the worker handle for a plugin.
*/
getWorker(pluginId: string): PluginWorkerHandle | undefined;
/**
* Check if a worker is registered and running for a plugin.
*/
isRunning(pluginId: string): boolean;
/**
* Stop all managed workers. Called during server shutdown.
*/
stopAll(): Promise<void>;
/**
* Get diagnostic info for all workers.
*/
diagnostics(): WorkerDiagnostics[];
/**
* Send an RPC call to a specific plugin worker.
*
* @throws if the worker is not running
*/
call<M extends HostToWorkerMethodName>(
pluginId: string,
method: M,
params: HostToWorkerMethods[M][0],
timeoutMs?: number,
): Promise<HostToWorkerMethods[M][1]>;
}
// ---------------------------------------------------------------------------
// Implementation: createPluginWorkerHandle
// ---------------------------------------------------------------------------
/**
* Create a handle for a single plugin worker process.
*
* @internal Exported for testing; consumers should use `createPluginWorkerManager`.
*/
export function createPluginWorkerHandle(
pluginId: string,
options: WorkerStartOptions,
): PluginWorkerHandle {
const log = logger.child({ service: "plugin-worker", pluginId });
const emitter = new EventEmitter();
/**
* Higher than default (10) to accommodate multiple subscribers to
* crash/ready/exit events during integration tests and runtime monitoring.
*/
emitter.setMaxListeners(50);
// Worker process state
let childProcess: ChildProcess | null = null;
let readline: ReadlineInterface | null = null;
let stderrReadline: ReadlineInterface | null = null;
let status: WorkerStatus = "stopped";
let startedAt: number | null = null;
let stderrExcerpt = "";
// Pending RPC requests awaiting a response
const pendingRequests = new Map<string | number, PendingRequest>();
let nextRequestId = 1;
const activeInvocations = new Map<string, ActiveInvocation>();
// Optional methods reported by the worker during initialization
let supportedMethods: string[] = [];
// Crash tracking for exponential backoff
let consecutiveCrashes = 0;
let totalCrashes = 0;
let lastCrashAt: number | null = null;
let backoffTimer: ReturnType<typeof setTimeout> | null = null;
let nextRestartAt: number | null = null;
// Track open stream channels so we can emit synthetic close on crash.
// Maps channel → companyId.
const openStreamChannels = new Map<string, string>();
// Shutdown coordination
let intentionalStop = false;
const rpcTimeoutMs = options.rpcTimeoutMs ?? DEFAULT_RPC_TIMEOUT_MS;
const autoRestart = options.autoRestart ?? true;
// -----------------------------------------------------------------------
// Status management
// -----------------------------------------------------------------------
function setStatus(newStatus: WorkerStatus): void {
const prev = status;
if (prev === newStatus) return;
status = newStatus;
log.debug({ from: prev, to: newStatus }, "worker status change");
emitter.emit("status", { pluginId, status: newStatus, previousStatus: prev });
}
// -----------------------------------------------------------------------
// JSON-RPC message sending
// -----------------------------------------------------------------------
function sendMessage(message: unknown): void {
if (!childProcess?.stdin?.writable) {
throw new Error(`Worker process for plugin "${pluginId}" is not writable`);
}
const serialized = serializeMessage(message as any);
childProcess.stdin.write(serialized);
}
function errorCodeForWorkerHostError(err: unknown): number {
const code = (err as { code?: unknown } | null)?.code;
const pluginErrorCodes: readonly number[] = Object.values(PLUGIN_RPC_ERROR_CODES);
return typeof code === "number" && pluginErrorCodes.includes(code)
? code
: JSONRPC_ERROR_CODES.INTERNAL_ERROR;
}
// -----------------------------------------------------------------------
// Incoming message handling
// -----------------------------------------------------------------------
function handleLine(line: string): void {
if (!line.trim()) return;
let message: unknown;
try {
message = parseMessage(line);
} catch (err) {
if (err instanceof JsonRpcParseError) {
log.warn({ rawLine: line.slice(0, 200) }, "unparseable message from worker");
} else {
log.warn({ err }, "error parsing worker message");
}
return;
}
if (isJsonRpcResponse(message)) {
handleResponse(message);
} else if (isJsonRpcRequest(message)) {
handleWorkerRequest(message as JsonRpcRequest);
} else if (isJsonRpcNotification(message)) {
handleWorkerNotification(message as JsonRpcNotification);
} else {
log.warn("unknown message type from worker");
}
}
/**
* Handle a JSON-RPC response from the worker (matching a pending request).
*/
function handleResponse(response: JsonRpcResponse): void {
const id = response.id;
if (id === null || id === undefined) {
log.warn("received response with null/undefined id");
return;
}
const pending = pendingRequests.get(id);
if (!pending) {
log.warn({ id }, "received response for unknown request id");
return;
}
clearTimeout(pending.timer);
pendingRequests.delete(id);
pending.resolve(response);
}
function readNonEmptyString(value: unknown): string | null {
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function deriveInvocationScope(
method: HostToWorkerMethodName | string,
params: unknown,
): PluginInvocationScope | null {
if (!isRecord(params)) return null;
const directCompanyId = readNonEmptyString(params.companyId);
if (directCompanyId) return { companyId: directCompanyId };
if (method === "performAction" && isRecord(params.actorContext)) {
const companyId = readNonEmptyString(params.actorContext.companyId);
return companyId ? { companyId } : null;
}
if (method === "executeTool" && isRecord(params.runContext)) {
const companyId = readNonEmptyString(params.runContext.companyId);
return companyId ? { companyId } : null;
}
if (method === "onEvent" && isRecord(params.event)) {
const companyId = readNonEmptyString(params.event.companyId);
return companyId ? { companyId } : null;
}
return null;
}
function registerInvocation(scope: PluginInvocationScope, ttlMs?: number): PluginInvocationContext {
const invocation: PluginInvocationContext = {
id: randomUUID(),
scope,
};
const entry: ActiveInvocation = { scope };
if (ttlMs !== undefined) {
entry.timer = setTimeout(() => {
activeInvocations.delete(invocation.id);
}, ttlMs);
if (entry.timer.unref) entry.timer.unref();
}
activeInvocations.set(invocation.id, entry);
return invocation;
}
function clearInvocation(invocation: PluginInvocationContext | null): void {
if (!invocation) return;
const entry = activeInvocations.get(invocation.id);
if (entry?.timer) clearTimeout(entry.timer);
activeInvocations.delete(invocation.id);
}
function contextForWorkerMessage(message: JsonRpcRequest | JsonRpcNotification): WorkerHostCallContext {
const invocationId = readNonEmptyString(
(message as { paperclipInvocationId?: unknown }).paperclipInvocationId,
);
if (!invocationId) {
const hasActiveInvocation = activeInvocations.size > 0 ||
Array.from(pendingRequests.values()).some((pending) => pending.invocationId);
return hasActiveInvocation ? { invalidInvocationScope: true } : {};
}
const entry = activeInvocations.get(invocationId);
if (!entry) return { invalidInvocationScope: true };
return { invocationScope: entry.scope };
}
/**
* Handle a JSON-RPC request from the worker (worker→host call).
*/
async function handleWorkerRequest(request: JsonRpcRequest): Promise<void> {
const method = request.method as WorkerToHostMethodName;
const handler = options.hostHandlers[method] as
| ((params: unknown, context?: WorkerHostCallContext) => Promise<unknown>)
| undefined;
if (!handler) {
log.warn({ method }, "worker called unregistered host method");
try {
sendMessage(
createErrorResponse(
request.id,
JSONRPC_ERROR_CODES.METHOD_NOT_FOUND,
`Host does not handle method "${method}"`,
),
);
} catch {
// Worker may have exited, ignore send error
}
return;
}
try {
const result = await handler(request.params, contextForWorkerMessage(request));
sendMessage({
jsonrpc: JSONRPC_VERSION,
id: request.id,
result: result ?? null,
});
} catch (err) {
const errorMessage = err instanceof Error ? err.message : String(err);
log.error({ method, err: errorMessage }, "host handler error");
try {
sendMessage(
createErrorResponse(
request.id,
errorCodeForWorkerHostError(err),
errorMessage,
),
);
} catch {
// Worker may have exited, ignore send error
}
}
}
/**
* Handle a JSON-RPC notification from the worker (fire-and-forget).
*
* The `log` notification is the primary case — worker `ctx.logger` calls
* arrive here. We append structured plugin context (pluginId, timestamp,
* level) so that every log entry is queryable per the spec (§26.1).
*/
function handleWorkerNotification(notification: JsonRpcNotification): void {
if (notification.method === "log") {
const params = notification.params as {
level?: string;
message?: string;
meta?: Record<string, unknown>;
} | null;
const level = params?.level ?? "info";
const msg = params?.message ?? "";
const meta = params?.meta;
// Build a structured log object that includes the plugin context fields
// required by §26.1: pluginId, timestamp, level, message, and metadata.
// The child logger already carries `pluginId` in its bindings, but we
// add explicit `pluginLogLevel` and `pluginTimestamp` so downstream
// consumers (log storage, UI queries) can filter without parsing.
const logFields: Record<string, unknown> = {
...meta,
pluginLogLevel: level,
pluginTimestamp: new Date().toISOString(),
};
if (level === "error") {
log.error(logFields, `[plugin] ${msg}`);
} else if (level === "warn") {
log.warn(logFields, `[plugin] ${msg}`);
} else if (level === "debug") {
log.debug(logFields, `[plugin] ${msg}`);
} else {
log.info(logFields, `[plugin] ${msg}`);
}
return;
}
// Stream notifications: forward to the stream bus via callback
if (
notification.method === "streams.open" ||
notification.method === "streams.emit" ||
notification.method === "streams.close"
) {
const params = (notification.params ?? {}) as Record<string, unknown>;
const companyId = String(params.companyId ?? "");
const context = contextForWorkerMessage(notification);
if (context.invalidInvocationScope) {
log.warn(
{ method: notification.method, companyId },
"dropping plugin stream notification with invalid invocation scope",
);
return;
}
const allowedCompanyId = context.invocationScope?.companyId;
if (allowedCompanyId && companyId !== allowedCompanyId) {
log.warn(
{ method: notification.method, companyId, allowedCompanyId },
"dropping plugin stream notification outside invocation company scope",
);
return;
}
// Track open channels so we can emit synthetic close on crash
if (notification.method === "streams.open") {
const ch = String(params.channel ?? "");
if (ch) openStreamChannels.set(ch, companyId);
} else if (notification.method === "streams.close") {
openStreamChannels.delete(String(params.channel ?? ""));
}
if (options.onStreamNotification) {
try {
options.onStreamNotification(notification.method, params);
} catch (err) {
log.error(
{
method: notification.method,
err: err instanceof Error ? err.message : String(err),
},
"stream notification handler failed",
);
}
}
return;
}
log.debug({ method: notification.method }, "received notification from worker");
}
// -----------------------------------------------------------------------
// Process lifecycle
// -----------------------------------------------------------------------
function spawnProcess(): ChildProcess {
// Security: Do NOT spread process.env into the worker. Plugins should only
// receive a minimal, controlled environment to prevent leaking host
// secrets (like DATABASE_URL, internal API keys, etc.).
const workerEnv: Record<string, string> = {
...options.env,
PATH: process.env.PATH ?? "",
NODE_PATH: process.env.NODE_PATH ?? "",
PAPERCLIP_PLUGIN_ID: pluginId,
NODE_ENV: process.env.NODE_ENV ?? "production",
TZ: process.env.TZ ?? "UTC",
};
const child = fork(options.entrypointPath, [], {
stdio: ["pipe", "pipe", "pipe", "ipc"],
execArgv: options.execArgv ?? [],
env: workerEnv,
// Don't let the child keep the parent alive
detached: false,
});
return child;
}
function attachStdioHandlers(child: ChildProcess): void {
// Read NDJSON from stdout
if (child.stdout) {
readline = createInterface({ input: child.stdout });
readline.on("line", handleLine);
}
// Capture stderr for logging
if (child.stderr) {
stderrReadline = createInterface({ input: child.stderr });
stderrReadline.on("line", (line: string) => {
stderrExcerpt = appendStderrExcerpt(stderrExcerpt, line);
log.warn({ stream: "stderr" }, `[plugin stderr] ${line}`);
});
}
// Handle process exit
child.on("exit", (code, signal) => {
handleProcessExit(code, signal);
});
// Handle process errors (e.g. spawn failure)
child.on("error", (err) => {
log.error({ err: err.message }, "worker process error");
if (emitter.listenerCount("error") > 0) {
emitter.emit("error", { pluginId, error: err });
}
if (status === "starting") {
setStatus("crashed");
rejectAllPending(
new Error(formatWorkerFailureMessage(
`Worker process failed to start: ${err.message}`,
stderrExcerpt,
)),
);
}
});
}
function handleProcessExit(
code: number | null,
signal: NodeJS.Signals | null,
): void {
const wasIntentional = intentionalStop;
// Clean up readline interfaces
if (readline) {
readline.close();
readline = null;
}
if (stderrReadline) {
stderrReadline.close();
stderrReadline = null;
}
childProcess = null;
startedAt = null;
// Reject all pending requests
rejectAllPending(
new Error(formatWorkerFailureMessage(
`Worker process exited (code=${code}, signal=${signal})`,
stderrExcerpt,
)),
);
// Emit synthetic close for any orphaned stream channels so SSE clients
// are notified instead of hanging indefinitely.
if (openStreamChannels.size > 0 && options.onStreamNotification) {
for (const [channel, companyId] of openStreamChannels) {
try {
options.onStreamNotification("streams.close", { channel, companyId });
} catch {
// Best-effort cleanup — don't let it interfere with exit handling
}
}
openStreamChannels.clear();
}
emitter.emit("exit", { pluginId, code, signal });
if (wasIntentional) {
// Graceful stop — status is already "stopping" or will be set to "stopped"
setStatus("stopped");
log.info({ code, signal }, "worker process stopped");
return;
}
// Unexpected exit — crash recovery
totalCrashes++;
const now = Date.now();
// Reset consecutive crash counter if enough time passed
if (lastCrashAt !== null && now - lastCrashAt > CRASH_WINDOW_MS) {
consecutiveCrashes = 0;
}
consecutiveCrashes++;
lastCrashAt = now;
log.error(
{ code, signal, consecutiveCrashes, totalCrashes },
"worker process crashed",
);
const willRestart =
autoRestart && consecutiveCrashes <= MAX_CONSECUTIVE_CRASHES;
setStatus("crashed");
emitter.emit("crash", { pluginId, code, signal, willRestart });
if (willRestart) {
scheduleRestart();
} else {
log.error(
{ consecutiveCrashes, maxCrashes: MAX_CONSECUTIVE_CRASHES },
"max consecutive crashes reached, not restarting",
);
}
}
function rejectAllPending(error: Error): void {
for (const [id, pending] of pendingRequests) {
clearTimeout(pending.timer);
pending.resolve(
createErrorResponse(
pending.id,
PLUGIN_RPC_ERROR_CODES.WORKER_UNAVAILABLE,
error.message,
) as JsonRpcResponse,
);
}
pendingRequests.clear();
for (const invocation of activeInvocations.values()) {
if (invocation.timer) clearTimeout(invocation.timer);
}
activeInvocations.clear();
}
// -----------------------------------------------------------------------
// Crash recovery with exponential backoff
// -----------------------------------------------------------------------
function computeBackoffMs(): number {
// Exponential backoff: MIN_BACKOFF * MULTIPLIER^(consecutiveCrashes - 1)
const delay =
MIN_BACKOFF_MS * Math.pow(BACKOFF_MULTIPLIER, consecutiveCrashes - 1);
// Add jitter: ±25%
const jitter = delay * 0.25 * (Math.random() * 2 - 1);
return Math.min(Math.round(delay + jitter), MAX_BACKOFF_MS);
}
function scheduleRestart(): void {
const delay = computeBackoffMs();
nextRestartAt = Date.now() + delay;
setStatus("backoff");
log.info(
{ delayMs: delay, consecutiveCrashes },
"scheduling restart with backoff",
);
backoffTimer = setTimeout(async () => {
backoffTimer = null;
nextRestartAt = null;
try {
await startInternal();
} catch (err) {
log.error(
{ err: err instanceof Error ? err.message : String(err) },
"restart after backoff failed",
);
}
}, delay);
}
function cancelPendingRestart(): void {
if (backoffTimer !== null) {
clearTimeout(backoffTimer);
backoffTimer = null;
nextRestartAt = null;
}
}
// -----------------------------------------------------------------------
// Start / Stop
// -----------------------------------------------------------------------
async function startInternal(): Promise<void> {
if (status === "running" || status === "starting") {
throw new Error(`Worker for plugin "${pluginId}" is already ${status}`);
}
intentionalStop = false;
setStatus("starting");
stderrExcerpt = "";
const child = spawnProcess();
childProcess = child;
attachStdioHandlers(child);
startedAt = Date.now();
// Send the initialize RPC call
const initParams: InitializeParams = {
manifest: options.manifest,
config: options.config,
instanceInfo: options.instanceInfo,
apiVersion: options.apiVersion,
databaseNamespace: options.databaseNamespace ?? null,
};
try {
const result = await callInternal(
"initialize",
initParams,
INITIALIZE_TIMEOUT_MS,
) as { ok?: boolean; supportedMethods?: string[] } | undefined;
if (!result || !result.ok) {
throw new Error("Worker initialize returned ok=false");
}
supportedMethods = result.supportedMethods ?? [];
} catch (err) {
// Initialize failed — kill the process and propagate
const msg = err instanceof Error ? err.message : String(err);
log.error({ err: msg }, "worker initialize failed");
await killProcess();
setStatus("crashed");
throw new Error(`Worker initialize failed for "${pluginId}": ${msg}`);
}
// Reset crash counter on successful start
consecutiveCrashes = 0;
setStatus("running");
emitter.emit("ready", { pluginId });
log.info({ pid: child.pid }, "worker process started and initialized");
}
async function stopInternal(): Promise<void> {
cancelPendingRestart();
if (status === "stopped" || status === "stopping") {
return;
}
intentionalStop = true;
setStatus("stopping");
if (!childProcess) {
setStatus("stopped");
return;
}
// Step 1: Send shutdown RPC and wait for the worker to exit gracefully.
// We race the shutdown call against a timeout. The worker should process
// the shutdown and exit on its own within the drain period.
try {
await Promise.race([
callInternal("shutdown", {} as Record<string, never>, SHUTDOWN_DRAIN_MS),
waitForExit(SHUTDOWN_DRAIN_MS),
]);
} catch {
// Shutdown call failed or timed out — proceed to kill
log.warn("shutdown RPC failed or timed out, escalating to SIGTERM");
}
// Give the process a brief moment to exit after the shutdown response
if (childProcess) {
await waitForExit(500);
}
// Check if process already exited
if (!childProcess) {
setStatus("stopped");
return;
}
// Step 2: Send SIGTERM and wait
log.info("worker did not exit after shutdown RPC, sending SIGTERM");
await killWithSignal("SIGTERM", SIGTERM_GRACE_MS);
if (!childProcess) {
setStatus("stopped");
return;
}
// Step 3: Forcefully kill with SIGKILL
log.warn("worker did not exit after SIGTERM, sending SIGKILL");
await killWithSignal("SIGKILL", 2_000);
if (childProcess) {
log.error("worker process still alive after SIGKILL — this should not happen");
}
setStatus("stopped");
}
/**
* Wait for the child process to exit, up to `timeoutMs`.
* Resolves immediately if the process is already gone.
*/
function waitForExit(timeoutMs: number): Promise<void> {
return new Promise<void>((resolve) => {
if (!childProcess) {
resolve();
return;
}
let settled = false;
const timer = setTimeout(() => {
if (settled) return;
settled = true;
resolve();
}, timeoutMs);
childProcess.once("exit", () => {
if (settled) return;
settled = true;
clearTimeout(timer);
resolve();
});
});
}
function killWithSignal(
signal: NodeJS.Signals,
waitMs: number,
): Promise<void> {
return new Promise<void>((resolve) => {
if (!childProcess) {
resolve();
return;
}
const timer = setTimeout(() => {
resolve();
}, waitMs);
childProcess.once("exit", () => {
clearTimeout(timer);
resolve();
});
try {
childProcess.kill(signal);
} catch {
clearTimeout(timer);
resolve();
}
});
}
async function killProcess(): Promise<void> {
if (!childProcess) return;
intentionalStop = true;
try {
childProcess.kill("SIGKILL");
} catch {
// Process may already be dead
}
// Wait briefly for exit event
await new Promise<void>((resolve) => {
if (!childProcess) {
resolve();
return;
}
const timer = setTimeout(() => {
resolve();
}, 1_000);
childProcess.once("exit", () => {
clearTimeout(timer);
resolve();
});
});
}
// -----------------------------------------------------------------------
// RPC call implementation
// -----------------------------------------------------------------------
function callInternal<M extends HostToWorkerMethodName>(
method: M,
params: HostToWorkerMethods[M][0],
timeoutMs?: number,
): Promise<HostToWorkerMethods[M][1]> {
const rpcPromise = new Promise<HostToWorkerMethods[M][1]>((resolve, reject) => {
if (!childProcess?.stdin?.writable) {
reject(
new Error(
`Cannot call "${method}" — worker for "${pluginId}" is not running`,
),
);
return;
}
const id = nextRequestId++;
const timeout = Math.min(timeoutMs ?? rpcTimeoutMs, MAX_RPC_TIMEOUT_MS);
const invocationScope = deriveInvocationScope(method, params);
const invocation = invocationScope ? registerInvocation(invocationScope) : null;
// Guard against double-settlement. When a process exits all pending
// requests are rejected via rejectAllPending(), but the timeout timer
// may still be running. Without this guard the timer's reject fires on
// an already-settled promise, producing an unhandled rejection.
let settled = false;
const settle = <T>(fn: (value: T) => void, value: T): void => {
if (settled) return;
settled = true;
clearTimeout(timer);
pendingRequests.delete(id);
clearInvocation(invocation);
fn(value);
};
const timer = setTimeout(() => {
settle(
reject,
new JsonRpcCallError({
code: PLUGIN_RPC_ERROR_CODES.TIMEOUT,
message: `RPC call "${method}" timed out after ${timeout}ms`,
}),
);
}, timeout);
const pending: PendingRequest = {
id,
method,
resolve: (response: JsonRpcResponse) => {
if (isJsonRpcSuccessResponse(response)) {
settle(resolve, response.result as HostToWorkerMethods[M][1]);
} else if ("error" in response && response.error) {
settle(reject, new JsonRpcCallError(response.error));
} else {
settle(reject, new Error(`Unexpected response format for "${method}"`));
}
},
timer,
sentAt: Date.now(),
invocationId: invocation?.id,
};
pendingRequests.set(id, pending);
try {
const request = {
...createRequest(method, params, id),
...(invocation ? { paperclipInvocation: invocation } : {}),
};
sendMessage(request);
} catch (err) {
clearTimeout(timer);
pendingRequests.delete(id);
clearInvocation(invocation);
reject(
new Error(
`Failed to send "${method}" to worker: ${
err instanceof Error ? err.message : String(err)
}`,
),
);
}
});
// Some call sites hand these promises across async boundaries before
// attaching their own handlers. Mark the promise as handled here so a
// worker-side JSON-RPC error can fail the caller without killing the host
// process via an unhandled rejection.
void rpcPromise.catch(() => undefined);
return rpcPromise;
}
// -----------------------------------------------------------------------
// Public API
// -----------------------------------------------------------------------
const handle: PluginWorkerHandle = {
get pluginId() {
return pluginId;
},
get status() {
return status;
},
get supportedMethods() {
return supportedMethods;
},
async start() {
await startInternal();
},
async stop() {
await stopInternal();
},
async restart() {
await stopInternal();
await startInternal();
},
call<M extends HostToWorkerMethodName>(
method: M,
params: HostToWorkerMethods[M][0],
timeoutMs?: number,
): Promise<HostToWorkerMethods[M][1]> {
if (status !== "running" && status !== "starting") {
return Promise.reject(
new Error(
`Cannot call "${method}" — worker for "${pluginId}" is ${status}`,
),
);
}
return callInternal(method, params, timeoutMs);
},
notify(method: string, params: unknown) {
if (status !== "running") return;
const invocationScope = deriveInvocationScope(method, params);
const invocation = invocationScope ? registerInvocation(invocationScope, MAX_RPC_TIMEOUT_MS) : null;
try {
sendMessage({
jsonrpc: JSONRPC_VERSION,
method,
params,
...(invocation ? { paperclipInvocation: invocation } : {}),
});
} catch {
clearInvocation(invocation);
log.warn({ method }, "failed to send notification to worker");
}
},
on<K extends WorkerHandleEventName>(
event: K,
listener: (payload: WorkerHandleEvents[K]) => void,
) {
emitter.on(event, listener);
},
off<K extends WorkerHandleEventName>(
event: K,
listener: (payload: WorkerHandleEvents[K]) => void,
) {
emitter.off(event, listener);
},
diagnostics(): WorkerDiagnostics {
return {
pluginId,
status,
pid: childProcess?.pid ?? null,
uptime:
startedAt !== null && status === "running"
? Date.now() - startedAt
: null,
consecutiveCrashes,
totalCrashes,
pendingRequests: pendingRequests.size,
lastCrashAt,
nextRestartAt,
};
},
};
return handle;
}
// ---------------------------------------------------------------------------
// Implementation: createPluginWorkerManager
// ---------------------------------------------------------------------------
/**
* Options for creating a PluginWorkerManager.
*/
export interface PluginWorkerManagerOptions {
/**
* Optional callback invoked when a worker emits a lifecycle event
* (crash, restart). Used by the server to publish global live events.
*/
onWorkerEvent?: (event: {
type: "plugin.worker.crashed" | "plugin.worker.restarted";
pluginId: string;
code?: number | null;
signal?: string | null;
willRestart?: boolean;
}) => void;
}
/**
* Create a new PluginWorkerManager.
*
* The manager holds all plugin worker handles and provides a unified API for
* starting, stopping, and communicating with plugin workers.
*
* @example
* ```ts
* const manager = createPluginWorkerManager();
*
* const handle = await manager.startWorker("acme.linear", {
* entrypointPath: "/path/to/worker.cjs",
* manifest,
* config: resolvedConfig,
* instanceInfo: { instanceId: "inst-1", hostVersion: "1.0.0" },
* apiVersion: 1,
* hostHandlers: { "config.get": async () => resolvedConfig, ... },
* });
*
* // Send RPC call to the worker
* const health = await manager.call("acme.linear", "health", {});
*
* // Shutdown all workers on server exit
* await manager.stopAll();
* ```
*/
export function createPluginWorkerManager(
managerOptions?: PluginWorkerManagerOptions,
): PluginWorkerManager {
const log = logger.child({ service: "plugin-worker-manager" });
const workers = new Map<string, PluginWorkerHandle>();
/** Per-plugin startup locks to prevent concurrent spawn races. */
const startupLocks = new Map<string, Promise<PluginWorkerHandle>>();
return {
async startWorker(
pluginId: string,
options: WorkerStartOptions,
): Promise<PluginWorkerHandle> {
// Mutex: if a start is already in-flight for this plugin, wait for it
const inFlight = startupLocks.get(pluginId);
if (inFlight) {
log.warn({ pluginId }, "concurrent startWorker call — waiting for in-flight start");
return inFlight;
}
const existing = workers.get(pluginId);
if (existing && existing.status !== "stopped") {
throw new Error(
`Worker already registered for plugin "${pluginId}" (status: ${existing.status})`,
);
}
const handle = createPluginWorkerHandle(pluginId, options);
workers.set(pluginId, handle);
// Subscribe to crash/ready events for live event forwarding
if (managerOptions?.onWorkerEvent) {
const notify = managerOptions.onWorkerEvent;
handle.on("crash", (payload) => {
notify({
type: "plugin.worker.crashed",
pluginId: payload.pluginId,
code: payload.code,
signal: payload.signal,
willRestart: payload.willRestart,
});
});
handle.on("ready", (payload) => {
// Only emit restarted if this was a crash recovery (totalCrashes > 0)
const diag = handle.diagnostics();
if (diag.totalCrashes > 0) {
notify({
type: "plugin.worker.restarted",
pluginId: payload.pluginId,
});
}
});
}
log.info({ pluginId }, "starting plugin worker");
// Set the lock before awaiting start() to prevent concurrent spawns
const startPromise = handle.start().then(() => handle).finally(() => {
startupLocks.delete(pluginId);
});
startupLocks.set(pluginId, startPromise);
return startPromise;
},
async stopWorker(pluginId: string): Promise<void> {
const handle = workers.get(pluginId);
if (!handle) {
log.warn({ pluginId }, "no worker registered for plugin, nothing to stop");
return;
}
log.info({ pluginId }, "stopping plugin worker");
await handle.stop();
workers.delete(pluginId);
},
getWorker(pluginId: string): PluginWorkerHandle | undefined {
return workers.get(pluginId);
},
isRunning(pluginId: string): boolean {
const handle = workers.get(pluginId);
return handle?.status === "running";
},
async stopAll(): Promise<void> {
log.info({ count: workers.size }, "stopping all plugin workers");
const promises = Array.from(workers.values()).map(async (handle) => {
try {
await handle.stop();
} catch (err) {
log.error(
{
pluginId: handle.pluginId,
err: err instanceof Error ? err.message : String(err),
},
"error stopping worker during shutdown",
);
}
});
await Promise.all(promises);
workers.clear();
},
diagnostics(): WorkerDiagnostics[] {
return Array.from(workers.values()).map((h) => h.diagnostics());
},
call<M extends HostToWorkerMethodName>(
pluginId: string,
method: M,
params: HostToWorkerMethods[M][0],
timeoutMs?: number,
): Promise<HostToWorkerMethods[M][1]> {
const handle = workers.get(pluginId);
if (!handle) {
return Promise.reject(
new Error(`No worker registered for plugin "${pluginId}"`),
);
}
return handle.call(method, params, timeoutMs);
},
};
}