From 40e8638aa330d1f51b01b915178e1fcbbe47e005 Mon Sep 17 00:00:00 2001 From: Dotta Date: Wed, 13 May 2026 13:51:12 -0500 Subject: [PATCH] fix(plugin): harden kubernetes fast upload edges --- .../kubernetes/src/pod-exec.ts | 6 ++++++ .../kubernetes/src/upload-interceptor.ts | 11 ++++++++++- .../kubernetes/test/unit/pod-exec.test.ts | 11 +++++++++++ .../test/unit/upload-interceptor.test.ts | 19 +++++++++++++++++++ 4 files changed, 46 insertions(+), 1 deletion(-) diff --git a/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts b/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts index 092bd491..4a077e73 100644 --- a/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts +++ b/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts @@ -102,6 +102,10 @@ export async function execInPod( stderr: stderrData, }); }; + const endOutputStreams = () => { + if (!stdoutStream.writableEnded) stdoutStream.end(); + if (!stderrStream.writableEnded) stderrStream.end(); + }; stdoutStream.on("end", () => { stdoutEnded = true; @@ -126,6 +130,7 @@ export async function execInPod( // status.status is "Success" | "Failure" if (status.status === "Success") { pendingResult = { exitCode: 0, timedOut: false }; + endOutputStreams(); tryFinish(); return; } @@ -140,6 +145,7 @@ export async function execInPod( ? Number(exitCodeCause.message) : 1; pendingResult = { exitCode, timedOut: false }; + endOutputStreams(); tryFinish(); }, ); diff --git a/packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts b/packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts index 45dca39f..28db96e4 100644 --- a/packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts +++ b/packages/plugins/sandbox-providers/kubernetes/src/upload-interceptor.ts @@ -54,7 +54,16 @@ export class FastUploadInterceptor { return { action: "passthrough", reason: "init dir/target mismatch" }; } - this.buffers.set(`${targetPath}.paperclip-upload.b64`, { + const tempPath = `${targetPath}.paperclip-upload.b64`; + if (this.buffers.has(tempPath)) { + this.buffers.delete(tempPath); + return { + action: "error", + message: `Fast upload already in progress for ${targetPath}; retry the upload from the beginning.`, + }; + } + + this.buffers.set(tempPath, { targetPath, chunks: [], totalBase64Chars: 0, diff --git a/packages/plugins/sandbox-providers/kubernetes/test/unit/pod-exec.test.ts b/packages/plugins/sandbox-providers/kubernetes/test/unit/pod-exec.test.ts index 0730ca2d..2a87ac5c 100644 --- a/packages/plugins/sandbox-providers/kubernetes/test/unit/pod-exec.test.ts +++ b/packages/plugins/sandbox-providers/kubernetes/test/unit/pod-exec.test.ts @@ -27,6 +27,17 @@ describe("execInPod", () => { expect(result).toEqual({ exitCode: 0, timedOut: false, stdout: "ok\n", stderr: "" }); }); + it("finishes when Kubernetes reports status without ending output streams", async () => { + execMock.mockImplementation((_namespace, _pod, _container, _command, stdout, _stderr, _stdin, _tty, statusCallback) => { + stdout.write("ok\n"); + statusCallback({ status: "Success" }); + return Promise.resolve(new EventEmitter()); + }); + + const result = await execInPod({} as never, "ns", "pod-1", "agent", ["echo", "ok"]); + expect(result).toEqual({ exitCode: 0, timedOut: false, stdout: "ok\n", stderr: "" }); + }); + it("returns an execution failure if the websocket closes before a status frame", async () => { const ws = new EventEmitter(); execMock.mockResolvedValue(ws); diff --git a/packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts b/packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts index 6f31a980..9b91ff40 100644 --- a/packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts +++ b/packages/plugins/sandbox-providers/kubernetes/test/unit/upload-interceptor.test.ts @@ -75,6 +75,25 @@ describe("FastUploadInterceptor", () => { expect(interceptor.pendingCount).toBe(0); }); + it("fails fast when init repeats for an in-progress upload", () => { + const interceptor = new FastUploadInterceptor(); + const target = "/workspace/file.bin"; + const initCommand = + `mkdir -p '/workspace' && rm -f '${target}.paperclip-upload.b64' && : > '${target}.paperclip-upload.b64'`; + + expect(interceptor.decide(initCommand)).toMatchObject({ action: "ack" }); + expect( + interceptor.decide(`printf '%s' 'aGVsbG8=' >> '${target}.paperclip-upload.b64'`), + ).toMatchObject({ action: "ack" }); + + const decision = interceptor.decide(initCommand); + expect(decision).toMatchObject({ + action: "error", + message: expect.stringContaining("Fast upload already in progress"), + }); + expect(interceptor.pendingCount).toBe(0); + }); + it("clears buffered uploads on reset", () => { const interceptor = new FastUploadInterceptor(); const target = "/workspace/file.bin";