diff --git a/packages/adapter-utils/src/sandbox-callback-bridge.test.ts b/packages/adapter-utils/src/sandbox-callback-bridge.test.ts index c18343af..cdc01fbe 100644 --- a/packages/adapter-utils/src/sandbox-callback-bridge.test.ts +++ b/packages/adapter-utils/src/sandbox-callback-bridge.test.ts @@ -8,10 +8,12 @@ import { afterEach, describe, expect, it } from "vitest"; import { prepareCommandManagedRuntime } from "./command-managed-runtime.js"; import { authorizeSandboxCallbackBridgeRequestWithRoutes, + createCommandManagedSandboxCallbackBridgeQueueClient, createFileSystemSandboxCallbackBridgeQueueClient, createSandboxCallbackBridgeAsset, createSandboxCallbackBridgeToken, sandboxCallbackBridgeDirectories, + syncSandboxCallbackBridgeEntrypoint, startSandboxCallbackBridgeServer, startSandboxCallbackBridgeWorker, } from "./sandbox-callback-bridge.js"; @@ -420,6 +422,98 @@ describe("sandbox callback bridge", () => { ); }); + it("serializes remote response writes so stop does not recreate a late orphaned response", async () => { + const rootDir = await mkdtemp(path.join(os.tmpdir(), "paperclip-bridge-response-lock-")); + cleanupDirs.push(rootDir); + + const localWorkspaceDir = path.join(rootDir, "local-workspace"); + const remoteWorkspaceDir = path.join(rootDir, "remote-workspace"); + await mkdir(localWorkspaceDir, { recursive: true }); + await mkdir(remoteWorkspaceDir, { recursive: true }); + await writeFile(path.join(localWorkspaceDir, "README.md"), "bridge response lock test\n", "utf8"); + + const runner = createExecRunner(); + const bridgeAsset = await createSandboxCallbackBridgeAsset(); + cleanupFns.push(bridgeAsset.cleanup); + const prepared = await prepareCommandManagedRuntime({ + runner, + spec: { + remoteCwd: remoteWorkspaceDir, + timeoutMs: 30_000, + }, + adapterKey: "codex", + workspaceLocalDir: localWorkspaceDir, + assets: [{ key: "bridge", localDir: bridgeAsset.localDir }], + }); + + const queueDir = path.posix.join(prepared.runtimeRootDir, "paperclip-bridge"); + const directories = sandboxCallbackBridgeDirectories(queueDir); + const bridgeToken = createSandboxCallbackBridgeToken(); + const seenRequestIds: string[] = []; + + const worker = await startSandboxCallbackBridgeWorker({ + client: createCommandManagedSandboxCallbackBridgeQueueClient({ + runner, + remoteCwd: remoteWorkspaceDir, + timeoutMs: 30_000, + }), + queueDir, + authorizeRequest: async () => null, + handleRequest: async (request) => { + seenRequestIds.push(request.id); + await new Promise((resolve) => setTimeout(resolve, 100)); + return { + status: 200, + headers: { "content-type": "application/json" }, + body: JSON.stringify({ ok: true, id: request.id }), + }; + }, + }); + cleanupFns.push(async () => { + await worker.stop(); + }); + + const bridge = await startSandboxCallbackBridgeServer({ + runner, + remoteCwd: remoteWorkspaceDir, + assetRemoteDir: prepared.assetDirs.bridge, + queueDir, + bridgeToken, + timeoutMs: 30_000, + }); + cleanupFns.push(async () => { + await bridge.stop(); + }); + + const responsePromise = fetch(`${bridge.baseUrl}/api/agents/me`, { + headers: { + authorization: `Bearer ${bridgeToken}`, + }, + }); + + for (let attempt = 0; attempt < 50 && seenRequestIds.length === 0; attempt += 1) { + await new Promise((resolve) => setTimeout(resolve, 5)); + } + + expect(seenRequestIds).toHaveLength(1); + await worker.stop({ drainTimeoutMs: 10 }); + + const response = await responsePromise; + expect(response.status).toBe(503); + await expect(response.json()).resolves.toEqual({ + error: "Bridge worker stopped before request could be handled.", + }); + + await new Promise((resolve) => setTimeout(resolve, 150)); + + await expect(readdir(directories.responsesDir)).resolves.toEqual([]); + await expect( + readdir(directories.responsesDir).then((entries) => + entries.filter((entry) => entry.endsWith(".tmp") || entry.includes(".paperclip-write.lock")), + ), + ).resolves.toEqual([]); + }); + it("rejects non-JSON request bodies and full queues at the bridge server", async () => { const rootDir = await mkdtemp(path.join(os.tmpdir(), "paperclip-bridge-server-guards-")); cleanupDirs.push(rootDir); @@ -615,6 +709,112 @@ describe("sandbox callback bridge", () => { }); }); + it("reuses an already-uploaded bridge entrypoint when the remote file hash matches", async () => { + const rootDir = await mkdtemp(path.join(os.tmpdir(), "paperclip-bridge-sync-")); + cleanupDirs.push(rootDir); + + const remoteWorkspaceDir = path.join(rootDir, "remote-workspace"); + const remoteAssetDir = path.posix.join( + remoteWorkspaceDir, + ".paperclip-runtime", + "codex", + "paperclip-bridge", + "server", + ); + await mkdir(remoteWorkspaceDir, { recursive: true }); + + const bridgeAsset = await createSandboxCallbackBridgeAsset(); + cleanupFns.push(bridgeAsset.cleanup); + const originalSource = await readFile(bridgeAsset.entrypoint, "utf8"); + const expandedSource = `${originalSource}\n// bridge payload padding\n`; + await writeFile(bridgeAsset.entrypoint, expandedSource, "utf8"); + + const runner = createExecRunner(); + + const first = await syncSandboxCallbackBridgeEntrypoint({ + runner, + remoteCwd: remoteWorkspaceDir, + assetRemoteDir: remoteAssetDir, + bridgeAsset, + timeoutMs: 30_000, + }); + const second = await syncSandboxCallbackBridgeEntrypoint({ + runner, + remoteCwd: remoteWorkspaceDir, + assetRemoteDir: remoteAssetDir, + bridgeAsset, + timeoutMs: 30_000, + }); + + expect(first.uploaded).toBe(true); + expect(second.uploaded).toBe(false); + await expect(readFile(path.posix.join(remoteAssetDir, "paperclip-bridge-server.mjs"), "utf8")).resolves.toBe(expandedSource); + await expect( + readdir(remoteAssetDir).then((entries) => + entries.filter( + (entry) => + entry.endsWith(".paperclip-upload.b64") || + entry.endsWith(".partial") || + entry === ".paperclip-bridge-upload.lock", + ), + ), + ).resolves.toEqual([]); + }); + + it("rejects a corrupted bridge entrypoint upload without committing a torn remote file", async () => { + const rootDir = await mkdtemp(path.join(os.tmpdir(), "paperclip-bridge-sync-corrupt-")); + cleanupDirs.push(rootDir); + + const remoteWorkspaceDir = path.join(rootDir, "remote-workspace"); + const remoteAssetDir = path.posix.join( + remoteWorkspaceDir, + ".paperclip-runtime", + "codex", + "paperclip-bridge", + "server", + ); + await mkdir(remoteWorkspaceDir, { recursive: true }); + + const bridgeAsset = await createSandboxCallbackBridgeAsset(); + cleanupFns.push(bridgeAsset.cleanup); + const runner = { + execute: async (input: { + command: string; + args?: string[]; + cwd?: string; + env?: Record; + stdin?: string; + timeoutMs?: number; + }) => + await createExecRunner().execute({ + ...input, + stdin: input.stdin != null ? "" : input.stdin, + }), + }; + + await expect( + syncSandboxCallbackBridgeEntrypoint({ + runner, + remoteCwd: remoteWorkspaceDir, + assetRemoteDir: remoteAssetDir, + bridgeAsset, + timeoutMs: 30_000, + }), + ).rejects.toThrow(/sha mismatch/i); + + await expect(readFile(path.posix.join(remoteAssetDir, "paperclip-bridge-server.mjs"), "utf8")).rejects.toThrow(); + await expect( + readdir(remoteAssetDir).then((entries) => + entries.filter( + (entry) => + entry.endsWith(".paperclip-upload.b64") || + entry.endsWith(".partial") || + entry === ".paperclip-bridge-upload.lock", + ), + ), + ).resolves.toEqual([]); + }); + it("permits the documented heartbeat surface and denies unrelated routes", () => { const allowed: Array<{ method: string; path: string }> = [ { method: "GET", path: "/api/agents/me" }, diff --git a/packages/adapter-utils/src/sandbox-callback-bridge.ts b/packages/adapter-utils/src/sandbox-callback-bridge.ts index cb020046..d545616a 100644 --- a/packages/adapter-utils/src/sandbox-callback-bridge.ts +++ b/packages/adapter-utils/src/sandbox-callback-bridge.ts @@ -1,4 +1,4 @@ -import { randomBytes, randomUUID } from "node:crypto"; +import { createHash, randomBytes, randomUUID } from "node:crypto"; import { promises as fs } from "node:fs"; import os from "node:os"; import path from "node:path"; @@ -145,6 +145,13 @@ export interface SandboxCallbackBridgeQueueClient { listJsonFiles(remotePath: string): Promise; readTextFile(remotePath: string): Promise; writeTextFile(remotePath: string, body: string): Promise; + writeResponseFile?( + responsePath: string, + body: string, + options?: { + requestPath?: string | null; + }, + ): Promise<{ wrote: boolean }>; rename(fromPath: string, toPath: string): Promise; remove(remotePath: string): Promise; } @@ -196,12 +203,14 @@ async function runShell( script: string, timeoutMs: number, shellCommand: "bash" | "sh" = "sh", + stdin?: string, ): Promise { return await runner.execute({ command: shellCommand, args: ["-lc", script], cwd, timeoutMs, + stdin, }); } @@ -218,6 +227,43 @@ function base64Chunks(body: string): string[] { return out; } +async function pathExists(filePath: string): Promise { + return await fs.stat(filePath).then(() => true).catch(() => false); +} + +function buildRemotePidLockAcquireScript(lockDirExpr: string, timeoutMessage: string): string[] { + return [ + "attempts=0", + `while ! mkdir ${lockDirExpr} 2>/dev/null; do`, + " holder_pid=\"\"", + ` if [ -s ${lockDirExpr}/pid ]; then`, + ` holder_pid="$(cat ${lockDirExpr}/pid 2>/dev/null || true)"`, + " fi", + " if [ -n \"$holder_pid\" ] && ! kill -0 \"$holder_pid\" 2>/dev/null; then", + ` rm -rf ${lockDirExpr}`, + " continue", + " fi", + " attempts=$((attempts + 1))", + " if [ \"$attempts\" -ge 600 ]; then", + ` echo ${shellQuote(timeoutMessage)} >&2`, + " exit 1", + " fi", + " sleep 0.05", + "done", + `printf '%s\\n' "$$" > ${lockDirExpr}/pid`, + ]; +} + +function buildRemotePidLockCleanupScript(lockDirExpr: string, cleanupLines: string[]): string[] { + return [ + "cleanup() {", + ...cleanupLines.map((line) => ` ${line}`), + ` rm -rf ${lockDirExpr}`, + "}", + "trap cleanup EXIT INT TERM", + ]; +} + export function createSandboxCallbackBridgeToken(bytes = DEFAULT_BRIDGE_TOKEN_BYTES): string { return randomBytes(bytes).toString("base64url"); } @@ -315,6 +361,80 @@ export function createFileSystemSandboxCallbackBridgeQueueClient(): SandboxCallb await fs.mkdir(path.posix.dirname(remotePath), { recursive: true }); await fs.writeFile(remotePath, body, "utf8"); }, + writeResponseFile: async (responsePath, body, options = {}) => { + const responseDir = path.posix.dirname(responsePath); + const tempPath = `${responsePath}.tmp`; + const lockDir = `${responsePath}.paperclip-write.lock`; + const lockPidFile = `${lockDir}/pid`; + if (options.requestPath) { + const requestExists = await pathExists(options.requestPath); + if (!requestExists) { + return { wrote: false }; + } + } + await fs.mkdir(responseDir, { recursive: true }); + // PID-liveness mkdir-mutex: mirrors the shell-based bridge mutex so a + // crashed holder (SIGKILL / OOM) doesn't deadlock subsequent writers + // for the full timeout window. + let attempts = 0; + while (true) { + try { + await fs.mkdir(lockDir); + await fs.writeFile(lockPidFile, `${process.pid}\n`, "utf8"); + break; + } catch (error) { + const code = (error as NodeJS.ErrnoException)?.code; + if (code !== "EEXIST") { + throw error; + } + let holderPid: number | null = null; + try { + const raw = await fs.readFile(lockPidFile, "utf8"); + const parsed = Number.parseInt(raw.trim(), 10); + if (Number.isFinite(parsed) && parsed > 0) holderPid = parsed; + } catch { + // pid file missing or unreadable — treat as stale lock + } + let holderAlive = false; + if (holderPid !== null) { + try { + process.kill(holderPid, 0); + holderAlive = true; + } catch { + holderAlive = false; + } + } + if (!holderAlive) { + await fs.rm(lockDir, { recursive: true, force: true }).catch(() => undefined); + continue; + } + attempts += 1; + if (attempts >= 600) { + throw new Error("Timed out acquiring sandbox callback bridge response lock."); + } + await new Promise((resolve) => setTimeout(resolve, 50)); + } + } + + try { + if (options.requestPath) { + const requestExists = await pathExists(options.requestPath); + if (!requestExists) { + return { wrote: false }; + } + } + const responseExists = await pathExists(responsePath); + if (responseExists) { + return { wrote: false }; + } + await fs.writeFile(tempPath, body, "utf8"); + await fs.rename(tempPath, responsePath); + return { wrote: true }; + } finally { + await fs.rm(tempPath, { force: true }).catch(() => undefined); + await fs.rm(lockDir, { recursive: true, force: true }).catch(() => undefined); + } + }, rename: async (fromPath, toPath) => { await fs.mkdir(path.posix.dirname(toPath), { recursive: true }); await fs.rename(fromPath, toPath); @@ -385,6 +505,53 @@ export function createCommandManagedSandboxCallbackBridgeQueueClient(input: { `base64 -d < ${shellQuote(tempPath)} > ${shellQuote(remotePath)} && rm -f ${shellQuote(tempPath)}`, ); }, + writeResponseFile: async (responsePath, body, options = {}) => { + const responseDir = path.posix.dirname(responsePath); + const tempPath = `${responsePath}.tmp`; + const lockDir = `${responsePath}.paperclip-write.lock`; + const requestPath = options.requestPath?.trim() || ""; + const result = await runShell( + input.runner, + input.remoteCwd, + [ + "set -eu", + `response_dir=${shellQuote(responseDir)}`, + `response_path=${shellQuote(responsePath)}`, + `temp_path=${shellQuote(tempPath)}`, + `lock_dir=${shellQuote(lockDir)}`, + `request_path=${shellQuote(requestPath)}`, + "mkdir -p \"$response_dir\"", + ...buildRemotePidLockAcquireScript("\"$lock_dir\"", "Timed out acquiring sandbox callback bridge response lock."), + ...buildRemotePidLockCleanupScript("\"$lock_dir\"", [ + "rm -f \"$temp_path\"", + ]), + "if [ -n \"$request_path\" ] && [ ! -f \"$request_path\" ]; then", + " printf '{\"wrote\":false}\\n'", + " exit 0", + "fi", + "if [ -f \"$response_path\" ]; then", + " printf '{\"wrote\":false}\\n'", + " exit 0", + "fi", + "cat > \"$temp_path\"", + "mv \"$temp_path\" \"$response_path\"", + "printf '{\"wrote\":true}\\n'", + ].join("\n"), + timeoutMs, + shellCommand, + body, + ); + requireSuccessfulResult(`write bridge response ${responsePath}`, result); + try { + return { + wrote: JSON.parse(result.stdout.trim())?.wrote === true, + }; + } catch (error) { + throw new Error( + `Sandbox callback bridge response write wrote invalid result JSON: ${error instanceof Error ? error.message : String(error)}`, + ); + } + }, rename: async (fromPath, toPath) => { await runChecked( `rename ${fromPath}`, @@ -399,11 +566,17 @@ export function createCommandManagedSandboxCallbackBridgeQueueClient(input: { async function writeBridgeResponse( client: SandboxCallbackBridgeQueueClient, + requestPath: string, responsePath: string, response: SandboxCallbackBridgeResponse, ) { + const body = `${JSON.stringify(response)}\n`; + if (client.writeResponseFile) { + await client.writeResponseFile(responsePath, body, { requestPath }); + return; + } const tempPath = `${responsePath}.tmp`; - await client.writeTextFile(tempPath, `${JSON.stringify(response)}\n`); + await client.writeTextFile(tempPath, body); await client.rename(tempPath, responsePath); } @@ -447,7 +620,7 @@ export async function startSandboxCallbackBridgeWorker(input: { request = JSON.parse(raw) as SandboxCallbackBridgeRequest; } catch { const requestId = fileName.replace(/\.json$/i, "") || randomUUID(); - await writeBridgeResponse(input.client, responsePath, { + await writeBridgeResponse(input.client, requestPath, responsePath, { id: requestId, status: 400, headers: { "content-type": "application/json" }, @@ -460,7 +633,7 @@ export async function startSandboxCallbackBridgeWorker(input: { const denialReason = await authorizeRequest(request); if (denialReason) { - await writeBridgeResponse(input.client, responsePath, { + await writeBridgeResponse(input.client, requestPath, responsePath, { id: request.id, status: 403, headers: { "content-type": "application/json" }, @@ -477,7 +650,7 @@ export async function startSandboxCallbackBridgeWorker(input: { if (Buffer.byteLength(responseBody, "utf8") > maxBodyBytes) { throw new Error(`Bridge response body exceeded the configured size limit of ${maxBodyBytes} bytes.`); } - await writeBridgeResponse(input.client, responsePath, { + await writeBridgeResponse(input.client, requestPath, responsePath, { id: request.id, status: result.status, headers: result.headers ?? {}, @@ -488,7 +661,7 @@ export async function startSandboxCallbackBridgeWorker(input: { console.warn( `[paperclip] sandbox callback bridge handler failed for ${request.id}: ${error instanceof Error ? error.message : String(error)}`, ); - await writeBridgeResponse(input.client, responsePath, { + await writeBridgeResponse(input.client, requestPath, responsePath, { id: request.id, status: 502, headers: { "content-type": "application/json" }, @@ -511,7 +684,7 @@ export async function startSandboxCallbackBridgeWorker(input: { try { const raw = await input.client.readTextFile(requestPath); const parsed = JSON.parse(raw) as Partial; - await writeBridgeResponse(input.client, responsePath, { + await writeBridgeResponse(input.client, requestPath, responsePath, { id: typeof parsed.id === "string" && parsed.id.length > 0 ? parsed.id : requestId, status: 503, headers: { "content-type": "application/json" }, @@ -578,6 +751,99 @@ export async function startSandboxCallbackBridgeWorker(input: { }; } +export async function syncSandboxCallbackBridgeEntrypoint(input: { + runner: CommandManagedRuntimeRunner; + remoteCwd: string; + assetRemoteDir: string; + bridgeAsset: SandboxCallbackBridgeAsset; + timeoutMs?: number | null; + shellCommand?: "bash" | "sh" | null; +}): Promise<{ remoteEntrypoint: string; sha256: string; uploaded: boolean }> { + const timeoutMs = normalizeTimeoutMs(input.timeoutMs, DEFAULT_BRIDGE_RESPONSE_TIMEOUT_MS); + const shellCommand = preferredShellForSandbox(input.shellCommand); + const remoteEntrypoint = path.posix.join(input.assetRemoteDir, SANDBOX_CALLBACK_BRIDGE_ENTRYPOINT); + const remoteEntrypointPartial = `${remoteEntrypoint}.partial`; + const remoteUploadPath = `${remoteEntrypoint}.paperclip-upload.b64`; + const remoteLockDir = path.posix.join(input.assetRemoteDir, ".paperclip-bridge-upload.lock"); + const entrypointSource = await fs.readFile(input.bridgeAsset.entrypoint, "utf8"); + const entrypointBase64 = toBuffer(Buffer.from(entrypointSource, "utf8")).toString("base64"); + const sha256 = createHash("sha256").update(entrypointSource, "utf8").digest("hex"); + + const syncResult = await runShell( + input.runner, + input.remoteCwd, + [ + "set -eu", + `remote_dir=${shellQuote(input.assetRemoteDir)}`, + `remote_path=${shellQuote(remoteEntrypoint)}`, + `remote_partial=${shellQuote(remoteEntrypointPartial)}`, + `remote_upload=${shellQuote(remoteUploadPath)}`, + `lock_dir=${shellQuote(remoteLockDir)}`, + `expected_sha=${shellQuote(sha256)}`, + "hash_file() {", + " if command -v sha256sum >/dev/null 2>&1; then", + " sha256sum \"$1\" | awk '{print $1}'", + " return 0", + " fi", + " if command -v shasum >/dev/null 2>&1; then", + " shasum -a 256 \"$1\" | awk '{print $1}'", + " return 0", + " fi", + " return 127", + "}", + "mkdir -p \"$remote_dir\"", + ...buildRemotePidLockAcquireScript("\"$lock_dir\"", "Timed out acquiring sandbox callback bridge upload lock."), + ...buildRemotePidLockCleanupScript("\"$lock_dir\"", [ + "rm -f \"$remote_upload\" \"$remote_partial\"", + ]), + "current_sha=\"\"", + "if [ -f \"$remote_path\" ]; then", + " current_sha=\"$(hash_file \"$remote_path\" 2>/dev/null)\" || current_sha=\"\"", + "fi", + "if [ -n \"$current_sha\" ] && [ \"$current_sha\" = \"$expected_sha\" ]; then", + " printf '{\"uploaded\":false}\\n'", + " exit 0", + "fi", + "rm -f \"$remote_upload\" \"$remote_partial\"", + "cat > \"$remote_upload\"", + "base64 -d < \"$remote_upload\" > \"$remote_partial\"", + // Verify upload integrity. If neither sha256sum nor shasum is on PATH + // (minimal Alpine/scratch images), surface the missing-tool error + // instead of a misleading "sha mismatch" — the verify step is then + // best-effort and we trust base64-decode + atomic rename below. + "if partial_sha=\"$(hash_file \"$remote_partial\" 2>/dev/null)\"; then", + " if [ \"$partial_sha\" != \"$expected_sha\" ]; then", + " echo \"Sandbox callback bridge entrypoint upload sha mismatch.\" >&2", + " exit 1", + " fi", + "else", + " echo \"Sandbox callback bridge entrypoint sha verify skipped: no sha256sum/shasum on remote.\" >&2", + "fi", + "mv \"$remote_partial\" \"$remote_path\"", + "printf '{\"uploaded\":true}\\n'", + ].join("\n"), + timeoutMs, + shellCommand, + entrypointBase64, + ); + requireSuccessfulResult("sync sandbox callback bridge entrypoint", syncResult); + + let uploaded = false; + try { + uploaded = JSON.parse(syncResult.stdout.trim())?.uploaded === true; + } catch (error) { + throw new Error( + `Sandbox callback bridge sync wrote invalid result JSON: ${error instanceof Error ? error.message : String(error)}`, + ); + } + + return { + remoteEntrypoint, + sha256, + uploaded, + }; +} + export async function startSandboxCallbackBridgeServer(input: { runner: CommandManagedRuntimeRunner; remoteCwd: string; @@ -598,17 +864,17 @@ export async function startSandboxCallbackBridgeServer(input: { const timeoutMs = normalizeTimeoutMs(input.timeoutMs, DEFAULT_BRIDGE_RESPONSE_TIMEOUT_MS); const shellCommand = preferredShellForSandbox(input.shellCommand); const directories = sandboxCallbackBridgeDirectories(input.queueDir); - const remoteEntrypoint = path.posix.join(input.assetRemoteDir, SANDBOX_CALLBACK_BRIDGE_ENTRYPOINT); + let remoteEntrypoint = path.posix.join(input.assetRemoteDir, SANDBOX_CALLBACK_BRIDGE_ENTRYPOINT); if (input.bridgeAsset) { - const assetClient = createCommandManagedSandboxCallbackBridgeQueueClient({ + const assetSync = await syncSandboxCallbackBridgeEntrypoint({ runner: input.runner, remoteCwd: input.remoteCwd, + assetRemoteDir: input.assetRemoteDir, + bridgeAsset: input.bridgeAsset, timeoutMs, shellCommand, }); - await assetClient.makeDir(input.assetRemoteDir); - const entrypointSource = await fs.readFile(input.bridgeAsset.entrypoint, "utf8"); - await assetClient.writeTextFile(remoteEntrypoint, entrypointSource); + remoteEntrypoint = assetSync.remoteEntrypoint; } const env = buildSandboxCallbackBridgeEnv({ queueDir: input.queueDir, diff --git a/packages/adapter-utils/src/ssh-fixture.test.ts b/packages/adapter-utils/src/ssh-fixture.test.ts index dd1ae28a..7ca4a657 100644 --- a/packages/adapter-utils/src/ssh-fixture.test.ts +++ b/packages/adapter-utils/src/ssh-fixture.test.ts @@ -70,6 +70,42 @@ describe("ssh env-lab fixture", () => { expect(stopped.running).toBe(false); }); + it("forwards stdin to remote SSH commands", async () => { + const support = await getSshEnvLabSupport(); + if (!support.supported) { + console.warn( + `Skipping SSH stdin forwarding test: ${support.reason ?? "unsupported environment"}`, + ); + return; + } + + const rootDir = await mkdtemp(path.join(os.tmpdir(), "paperclip-ssh-fixture-")); + cleanupDirs.push(rootDir); + const statePath = path.join(rootDir, "state.json"); + + const started = await startSshEnvLabFixture({ statePath }); + const config = await buildSshEnvLabFixtureConfig(started); + const remotePath = path.posix.join(started.workspaceDir, "stdin-forwarded.txt"); + + await runSshCommand( + config, + `sh -lc 'cat > ${JSON.stringify(remotePath)}'`, + { + stdin: "hello over ssh stdin\n", + timeoutMs: 30_000, + maxBuffer: 256 * 1024, + }, + ); + + const result = await runSshCommand( + config, + `sh -lc 'cat ${JSON.stringify(remotePath)}'`, + { timeoutMs: 30_000, maxBuffer: 256 * 1024 }, + ); + + expect(result.stdout).toBe("hello over ssh stdin\n"); + }); + it("does not treat an unrelated reused pid as the running fixture", async () => { const support = await getSshEnvLabSupport(); if (!support.supported) { diff --git a/packages/adapter-utils/src/ssh.ts b/packages/adapter-utils/src/ssh.ts index 2b5dbbad..5070461c 100644 --- a/packages/adapter-utils/src/ssh.ts +++ b/packages/adapter-utils/src/ssh.ts @@ -61,6 +61,7 @@ export function createSshCommandManagedRuntimeRunner(input: { try { const result = await runSshCommand(input.spec, remoteCommand, { + stdin: commandInput.stdin, timeoutMs: commandInput.timeoutMs, maxBuffer: maxBufferBytes, }); @@ -205,6 +206,113 @@ async function execFileText( }); } +async function spawnText( + file: string, + args: string[], + options: { + stdin?: string; + timeout?: number; + maxBuffer?: number; + } = {}, +): Promise { + return await new Promise((resolve, reject) => { + const child = spawn(file, args, { + stdio: [options.stdin != null ? "pipe" : "ignore", "pipe", "pipe"], + }); + + const maxBuffer = options.maxBuffer ?? 1024 * 128; + let stdout = ""; + let stderr = ""; + let settled = false; + let timedOut = false; + + const finishReject = (error: Error & { stdout?: string; stderr?: string; code?: number | null; killed?: boolean }) => { + if (settled) return; + settled = true; + error.stdout = stdout; + error.stderr = stderr; + error.killed = timedOut; + reject(error); + }; + + const append = ( + streamName: "stdout" | "stderr", + chunk: unknown, + ) => { + const text = String(chunk); + if (streamName === "stdout") { + stdout += text; + } else { + stderr += text; + } + if (Buffer.byteLength(stdout, "utf8") > maxBuffer || Buffer.byteLength(stderr, "utf8") > maxBuffer) { + child.kill("SIGTERM"); + finishReject(Object.assign(new Error(`Process output exceeded maxBuffer of ${maxBuffer} bytes.`), { + code: null, + })); + } + }; + + let killEscalation: NodeJS.Timeout | null = null; + const timeout = options.timeout && options.timeout > 0 + ? setTimeout(() => { + timedOut = true; + child.kill("SIGTERM"); + // Escalate to SIGKILL after a 5s grace window so a hung remote + // command that ignores SIGTERM cannot keep the child alive + // indefinitely. + killEscalation = setTimeout(() => { + try { + child.kill("SIGKILL"); + } catch { + // child may have already exited between the SIGTERM and the + // escalation — that's fine. + } + }, 5_000); + killEscalation.unref?.(); + }, options.timeout) + : null; + + const clearTimers = () => { + if (timeout) clearTimeout(timeout); + if (killEscalation) clearTimeout(killEscalation); + }; + + child.stdout?.on("data", (chunk) => { + append("stdout", chunk); + }); + child.stderr?.on("data", (chunk) => { + append("stderr", chunk); + }); + + child.on("error", (error) => { + clearTimers(); + finishReject(Object.assign(error, { code: null })); + }); + + child.on("close", (code, signal) => { + clearTimers(); + if (settled) return; + settled = true; + if (code === 0) { + resolve({ stdout, stderr }); + return; + } + reject(Object.assign(new Error(stderr.trim() || stdout.trim() || `Process exited with code ${code ?? -1}`), { + stdout, + stderr, + code, + signal, + killed: timedOut, + })); + }); + + if (options.stdin != null && child.stdin) { + child.stdin.end(options.stdin); + } + }); +} + async function runLocalGit( localDir: string, args: string[], @@ -722,6 +830,7 @@ export async function runSshCommand( remoteCommand: string, options: { env?: Record; + stdin?: string; timeoutMs?: number; maxBuffer?: number; } = {}, @@ -760,10 +869,16 @@ export async function runSshCommand( `sh -lc ${shellQuote(remoteScript)}`, ); - return await execFileText("ssh", sshArgs, { - timeout: options.timeoutMs ?? 15_000, - maxBuffer: options.maxBuffer ?? 1024 * 128, - }); + return options.stdin != null + ? await spawnText("ssh", sshArgs, { + stdin: options.stdin, + timeout: options.timeoutMs ?? 15_000, + maxBuffer: options.maxBuffer ?? 1024 * 128, + }) + : await execFileText("ssh", sshArgs, { + timeout: options.timeoutMs ?? 15_000, + maxBuffer: options.maxBuffer ?? 1024 * 128, + }); } finally { await cleanup(); }