feat(plugin): add kubernetes fast upload interceptor

This commit is contained in:
Dotta
2026-05-13 12:01:34 -05:00
committed by Chris Farhood
parent a6c2e0392b
commit fcbbd50b60
8 changed files with 400 additions and 24 deletions
@@ -104,6 +104,12 @@ Pod pc-{ulid}-{podSuffix} (managed by Sandbox controlle
Secret pc-{ulid}-env (owned by Sandbox CR; cascade-deleted)
```
## Fast workspace uploads
The `sandbox-cr` backend recognizes the chunked base64 upload protocol emitted by `@paperclipai/adapter-utils` for workspace, skill, and config-seed file transfers. Instead of running one Kubernetes exec per base64 chunk, the plugin buffers the upload in worker memory and flushes the final payload through a single `head -c <bytes> | base64 -d` exec with stdin.
The interceptor is intentionally narrow: only the exact `mkdir`/`printf`/`base64 -d` command shape generated by adapter-utils is optimized. Unknown commands, missing init state, or uploads over the 100 MB buffer cap fall back to normal exec behavior.
For each agent run (job backend):
```
@@ -94,9 +94,10 @@ The plugin's `onEnvironmentAcquireLease` will:
1. `ensureTenant` — provision the `paperclip-smoke` namespace, SA, Role,
RoleBinding, ResourceQuota, LimitRange, NetworkPolicies
2. `buildJobManifest` — render the security-hardened Job manifest
3. `createJob` — submit to `batch/v1`
4. `createPerRunSecret` — owned by the Job for cascade-delete
2. `buildSandboxCrManifest` — render the security-hardened Sandbox CR manifest
3. `createNamespacedCustomObject` — submit to `agents.x-k8s.io/v1alpha1`
4. `createPerRunSecret` — owned by the Sandbox CR for cascade-delete
5. Fast-upload workspace/config/skill payloads by collapsing adapter-utils chunked uploads into a single stdin-backed exec per file
### 7. Verify the tenant resources
@@ -113,8 +114,9 @@ Expected:
- Role `paperclip-tenant-role`, RoleBinding `paperclip-tenant-rb`
- ResourceQuota `paperclip-quota`, LimitRange `paperclip-limits`
- NetworkPolicies `paperclip-deny-all` + `paperclip-egress-allow`
- Job `pc-{ulid}` and its child Pod
- Secret `pc-{ulid}-env` with `ownerReferences` pointing at the Job
- Sandbox `pc-{ulid}` and its managed Pod
- Secret `pc-{ulid}-env` with `ownerReferences` pointing at the Sandbox CR
- Run logs or plugin metadata include `fastUpload: "flush"` entries during workspace/config/skill upload
### 8. Tear down
@@ -25,6 +25,7 @@ import { buildJobManifest } from "./pod-spec-builder.js";
import { buildSandboxCrManifest } from "./sandbox-cr-builder.js";
import { ensureTenant } from "./tenant-orchestrator.js";
import { createPerRunSecret } from "./secret-manager.js";
import { FastUploadInterceptor } from "./upload-interceptor.js";
import { jobOrchestrator, JobTimeoutError } from "./job-orchestrator.js";
import {
sandboxCrOrchestrator,
@@ -51,6 +52,29 @@ const DEFAULT_RESOURCE_QUOTA = {
limitsMemory: "40Gi",
};
const uploadInterceptorsByLease = new Map<string, FastUploadInterceptor>();
function getOrCreateUploadInterceptor(leaseId: string): FastUploadInterceptor {
let interceptor = uploadInterceptorsByLease.get(leaseId);
if (!interceptor) {
interceptor = new FastUploadInterceptor();
uploadInterceptorsByLease.set(leaseId, interceptor);
}
return interceptor;
}
function extractShellScript(
params: Pick<PluginEnvironmentExecuteParams, "args" | "command">,
): string | null {
const command = typeof params.command === "string" ? params.command.trim() : "";
const args = Array.isArray(params.args) ? params.args : [];
const isShell = command === "sh" || command === "bash" || command.endsWith("/sh") || command.endsWith("/bash");
if (isShell && args[0] === "-c" && typeof args[1] === "string") {
return args[1];
}
return null;
}
function deriveTenantNamespace(config: KubernetesProviderConfig, companyId: string): string {
// TODO: future versions could thread companyName through AcquireLeaseParams
// to get a friendlier slug (e.g. "acme-corp") instead of the UUID-derived one.
@@ -93,6 +117,24 @@ function shellQuoteArg(arg: string): string {
return "'" + arg.replace(/'/g, "'\\''") + "'";
}
export function buildSandboxExecCommand(
params: Pick<PluginEnvironmentExecuteParams, "args" | "command">,
): string[] {
const command = typeof params.command === "string" ? params.command.trim() : "";
const args = Array.isArray(params.args) ? params.args : [];
if (command.length > 0 && args.length > 0) {
return [command, ...args];
}
if (command.length > 0) {
return ["/bin/sh", "-lc", command];
}
if (args.length > 0) {
return ["/bin/sh", "-lc", args.map(shellQuoteArg).join(" ")];
}
return ["/bin/sh", "-l"];
}
export function buildSandboxExecShellCommand(
params: Pick<PluginEnvironmentExecuteParams, "args" | "command">,
): string {
@@ -379,6 +421,8 @@ const plugin = definePlugin({
const releaseOrchestrator =
leaseBackend === "sandbox-cr" ? sandboxCrOrchestrator : jobOrchestrator;
uploadInterceptorsByLease.delete(params.providerLeaseId);
try {
await releaseOrchestrator.release(clients, namespace, params.providerLeaseId);
} catch (err) {
@@ -488,15 +532,62 @@ const plugin = definePlugin({
};
}
// Build the command to exec. If params.command is provided use it;
// otherwise wrap in a login shell so profile scripts run.
const rawCommand = buildSandboxExecShellCommand(params);
const execCommand = rawCommand.length > 0
? ["/bin/sh", "-lc", rawCommand]
: ["/bin/sh", "-l"];
const remainingTimeoutMs = Math.max(1, effectiveTimeoutMs - (Date.now() - executeStartedAt));
const shellScript = extractShellScript(params);
if (shellScript) {
const decision = getOrCreateUploadInterceptor(lease.providerLeaseId).decide(shellScript);
if (decision.action === "ack") {
return {
exitCode: 0,
timedOut: false,
stdout: "",
stderr: "",
metadata: {
provider: "kubernetes",
backend: "sandbox-cr",
namespace,
sandboxName: lease.providerLeaseId,
podName,
fastUpload: "ack",
},
};
}
if (decision.action === "flush") {
const base64Body = decision.flush.payload.toString("base64");
const slashIndex = decision.flush.targetPath.lastIndexOf("/");
const dir = slashIndex > 0 ? decision.flush.targetPath.slice(0, slashIndex) : ".";
const script =
`mkdir -p ${shellQuoteArg(dir)} && ` +
`head -c ${base64Body.length} | base64 -d > ${shellQuoteArg(decision.flush.targetPath)}`;
const flushResult = await execInPod(
kc,
namespace,
podName,
"agent",
["/bin/sh", "-c", script],
base64Body,
remainingTimeoutMs,
);
return {
exitCode: flushResult.exitCode,
timedOut: flushResult.timedOut,
stdout: flushResult.stdout,
stderr: flushResult.stderr,
metadata: {
provider: "kubernetes",
backend: "sandbox-cr",
namespace,
sandboxName: lease.providerLeaseId,
podName,
fastUpload: "flush",
uploadedBytes: decision.flush.payload.length,
},
};
}
}
const execCommand = buildSandboxExecCommand(params);
const execResult = await execInPod(
kc,
namespace,
@@ -14,6 +14,12 @@ import { Exec } from "@kubernetes/client-node";
import { PassThrough } from "node:stream";
import type { KubeConfig } from "@kubernetes/client-node";
type WebSocketLike = {
close(): void;
on(event: "close", listener: (code: number, reason: Buffer) => void): void;
on(event: "error", listener: (err: Error) => void): void;
};
export interface ExecInPodResult {
exitCode: number;
timedOut: boolean;
@@ -21,24 +27,31 @@ export interface ExecInPodResult {
stderr: string;
}
function shQuoteArg(arg: string): string {
return "'" + arg.replace(/'/g, "'\\''") + "'";
}
export async function execInPod(
kc: KubeConfig,
namespace: string,
podName: string,
containerName: string,
command: string[],
stdin?: string,
stdin?: string | Buffer,
timeoutMs?: number,
): Promise<ExecInPodResult> {
const exec = new Exec(kc);
const stdoutStream = new PassThrough();
const stderrStream = new PassThrough();
// If stdin is provided build a readable stream from it; the Exec API accepts
// a Readable | null for stdin.
const stdinStream: import("node:stream").Readable | null = stdin
? PassThrough.from(stdin)
const stdinPayload: Buffer | null =
Buffer.isBuffer(stdin) ? stdin
: typeof stdin === "string" && stdin.length > 0 ? Buffer.from(stdin, "utf-8")
: null;
const stdinStream: PassThrough | null = stdinPayload ? new PassThrough() : null;
const effectiveCommand = stdinPayload
? ["/bin/sh", "-c", `head -c ${stdinPayload.length} | ${command.map(shQuoteArg).join(" ")}`]
: command;
let stdoutData = "";
let stderrData = "";
@@ -52,17 +65,27 @@ export async function execInPod(
return await new Promise<ExecInPodResult>(
(resolve, reject) => {
let ws: WebSocketLike | null = null;
let settled = false;
let pendingResult: Omit<ExecInPodResult, "stdout" | "stderr"> | null = null;
let stdoutEnded = false;
let stderrEnded = false;
const timeout =
typeof timeoutMs === "number" && timeoutMs > 0
? setTimeout(() => {
finishWithTransportFailure(`Kubernetes exec timed out after ${timeoutMs}ms`, true);
}, timeoutMs)
: null;
const finish = (result: ExecInPodResult) => {
if (settled) return;
settled = true;
if (timeout) clearTimeout(timeout);
try {
ws?.close();
} catch {
// Ignore best-effort close failures.
}
resolve(result);
};
const finishWithTransportFailure = (message: string, timedOut = false) => {
@@ -74,13 +97,30 @@ export async function execInPod(
stderr: `${stderrData}${separator}${message}`,
});
};
const tryFinish = () => {
if (settled || !pendingResult || !stdoutEnded || !stderrEnded) return;
finish({
...pendingResult,
stdout: stdoutData,
stderr: stderrData,
});
};
stdoutStream.on("end", () => {
stdoutEnded = true;
tryFinish();
});
stderrStream.on("end", () => {
stderrEnded = true;
tryFinish();
});
const websocketPromise = exec
.exec(
namespace,
podName,
containerName,
command,
effectiveCommand,
stdoutStream,
stderrStream,
stdinStream,
@@ -88,7 +128,8 @@ export async function execInPod(
(status) => {
// status.status is "Success" | "Failure"
if (status.status === "Success") {
finish({ exitCode: 0, timedOut: false, stdout: stdoutData, stderr: stderrData });
pendingResult = { exitCode: 0, timedOut: false };
tryFinish();
return;
}
// On failure, the exit code surfaces via
@@ -101,19 +142,25 @@ export async function execInPod(
const exitCode = exitCodeCause?.message
? Number(exitCodeCause.message)
: 1;
finish({ exitCode, timedOut: false, stdout: stdoutData, stderr: stderrData });
pendingResult = { exitCode, timedOut: false };
tryFinish();
},
);
websocketPromise
.then((ws) => {
.then((webSocket) => {
ws = webSocket as WebSocketLike;
if (stdinStream && stdinPayload) {
stdinStream.removeAllListeners("end");
stdinStream.end(stdinPayload);
}
ws.on("close", (code: number, reason: Buffer) => {
if (settled) return;
if (settled || pendingResult) return;
const reasonText = reason.length > 0 ? `: ${reason.toString("utf-8")}` : "";
finishWithTransportFailure(`Kubernetes exec websocket closed before status frame (${code})${reasonText}`);
});
ws.on("error", (err: Error) => {
if (settled) return;
if (settled || pendingResult) return;
finishWithTransportFailure(`Kubernetes exec websocket failed before status frame: ${err.message}`);
});
})
@@ -0,0 +1,111 @@
/**
* Fast-upload interceptor for the chunked-shell file transfer protocol used by
* `@paperclipai/adapter-utils` command-managed runtimes.
*
* The normal path writes files through many shell execs:
* 1. mkdir/rm/touch `<target>.paperclip-upload.b64`
* 2. append many base64 chunks with printf
* 3. base64-decode the temp file into the final target
*
* On Kubernetes each exec is a new WebSocket round trip. This state machine
* recognizes that exact protocol, buffers the base64 chunks in the plugin
* worker, and lets the caller flush the final payload through one exec.
* Pattern drift or missing state falls through to the original exec path.
*/
import { posix as pathPosix } from "node:path";
const INIT_RE =
/^mkdir -p '([^']+)' && rm -f '([^']+)\.paperclip-upload\.b64' && : > '\2\.paperclip-upload\.b64'$/;
const CHUNK_RE =
/^printf '%s' '([A-Za-z0-9+/=]+)' >> '([^']+)\.paperclip-upload\.b64'$/;
const FINALIZE_RE =
/^base64 -d < '([^']+)\.paperclip-upload\.b64' > '\1' && rm -f '\1\.paperclip-upload\.b64'$/;
const MAX_BUFFER_BYTES = 100 * 1024 * 1024;
export interface FastUploadFlush {
targetPath: string;
payload: Buffer;
}
export type FastUploadDecision =
| { action: "ack"; reason: string }
| { action: "flush"; flush: FastUploadFlush }
| { action: "passthrough"; reason: string };
interface BufferedUpload {
targetPath: string;
chunks: string[];
totalBase64Chars: number;
}
export class FastUploadInterceptor {
private readonly buffers = new Map<string, BufferedUpload>();
decide(command: string): FastUploadDecision {
const initMatch = INIT_RE.exec(command);
if (initMatch) {
const dir = initMatch[1];
const targetPath = initMatch[2];
if (pathPosix.dirname(targetPath) !== dir) {
return { action: "passthrough", reason: "init dir/target mismatch" };
}
this.buffers.set(`${targetPath}.paperclip-upload.b64`, {
targetPath,
chunks: [],
totalBase64Chars: 0,
});
return { action: "ack", reason: `init upload to ${targetPath}` };
}
const chunkMatch = CHUNK_RE.exec(command);
if (chunkMatch) {
const base64Chunk = chunkMatch[1];
const targetPath = chunkMatch[2];
const tempPath = `${targetPath}.paperclip-upload.b64`;
const upload = this.buffers.get(tempPath);
if (!upload) {
return { action: "passthrough", reason: "chunk without prior init" };
}
if (upload.totalBase64Chars + base64Chunk.length > (MAX_BUFFER_BYTES * 4) / 3) {
this.buffers.delete(tempPath);
return { action: "passthrough", reason: "buffer cap exceeded" };
}
upload.chunks.push(base64Chunk);
upload.totalBase64Chars += base64Chunk.length;
return { action: "ack", reason: `buffered ${base64Chunk.length} base64 chars` };
}
const finalizeMatch = FINALIZE_RE.exec(command);
if (finalizeMatch) {
const targetPath = finalizeMatch[1];
const tempPath = `${targetPath}.paperclip-upload.b64`;
const upload = this.buffers.get(tempPath);
if (!upload) {
return { action: "passthrough", reason: "finalize without buffered state" };
}
this.buffers.delete(tempPath);
return {
action: "flush",
flush: {
targetPath: upload.targetPath,
payload: Buffer.from(upload.chunks.join(""), "base64"),
},
};
}
return { action: "passthrough", reason: "no upload pattern" };
}
reset(): void {
this.buffers.clear();
}
get pendingCount(): number {
return this.buffers.size;
}
}
@@ -1,5 +1,6 @@
import { describe, it, expect } from "vitest";
import plugin, {
buildSandboxExecCommand,
buildSandboxExecShellCommand,
extractAdapterEnvFromProcess,
} from "../../src/plugin.js";
@@ -161,4 +162,21 @@ describe("plugin", () => {
}),
).toBe("pnpm test -- --runInBand");
});
it("passes command and args directly to Kubernetes exec", () => {
expect(
buildSandboxExecCommand({
command: "sh",
args: ["-c", "printf '%s' ok"],
}),
).toEqual(["sh", "-c", "printf '%s' ok"]);
});
it("wraps command-only execution in a login shell", () => {
expect(
buildSandboxExecCommand({
command: "pnpm test -- --runInBand",
}),
).toEqual(["/bin/sh", "-lc", "pnpm test -- --runInBand"]);
});
});
@@ -17,6 +17,8 @@ describe("execInPod", () => {
it("returns success when the Kubernetes exec status callback reports success", async () => {
execMock.mockImplementation((_namespace, _pod, _container, _command, stdout, _stderr, _stdin, _tty, statusCallback) => {
stdout.write("ok\n");
stdout.end();
_stderr.end();
statusCallback({ status: "Success" });
return Promise.resolve(new EventEmitter());
});
@@ -49,4 +51,31 @@ describe("execInPod", () => {
expect(result.timedOut).toBe(true);
expect(result.stderr).toContain("Kubernetes exec timed out after 5ms");
});
it("wraps stdin commands with a byte-counted head prefix", async () => {
let observedCommand: string[] | undefined;
let observedStdin = "";
let observedStdinFinished = false;
execMock.mockImplementation((_namespace, _pod, _container, command, stdout, stderr, stdin, _tty, statusCallback) => {
observedCommand = command;
stdin?.on("data", (chunk: Buffer) => {
observedStdin += chunk.toString("utf8");
});
stdin?.on("finish", () => {
observedStdinFinished = true;
});
stdout.end();
stderr.end();
statusCallback({ status: "Success" });
return Promise.resolve(new EventEmitter());
});
await execInPod({} as never, "ns", "pod-1", "agent", ["base64", "-d"], "abc");
await Promise.resolve();
expect(observedCommand).toEqual(["/bin/sh", "-c", "head -c 3 | 'base64' '-d'"]);
expect(observedStdin).toBe("abc");
expect(observedStdinFinished).toBe(true);
});
});
@@ -0,0 +1,72 @@
import { describe, expect, it } from "vitest";
import { FastUploadInterceptor } from "../../src/upload-interceptor.js";
describe("FastUploadInterceptor", () => {
it("collapses the adapter-utils chunked upload protocol into one flush", () => {
const interceptor = new FastUploadInterceptor();
const target = "/workspace/.paperclip-runtime/skills.tar";
const chunkA = Buffer.from("hello ").toString("base64").slice(0, 4);
const chunkB = Buffer.from("hello ").toString("base64").slice(4) + Buffer.from("world").toString("base64");
expect(
interceptor.decide(
`mkdir -p '/workspace/.paperclip-runtime' && rm -f '${target}.paperclip-upload.b64' && : > '${target}.paperclip-upload.b64'`,
),
).toMatchObject({ action: "ack" });
expect(interceptor.pendingCount).toBe(1);
expect(
interceptor.decide(`printf '%s' '${chunkA}' >> '${target}.paperclip-upload.b64'`),
).toMatchObject({ action: "ack" });
expect(
interceptor.decide(`printf '%s' '${chunkB}' >> '${target}.paperclip-upload.b64'`),
).toMatchObject({ action: "ack" });
const decision = interceptor.decide(
`base64 -d < '${target}.paperclip-upload.b64' > '${target}' && rm -f '${target}.paperclip-upload.b64'`,
);
expect(decision.action).toBe("flush");
if (decision.action !== "flush") throw new Error("expected flush");
expect(decision.flush.targetPath).toBe(target);
expect(decision.flush.payload.toString("utf8")).toBe("hello world");
expect(interceptor.pendingCount).toBe(0);
});
it("passes through chunks and finalizers without a matching init", () => {
const interceptor = new FastUploadInterceptor();
const target = "/workspace/file.bin";
expect(
interceptor.decide(`printf '%s' 'aGVsbG8=' >> '${target}.paperclip-upload.b64'`),
).toMatchObject({ action: "passthrough", reason: "chunk without prior init" });
expect(
interceptor.decide(
`base64 -d < '${target}.paperclip-upload.b64' > '${target}' && rm -f '${target}.paperclip-upload.b64'`,
),
).toMatchObject({ action: "passthrough", reason: "finalize without buffered state" });
});
it("falls through when the init command does not match the target parent directory", () => {
const interceptor = new FastUploadInterceptor();
expect(
interceptor.decide(
"mkdir -p '/tmp' && rm -f '/workspace/file.bin.paperclip-upload.b64' && : > '/workspace/file.bin.paperclip-upload.b64'",
),
).toMatchObject({ action: "passthrough", reason: "init dir/target mismatch" });
expect(interceptor.pendingCount).toBe(0);
});
it("clears buffered uploads on reset", () => {
const interceptor = new FastUploadInterceptor();
const target = "/workspace/file.bin";
interceptor.decide(
`mkdir -p '/workspace' && rm -f '${target}.paperclip-upload.b64' && : > '${target}.paperclip-upload.b64'`,
);
expect(interceptor.pendingCount).toBe(1);
interceptor.reset();
expect(interceptor.pendingCount).toBe(0);
});
});