From fcbbd50b6041e6ee491d41d50e7971ce2ca0a754 Mon Sep 17 00:00:00 2001 From: Dotta Date: Wed, 13 May 2026 12:01:34 -0500 Subject: [PATCH] feat(plugin): add kubernetes fast upload interceptor --- .../sandbox-providers/kubernetes/README.md | 6 + .../sandbox-providers/kubernetes/SMOKE.md | 12 +- .../kubernetes/src/plugin.ts | 107 +++++++++++++++-- .../kubernetes/src/pod-exec.ts | 69 +++++++++-- .../kubernetes/src/upload-interceptor.ts | 111 ++++++++++++++++++ .../kubernetes/test/unit/plugin.test.ts | 18 +++ .../kubernetes/test/unit/pod-exec.test.ts | 29 +++++ .../test/unit/upload-interceptor.test.ts | 72 ++++++++++++ 8 files changed, 400 insertions(+), 24 deletions(-) create mode 100644 packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts create mode 100644 packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts diff --git a/packages/plugins/sandbox-providers/kubernetes/README.md b/packages/plugins/sandbox-providers/kubernetes/README.md index 1e521033..be148f48 100644 --- a/packages/plugins/sandbox-providers/kubernetes/README.md +++ b/packages/plugins/sandbox-providers/kubernetes/README.md @@ -104,6 +104,12 @@ Pod pc-{ulid}-{podSuffix} (managed by Sandbox controlle Secret pc-{ulid}-env (owned by Sandbox CR; cascade-deleted) ``` +## Fast workspace uploads + +The `sandbox-cr` backend recognizes the chunked base64 upload protocol emitted by `@paperclipai/adapter-utils` for workspace, skill, and config-seed file transfers. Instead of running one Kubernetes exec per base64 chunk, the plugin buffers the upload in worker memory and flushes the final payload through a single `head -c | base64 -d` exec with stdin. + +The interceptor is intentionally narrow: only the exact `mkdir`/`printf`/`base64 -d` command shape generated by adapter-utils is optimized. Unknown commands, missing init state, or uploads over the 100 MB buffer cap fall back to normal exec behavior. + For each agent run (job backend): ``` diff --git a/packages/plugins/sandbox-providers/kubernetes/SMOKE.md b/packages/plugins/sandbox-providers/kubernetes/SMOKE.md index 9b23828f..72b32e0b 100644 --- a/packages/plugins/sandbox-providers/kubernetes/SMOKE.md +++ b/packages/plugins/sandbox-providers/kubernetes/SMOKE.md @@ -94,9 +94,10 @@ The plugin's `onEnvironmentAcquireLease` will: 1. `ensureTenant` — provision the `paperclip-smoke` namespace, SA, Role, RoleBinding, ResourceQuota, LimitRange, NetworkPolicies -2. `buildJobManifest` — render the security-hardened Job manifest -3. `createJob` — submit to `batch/v1` -4. `createPerRunSecret` — owned by the Job for cascade-delete +2. `buildSandboxCrManifest` — render the security-hardened Sandbox CR manifest +3. `createNamespacedCustomObject` — submit to `agents.x-k8s.io/v1alpha1` +4. `createPerRunSecret` — owned by the Sandbox CR for cascade-delete +5. Fast-upload workspace/config/skill payloads by collapsing adapter-utils chunked uploads into a single stdin-backed exec per file ### 7. Verify the tenant resources @@ -113,8 +114,9 @@ Expected: - Role `paperclip-tenant-role`, RoleBinding `paperclip-tenant-rb` - ResourceQuota `paperclip-quota`, LimitRange `paperclip-limits` - NetworkPolicies `paperclip-deny-all` + `paperclip-egress-allow` -- Job `pc-{ulid}` and its child Pod -- Secret `pc-{ulid}-env` with `ownerReferences` pointing at the Job +- Sandbox `pc-{ulid}` and its managed Pod +- Secret `pc-{ulid}-env` with `ownerReferences` pointing at the Sandbox CR +- Run logs or plugin metadata include `fastUpload: "flush"` entries during workspace/config/skill upload ### 8. Tear down diff --git a/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts b/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts index 05bb763c..b0ad14aa 100644 --- a/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts +++ b/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts @@ -25,6 +25,7 @@ import { buildJobManifest } from "./pod-spec-builder.js"; import { buildSandboxCrManifest } from "./sandbox-cr-builder.js"; import { ensureTenant } from "./tenant-orchestrator.js"; import { createPerRunSecret } from "./secret-manager.js"; +import { FastUploadInterceptor } from "./upload-interceptor.js"; import { jobOrchestrator, JobTimeoutError } from "./job-orchestrator.js"; import { sandboxCrOrchestrator, @@ -51,6 +52,29 @@ const DEFAULT_RESOURCE_QUOTA = { limitsMemory: "40Gi", }; +const uploadInterceptorsByLease = new Map(); + +function getOrCreateUploadInterceptor(leaseId: string): FastUploadInterceptor { + let interceptor = uploadInterceptorsByLease.get(leaseId); + if (!interceptor) { + interceptor = new FastUploadInterceptor(); + uploadInterceptorsByLease.set(leaseId, interceptor); + } + return interceptor; +} + +function extractShellScript( + params: Pick, +): string | null { + const command = typeof params.command === "string" ? params.command.trim() : ""; + const args = Array.isArray(params.args) ? params.args : []; + const isShell = command === "sh" || command === "bash" || command.endsWith("/sh") || command.endsWith("/bash"); + if (isShell && args[0] === "-c" && typeof args[1] === "string") { + return args[1]; + } + return null; +} + function deriveTenantNamespace(config: KubernetesProviderConfig, companyId: string): string { // TODO: future versions could thread companyName through AcquireLeaseParams // to get a friendlier slug (e.g. "acme-corp") instead of the UUID-derived one. @@ -93,6 +117,24 @@ function shellQuoteArg(arg: string): string { return "'" + arg.replace(/'/g, "'\\''") + "'"; } +export function buildSandboxExecCommand( + params: Pick, +): string[] { + const command = typeof params.command === "string" ? params.command.trim() : ""; + const args = Array.isArray(params.args) ? params.args : []; + + if (command.length > 0 && args.length > 0) { + return [command, ...args]; + } + if (command.length > 0) { + return ["/bin/sh", "-lc", command]; + } + if (args.length > 0) { + return ["/bin/sh", "-lc", args.map(shellQuoteArg).join(" ")]; + } + return ["/bin/sh", "-l"]; +} + export function buildSandboxExecShellCommand( params: Pick, ): string { @@ -379,6 +421,8 @@ const plugin = definePlugin({ const releaseOrchestrator = leaseBackend === "sandbox-cr" ? sandboxCrOrchestrator : jobOrchestrator; + uploadInterceptorsByLease.delete(params.providerLeaseId); + try { await releaseOrchestrator.release(clients, namespace, params.providerLeaseId); } catch (err) { @@ -488,15 +532,62 @@ const plugin = definePlugin({ }; } - // Build the command to exec. If params.command is provided use it; - // otherwise wrap in a login shell so profile scripts run. - const rawCommand = buildSandboxExecShellCommand(params); - - const execCommand = rawCommand.length > 0 - ? ["/bin/sh", "-lc", rawCommand] - : ["/bin/sh", "-l"]; - const remainingTimeoutMs = Math.max(1, effectiveTimeoutMs - (Date.now() - executeStartedAt)); + + const shellScript = extractShellScript(params); + if (shellScript) { + const decision = getOrCreateUploadInterceptor(lease.providerLeaseId).decide(shellScript); + if (decision.action === "ack") { + return { + exitCode: 0, + timedOut: false, + stdout: "", + stderr: "", + metadata: { + provider: "kubernetes", + backend: "sandbox-cr", + namespace, + sandboxName: lease.providerLeaseId, + podName, + fastUpload: "ack", + }, + }; + } + if (decision.action === "flush") { + const base64Body = decision.flush.payload.toString("base64"); + const slashIndex = decision.flush.targetPath.lastIndexOf("/"); + const dir = slashIndex > 0 ? decision.flush.targetPath.slice(0, slashIndex) : "."; + const script = + `mkdir -p ${shellQuoteArg(dir)} && ` + + `head -c ${base64Body.length} | base64 -d > ${shellQuoteArg(decision.flush.targetPath)}`; + const flushResult = await execInPod( + kc, + namespace, + podName, + "agent", + ["/bin/sh", "-c", script], + base64Body, + remainingTimeoutMs, + ); + return { + exitCode: flushResult.exitCode, + timedOut: flushResult.timedOut, + stdout: flushResult.stdout, + stderr: flushResult.stderr, + metadata: { + provider: "kubernetes", + backend: "sandbox-cr", + namespace, + sandboxName: lease.providerLeaseId, + podName, + fastUpload: "flush", + uploadedBytes: decision.flush.payload.length, + }, + }; + } + } + + const execCommand = buildSandboxExecCommand(params); const execResult = await execInPod( kc, namespace, diff --git a/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts b/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts index 4d7d7f83..946a97a2 100644 --- a/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts +++ b/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts @@ -14,6 +14,12 @@ import { Exec } from "@kubernetes/client-node"; import { PassThrough } from "node:stream"; import type { KubeConfig } from "@kubernetes/client-node"; +type WebSocketLike = { + close(): void; + on(event: "close", listener: (code: number, reason: Buffer) => void): void; + on(event: "error", listener: (err: Error) => void): void; +}; + export interface ExecInPodResult { exitCode: number; timedOut: boolean; @@ -21,24 +27,31 @@ export interface ExecInPodResult { stderr: string; } +function shQuoteArg(arg: string): string { + return "'" + arg.replace(/'/g, "'\\''") + "'"; +} + export async function execInPod( kc: KubeConfig, namespace: string, podName: string, containerName: string, command: string[], - stdin?: string, + stdin?: string | Buffer, timeoutMs?: number, ): Promise { const exec = new Exec(kc); const stdoutStream = new PassThrough(); const stderrStream = new PassThrough(); - // If stdin is provided build a readable stream from it; the Exec API accepts - // a Readable | null for stdin. - const stdinStream: import("node:stream").Readable | null = stdin - ? PassThrough.from(stdin) + const stdinPayload: Buffer | null = + Buffer.isBuffer(stdin) ? stdin + : typeof stdin === "string" && stdin.length > 0 ? Buffer.from(stdin, "utf-8") : null; + const stdinStream: PassThrough | null = stdinPayload ? new PassThrough() : null; + const effectiveCommand = stdinPayload + ? ["/bin/sh", "-c", `head -c ${stdinPayload.length} | ${command.map(shQuoteArg).join(" ")}`] + : command; let stdoutData = ""; let stderrData = ""; @@ -52,17 +65,27 @@ export async function execInPod( return await new Promise( (resolve, reject) => { + let ws: WebSocketLike | null = null; let settled = false; + let pendingResult: Omit | null = null; + let stdoutEnded = false; + let stderrEnded = false; const timeout = typeof timeoutMs === "number" && timeoutMs > 0 ? setTimeout(() => { finishWithTransportFailure(`Kubernetes exec timed out after ${timeoutMs}ms`, true); }, timeoutMs) : null; + const finish = (result: ExecInPodResult) => { if (settled) return; settled = true; if (timeout) clearTimeout(timeout); + try { + ws?.close(); + } catch { + // Ignore best-effort close failures. + } resolve(result); }; const finishWithTransportFailure = (message: string, timedOut = false) => { @@ -74,13 +97,30 @@ export async function execInPod( stderr: `${stderrData}${separator}${message}`, }); }; + const tryFinish = () => { + if (settled || !pendingResult || !stdoutEnded || !stderrEnded) return; + finish({ + ...pendingResult, + stdout: stdoutData, + stderr: stderrData, + }); + }; + + stdoutStream.on("end", () => { + stdoutEnded = true; + tryFinish(); + }); + stderrStream.on("end", () => { + stderrEnded = true; + tryFinish(); + }); const websocketPromise = exec .exec( namespace, podName, containerName, - command, + effectiveCommand, stdoutStream, stderrStream, stdinStream, @@ -88,7 +128,8 @@ export async function execInPod( (status) => { // status.status is "Success" | "Failure" if (status.status === "Success") { - finish({ exitCode: 0, timedOut: false, stdout: stdoutData, stderr: stderrData }); + pendingResult = { exitCode: 0, timedOut: false }; + tryFinish(); return; } // On failure, the exit code surfaces via @@ -101,19 +142,25 @@ export async function execInPod( const exitCode = exitCodeCause?.message ? Number(exitCodeCause.message) : 1; - finish({ exitCode, timedOut: false, stdout: stdoutData, stderr: stderrData }); + pendingResult = { exitCode, timedOut: false }; + tryFinish(); }, ); websocketPromise - .then((ws) => { + .then((webSocket) => { + ws = webSocket as WebSocketLike; + if (stdinStream && stdinPayload) { + stdinStream.removeAllListeners("end"); + stdinStream.end(stdinPayload); + } ws.on("close", (code: number, reason: Buffer) => { - if (settled) return; + if (settled || pendingResult) return; const reasonText = reason.length > 0 ? `: ${reason.toString("utf-8")}` : ""; finishWithTransportFailure(`Kubernetes exec websocket closed before status frame (${code})${reasonText}`); }); ws.on("error", (err: Error) => { - if (settled) return; + if (settled || pendingResult) return; finishWithTransportFailure(`Kubernetes exec websocket failed before status frame: ${err.message}`); }); }) diff --git a/packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts b/packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts new file mode 100644 index 00000000..74ae763c --- /dev/null +++ b/packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts @@ -0,0 +1,111 @@ +/** + * Fast-upload interceptor for the chunked-shell file transfer protocol used by + * `@paperclipai/adapter-utils` command-managed runtimes. + * + * The normal path writes files through many shell execs: + * 1. mkdir/rm/touch `.paperclip-upload.b64` + * 2. append many base64 chunks with printf + * 3. base64-decode the temp file into the final target + * + * On Kubernetes each exec is a new WebSocket round trip. This state machine + * recognizes that exact protocol, buffers the base64 chunks in the plugin + * worker, and lets the caller flush the final payload through one exec. + * Pattern drift or missing state falls through to the original exec path. + */ +import { posix as pathPosix } from "node:path"; + +const INIT_RE = + /^mkdir -p '([^']+)' && rm -f '([^']+)\.paperclip-upload\.b64' && : > '\2\.paperclip-upload\.b64'$/; +const CHUNK_RE = + /^printf '%s' '([A-Za-z0-9+/=]+)' >> '([^']+)\.paperclip-upload\.b64'$/; +const FINALIZE_RE = + /^base64 -d < '([^']+)\.paperclip-upload\.b64' > '\1' && rm -f '\1\.paperclip-upload\.b64'$/; + +const MAX_BUFFER_BYTES = 100 * 1024 * 1024; + +export interface FastUploadFlush { + targetPath: string; + payload: Buffer; +} + +export type FastUploadDecision = + | { action: "ack"; reason: string } + | { action: "flush"; flush: FastUploadFlush } + | { action: "passthrough"; reason: string }; + +interface BufferedUpload { + targetPath: string; + chunks: string[]; + totalBase64Chars: number; +} + +export class FastUploadInterceptor { + private readonly buffers = new Map(); + + decide(command: string): FastUploadDecision { + const initMatch = INIT_RE.exec(command); + if (initMatch) { + const dir = initMatch[1]; + const targetPath = initMatch[2]; + if (pathPosix.dirname(targetPath) !== dir) { + return { action: "passthrough", reason: "init dir/target mismatch" }; + } + + this.buffers.set(`${targetPath}.paperclip-upload.b64`, { + targetPath, + chunks: [], + totalBase64Chars: 0, + }); + return { action: "ack", reason: `init upload to ${targetPath}` }; + } + + const chunkMatch = CHUNK_RE.exec(command); + if (chunkMatch) { + const base64Chunk = chunkMatch[1]; + const targetPath = chunkMatch[2]; + const tempPath = `${targetPath}.paperclip-upload.b64`; + const upload = this.buffers.get(tempPath); + if (!upload) { + return { action: "passthrough", reason: "chunk without prior init" }; + } + + if (upload.totalBase64Chars + base64Chunk.length > (MAX_BUFFER_BYTES * 4) / 3) { + this.buffers.delete(tempPath); + return { action: "passthrough", reason: "buffer cap exceeded" }; + } + + upload.chunks.push(base64Chunk); + upload.totalBase64Chars += base64Chunk.length; + return { action: "ack", reason: `buffered ${base64Chunk.length} base64 chars` }; + } + + const finalizeMatch = FINALIZE_RE.exec(command); + if (finalizeMatch) { + const targetPath = finalizeMatch[1]; + const tempPath = `${targetPath}.paperclip-upload.b64`; + const upload = this.buffers.get(tempPath); + if (!upload) { + return { action: "passthrough", reason: "finalize without buffered state" }; + } + + this.buffers.delete(tempPath); + return { + action: "flush", + flush: { + targetPath: upload.targetPath, + payload: Buffer.from(upload.chunks.join(""), "base64"), + }, + }; + } + + return { action: "passthrough", reason: "no upload pattern" }; + } + + reset(): void { + this.buffers.clear(); + } + + get pendingCount(): number { + return this.buffers.size; + } +} diff --git a/packages/plugins/sandbox-providers/kubernetes/test/unit/plugin.test.ts b/packages/plugins/sandbox-providers/kubernetes/test/unit/plugin.test.ts index 777de3b0..be098949 100644 --- a/packages/plugins/sandbox-providers/kubernetes/test/unit/plugin.test.ts +++ b/packages/plugins/sandbox-providers/kubernetes/test/unit/plugin.test.ts @@ -1,5 +1,6 @@ import { describe, it, expect } from "vitest"; import plugin, { + buildSandboxExecCommand, buildSandboxExecShellCommand, extractAdapterEnvFromProcess, } from "../../src/plugin.js"; @@ -161,4 +162,21 @@ describe("plugin", () => { }), ).toBe("pnpm test -- --runInBand"); }); + + it("passes command and args directly to Kubernetes exec", () => { + expect( + buildSandboxExecCommand({ + command: "sh", + args: ["-c", "printf '%s' ok"], + }), + ).toEqual(["sh", "-c", "printf '%s' ok"]); + }); + + it("wraps command-only execution in a login shell", () => { + expect( + buildSandboxExecCommand({ + command: "pnpm test -- --runInBand", + }), + ).toEqual(["/bin/sh", "-lc", "pnpm test -- --runInBand"]); + }); }); diff --git a/packages/plugins/sandbox-providers/kubernetes/test/unit/pod-exec.test.ts b/packages/plugins/sandbox-providers/kubernetes/test/unit/pod-exec.test.ts index 24648da1..0730ca2d 100644 --- a/packages/plugins/sandbox-providers/kubernetes/test/unit/pod-exec.test.ts +++ b/packages/plugins/sandbox-providers/kubernetes/test/unit/pod-exec.test.ts @@ -17,6 +17,8 @@ describe("execInPod", () => { it("returns success when the Kubernetes exec status callback reports success", async () => { execMock.mockImplementation((_namespace, _pod, _container, _command, stdout, _stderr, _stdin, _tty, statusCallback) => { stdout.write("ok\n"); + stdout.end(); + _stderr.end(); statusCallback({ status: "Success" }); return Promise.resolve(new EventEmitter()); }); @@ -49,4 +51,31 @@ describe("execInPod", () => { expect(result.timedOut).toBe(true); expect(result.stderr).toContain("Kubernetes exec timed out after 5ms"); }); + + it("wraps stdin commands with a byte-counted head prefix", async () => { + let observedCommand: string[] | undefined; + let observedStdin = ""; + let observedStdinFinished = false; + + execMock.mockImplementation((_namespace, _pod, _container, command, stdout, stderr, stdin, _tty, statusCallback) => { + observedCommand = command; + stdin?.on("data", (chunk: Buffer) => { + observedStdin += chunk.toString("utf8"); + }); + stdin?.on("finish", () => { + observedStdinFinished = true; + }); + stdout.end(); + stderr.end(); + statusCallback({ status: "Success" }); + return Promise.resolve(new EventEmitter()); + }); + + await execInPod({} as never, "ns", "pod-1", "agent", ["base64", "-d"], "abc"); + await Promise.resolve(); + + expect(observedCommand).toEqual(["/bin/sh", "-c", "head -c 3 | 'base64' '-d'"]); + expect(observedStdin).toBe("abc"); + expect(observedStdinFinished).toBe(true); + }); }); diff --git a/packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts b/packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts new file mode 100644 index 00000000..8f9ea466 --- /dev/null +++ b/packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts @@ -0,0 +1,72 @@ +import { describe, expect, it } from "vitest"; +import { FastUploadInterceptor } from "../../src/upload-interceptor.js"; + +describe("FastUploadInterceptor", () => { + it("collapses the adapter-utils chunked upload protocol into one flush", () => { + const interceptor = new FastUploadInterceptor(); + const target = "/workspace/.paperclip-runtime/skills.tar"; + const chunkA = Buffer.from("hello ").toString("base64").slice(0, 4); + const chunkB = Buffer.from("hello ").toString("base64").slice(4) + Buffer.from("world").toString("base64"); + + expect( + interceptor.decide( + `mkdir -p '/workspace/.paperclip-runtime' && rm -f '${target}.paperclip-upload.b64' && : > '${target}.paperclip-upload.b64'`, + ), + ).toMatchObject({ action: "ack" }); + expect(interceptor.pendingCount).toBe(1); + + expect( + interceptor.decide(`printf '%s' '${chunkA}' >> '${target}.paperclip-upload.b64'`), + ).toMatchObject({ action: "ack" }); + expect( + interceptor.decide(`printf '%s' '${chunkB}' >> '${target}.paperclip-upload.b64'`), + ).toMatchObject({ action: "ack" }); + + const decision = interceptor.decide( + `base64 -d < '${target}.paperclip-upload.b64' > '${target}' && rm -f '${target}.paperclip-upload.b64'`, + ); + expect(decision.action).toBe("flush"); + if (decision.action !== "flush") throw new Error("expected flush"); + expect(decision.flush.targetPath).toBe(target); + expect(decision.flush.payload.toString("utf8")).toBe("hello world"); + expect(interceptor.pendingCount).toBe(0); + }); + + it("passes through chunks and finalizers without a matching init", () => { + const interceptor = new FastUploadInterceptor(); + const target = "/workspace/file.bin"; + + expect( + interceptor.decide(`printf '%s' 'aGVsbG8=' >> '${target}.paperclip-upload.b64'`), + ).toMatchObject({ action: "passthrough", reason: "chunk without prior init" }); + expect( + interceptor.decide( + `base64 -d < '${target}.paperclip-upload.b64' > '${target}' && rm -f '${target}.paperclip-upload.b64'`, + ), + ).toMatchObject({ action: "passthrough", reason: "finalize without buffered state" }); + }); + + it("falls through when the init command does not match the target parent directory", () => { + const interceptor = new FastUploadInterceptor(); + + expect( + interceptor.decide( + "mkdir -p '/tmp' && rm -f '/workspace/file.bin.paperclip-upload.b64' && : > '/workspace/file.bin.paperclip-upload.b64'", + ), + ).toMatchObject({ action: "passthrough", reason: "init dir/target mismatch" }); + expect(interceptor.pendingCount).toBe(0); + }); + + it("clears buffered uploads on reset", () => { + const interceptor = new FastUploadInterceptor(); + const target = "/workspace/file.bin"; + + interceptor.decide( + `mkdir -p '/workspace' && rm -f '${target}.paperclip-upload.b64' && : > '${target}.paperclip-upload.b64'`, + ); + expect(interceptor.pendingCount).toBe(1); + + interceptor.reset(); + expect(interceptor.pendingCount).toBe(0); + }); +});