From 58d1b19206f72fa920d0420891fdff6c0d409fd5 Mon Sep 17 00:00:00 2001 From: Dotta Date: Wed, 13 May 2026 12:44:48 -0500 Subject: [PATCH] fix(plugin): address kubernetes fast upload review --- .../sandbox-providers/kubernetes/README.md | 2 +- .../sandbox-providers/kubernetes/src/plugin.ts | 18 +++++++++++++++++- .../kubernetes/src/pod-exec.ts | 1 - .../kubernetes/src/upload-interceptor.ts | 10 ++++++++-- .../test/unit/upload-interceptor.test.ts | 18 ++++++++++++++++++ 5 files changed, 44 insertions(+), 5 deletions(-) diff --git a/packages/plugins/sandbox-providers/kubernetes/README.md b/packages/plugins/sandbox-providers/kubernetes/README.md index be148f48..ddbfa759 100644 --- a/packages/plugins/sandbox-providers/kubernetes/README.md +++ b/packages/plugins/sandbox-providers/kubernetes/README.md @@ -108,7 +108,7 @@ Secret pc-{ulid}-env (owned by Sandbox CR; cascade The `sandbox-cr` backend recognizes the chunked base64 upload protocol emitted by `@paperclipai/adapter-utils` for workspace, skill, and config-seed file transfers. Instead of running one Kubernetes exec per base64 chunk, the plugin buffers the upload in worker memory and flushes the final payload through a single `head -c | base64 -d` exec with stdin. -The interceptor is intentionally narrow: only the exact `mkdir`/`printf`/`base64 -d` command shape generated by adapter-utils is optimized. Unknown commands, missing init state, or uploads over the 100 MB buffer cap fall back to normal exec behavior. +The interceptor is intentionally narrow: only the exact `mkdir`/`printf`/`base64 -d` command shape generated by adapter-utils is optimized. Unknown commands and missing init state fall back to normal exec behavior. Uploads over the 100 MB buffer cap fail fast instead of falling back, because earlier chunks were already acknowledged without being written to the pod. For each agent run (job backend): diff --git a/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts b/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts index b0ad14aa..321a0ea4 100644 --- a/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts +++ b/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts @@ -559,7 +559,7 @@ const plugin = definePlugin({ const dir = slashIndex > 0 ? decision.flush.targetPath.slice(0, slashIndex) : "."; const script = `mkdir -p ${shellQuoteArg(dir)} && ` + - `head -c ${base64Body.length} | base64 -d > ${shellQuoteArg(decision.flush.targetPath)}`; + `base64 -d > ${shellQuoteArg(decision.flush.targetPath)}`; const flushResult = await execInPod( kc, namespace, @@ -585,6 +585,22 @@ const plugin = definePlugin({ }, }; } + if (decision.action === "error") { + return { + exitCode: 1, + timedOut: false, + stdout: "", + stderr: decision.message, + metadata: { + provider: "kubernetes", + backend: "sandbox-cr", + namespace, + sandboxName: lease.providerLeaseId, + podName, + fastUpload: "error", + }, + }; + } } const execCommand = buildSandboxExecCommand(params); diff --git a/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts b/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts index 946a97a2..79604352 100644 --- a/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts +++ b/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts @@ -151,7 +151,6 @@ export async function execInPod( .then((webSocket) => { ws = webSocket as WebSocketLike; if (stdinStream && stdinPayload) { - stdinStream.removeAllListeners("end"); stdinStream.end(stdinPayload); } ws.on("close", (code: number, reason: Buffer) => { diff --git a/packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts b/packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts index 74ae763c..45dca39f 100644 --- a/packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts +++ b/packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts @@ -31,6 +31,7 @@ export interface FastUploadFlush { export type FastUploadDecision = | { action: "ack"; reason: string } | { action: "flush"; flush: FastUploadFlush } + | { action: "error"; message: string } | { action: "passthrough"; reason: string }; interface BufferedUpload { @@ -42,6 +43,8 @@ interface BufferedUpload { export class FastUploadInterceptor { private readonly buffers = new Map(); + constructor(private readonly maxBufferBytes = MAX_BUFFER_BYTES) {} + decide(command: string): FastUploadDecision { const initMatch = INIT_RE.exec(command); if (initMatch) { @@ -69,9 +72,12 @@ export class FastUploadInterceptor { return { action: "passthrough", reason: "chunk without prior init" }; } - if (upload.totalBase64Chars + base64Chunk.length > (MAX_BUFFER_BYTES * 4) / 3) { + if (upload.totalBase64Chars + base64Chunk.length > (this.maxBufferBytes * 4) / 3) { this.buffers.delete(tempPath); - return { action: "passthrough", reason: "buffer cap exceeded" }; + return { + action: "error", + message: `Fast upload buffer cap exceeded for ${upload.targetPath}; retry the upload with a smaller payload.`, + }; } upload.chunks.push(base64Chunk); diff --git a/packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts b/packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts index 8f9ea466..6f31a980 100644 --- a/packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts +++ b/packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts @@ -57,6 +57,24 @@ describe("FastUploadInterceptor", () => { expect(interceptor.pendingCount).toBe(0); }); + it("fails fast instead of falling through after acknowledged chunks exceed the buffer cap", () => { + const interceptor = new FastUploadInterceptor(1); + const target = "/workspace/file.bin"; + + expect( + interceptor.decide( + `mkdir -p '/workspace' && rm -f '${target}.paperclip-upload.b64' && : > '${target}.paperclip-upload.b64'`, + ), + ).toMatchObject({ action: "ack" }); + + const decision = interceptor.decide(`printf '%s' 'AAAA' >> '${target}.paperclip-upload.b64'`); + expect(decision).toMatchObject({ + action: "error", + message: expect.stringContaining("Fast upload buffer cap exceeded"), + }); + expect(interceptor.pendingCount).toBe(0); + }); + it("clears buffered uploads on reset", () => { const interceptor = new FastUploadInterceptor(); const target = "/workspace/file.bin";