forked from farhoodlabs/paperclip
fix(plugin): address kubernetes fast upload review
This commit is contained in:
@@ -108,7 +108,7 @@ Secret pc-{ulid}-env (owned by Sandbox CR; cascade
|
||||
|
||||
The `sandbox-cr` backend recognizes the chunked base64 upload protocol emitted by `@paperclipai/adapter-utils` for workspace, skill, and config-seed file transfers. Instead of running one Kubernetes exec per base64 chunk, the plugin buffers the upload in worker memory and flushes the final payload through a single `head -c <bytes> | base64 -d` exec with stdin.
|
||||
|
||||
The interceptor is intentionally narrow: only the exact `mkdir`/`printf`/`base64 -d` command shape generated by adapter-utils is optimized. Unknown commands, missing init state, or uploads over the 100 MB buffer cap fall back to normal exec behavior.
|
||||
The interceptor is intentionally narrow: only the exact `mkdir`/`printf`/`base64 -d` command shape generated by adapter-utils is optimized. Unknown commands and missing init state fall back to normal exec behavior. Uploads over the 100 MB buffer cap fail fast instead of falling back, because earlier chunks were already acknowledged without being written to the pod.
|
||||
|
||||
For each agent run (job backend):
|
||||
|
||||
|
||||
@@ -559,7 +559,7 @@ const plugin = definePlugin({
|
||||
const dir = slashIndex > 0 ? decision.flush.targetPath.slice(0, slashIndex) : ".";
|
||||
const script =
|
||||
`mkdir -p ${shellQuoteArg(dir)} && ` +
|
||||
`head -c ${base64Body.length} | base64 -d > ${shellQuoteArg(decision.flush.targetPath)}`;
|
||||
`base64 -d > ${shellQuoteArg(decision.flush.targetPath)}`;
|
||||
const flushResult = await execInPod(
|
||||
kc,
|
||||
namespace,
|
||||
@@ -585,6 +585,22 @@ const plugin = definePlugin({
|
||||
},
|
||||
};
|
||||
}
|
||||
if (decision.action === "error") {
|
||||
return {
|
||||
exitCode: 1,
|
||||
timedOut: false,
|
||||
stdout: "",
|
||||
stderr: decision.message,
|
||||
metadata: {
|
||||
provider: "kubernetes",
|
||||
backend: "sandbox-cr",
|
||||
namespace,
|
||||
sandboxName: lease.providerLeaseId,
|
||||
podName,
|
||||
fastUpload: "error",
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const execCommand = buildSandboxExecCommand(params);
|
||||
|
||||
@@ -151,7 +151,6 @@ export async function execInPod(
|
||||
.then((webSocket) => {
|
||||
ws = webSocket as WebSocketLike;
|
||||
if (stdinStream && stdinPayload) {
|
||||
stdinStream.removeAllListeners("end");
|
||||
stdinStream.end(stdinPayload);
|
||||
}
|
||||
ws.on("close", (code: number, reason: Buffer) => {
|
||||
|
||||
@@ -31,6 +31,7 @@ export interface FastUploadFlush {
|
||||
export type FastUploadDecision =
|
||||
| { action: "ack"; reason: string }
|
||||
| { action: "flush"; flush: FastUploadFlush }
|
||||
| { action: "error"; message: string }
|
||||
| { action: "passthrough"; reason: string };
|
||||
|
||||
interface BufferedUpload {
|
||||
@@ -42,6 +43,8 @@ interface BufferedUpload {
|
||||
export class FastUploadInterceptor {
|
||||
private readonly buffers = new Map<string, BufferedUpload>();
|
||||
|
||||
constructor(private readonly maxBufferBytes = MAX_BUFFER_BYTES) {}
|
||||
|
||||
decide(command: string): FastUploadDecision {
|
||||
const initMatch = INIT_RE.exec(command);
|
||||
if (initMatch) {
|
||||
@@ -69,9 +72,12 @@ export class FastUploadInterceptor {
|
||||
return { action: "passthrough", reason: "chunk without prior init" };
|
||||
}
|
||||
|
||||
if (upload.totalBase64Chars + base64Chunk.length > (MAX_BUFFER_BYTES * 4) / 3) {
|
||||
if (upload.totalBase64Chars + base64Chunk.length > (this.maxBufferBytes * 4) / 3) {
|
||||
this.buffers.delete(tempPath);
|
||||
return { action: "passthrough", reason: "buffer cap exceeded" };
|
||||
return {
|
||||
action: "error",
|
||||
message: `Fast upload buffer cap exceeded for ${upload.targetPath}; retry the upload with a smaller payload.`,
|
||||
};
|
||||
}
|
||||
|
||||
upload.chunks.push(base64Chunk);
|
||||
|
||||
@@ -57,6 +57,24 @@ describe("FastUploadInterceptor", () => {
|
||||
expect(interceptor.pendingCount).toBe(0);
|
||||
});
|
||||
|
||||
it("fails fast instead of falling through after acknowledged chunks exceed the buffer cap", () => {
|
||||
const interceptor = new FastUploadInterceptor(1);
|
||||
const target = "/workspace/file.bin";
|
||||
|
||||
expect(
|
||||
interceptor.decide(
|
||||
`mkdir -p '/workspace' && rm -f '${target}.paperclip-upload.b64' && : > '${target}.paperclip-upload.b64'`,
|
||||
),
|
||||
).toMatchObject({ action: "ack" });
|
||||
|
||||
const decision = interceptor.decide(`printf '%s' 'AAAA' >> '${target}.paperclip-upload.b64'`);
|
||||
expect(decision).toMatchObject({
|
||||
action: "error",
|
||||
message: expect.stringContaining("Fast upload buffer cap exceeded"),
|
||||
});
|
||||
expect(interceptor.pendingCount).toBe(0);
|
||||
});
|
||||
|
||||
it("clears buffered uploads on reset", () => {
|
||||
const interceptor = new FastUploadInterceptor();
|
||||
const target = "/workspace/file.bin";
|
||||
|
||||
Reference in New Issue
Block a user