fix(plugin): harden kubernetes fast upload edges

This commit is contained in:
Dotta
2026-05-13 13:51:12 -05:00
committed by Chris Farhood
parent 713fb6eb4e
commit 40e8638aa3
4 changed files with 46 additions and 1 deletions
@@ -102,6 +102,10 @@ export async function execInPod(
stderr: stderrData, stderr: stderrData,
}); });
}; };
const endOutputStreams = () => {
if (!stdoutStream.writableEnded) stdoutStream.end();
if (!stderrStream.writableEnded) stderrStream.end();
};
stdoutStream.on("end", () => { stdoutStream.on("end", () => {
stdoutEnded = true; stdoutEnded = true;
@@ -126,6 +130,7 @@ export async function execInPod(
// status.status is "Success" | "Failure" // status.status is "Success" | "Failure"
if (status.status === "Success") { if (status.status === "Success") {
pendingResult = { exitCode: 0, timedOut: false }; pendingResult = { exitCode: 0, timedOut: false };
endOutputStreams();
tryFinish(); tryFinish();
return; return;
} }
@@ -140,6 +145,7 @@ export async function execInPod(
? Number(exitCodeCause.message) ? Number(exitCodeCause.message)
: 1; : 1;
pendingResult = { exitCode, timedOut: false }; pendingResult = { exitCode, timedOut: false };
endOutputStreams();
tryFinish(); tryFinish();
}, },
); );
@@ -54,7 +54,16 @@ export class FastUploadInterceptor {
return { action: "passthrough", reason: "init dir/target mismatch" }; return { action: "passthrough", reason: "init dir/target mismatch" };
} }
this.buffers.set(`${targetPath}.paperclip-upload.b64`, { const tempPath = `${targetPath}.paperclip-upload.b64`;
if (this.buffers.has(tempPath)) {
this.buffers.delete(tempPath);
return {
action: "error",
message: `Fast upload already in progress for ${targetPath}; retry the upload from the beginning.`,
};
}
this.buffers.set(tempPath, {
targetPath, targetPath,
chunks: [], chunks: [],
totalBase64Chars: 0, totalBase64Chars: 0,
@@ -27,6 +27,17 @@ describe("execInPod", () => {
expect(result).toEqual({ exitCode: 0, timedOut: false, stdout: "ok\n", stderr: "" }); expect(result).toEqual({ exitCode: 0, timedOut: false, stdout: "ok\n", stderr: "" });
}); });
it("finishes when Kubernetes reports status without ending output streams", async () => {
execMock.mockImplementation((_namespace, _pod, _container, _command, stdout, _stderr, _stdin, _tty, statusCallback) => {
stdout.write("ok\n");
statusCallback({ status: "Success" });
return Promise.resolve(new EventEmitter());
});
const result = await execInPod({} as never, "ns", "pod-1", "agent", ["echo", "ok"]);
expect(result).toEqual({ exitCode: 0, timedOut: false, stdout: "ok\n", stderr: "" });
});
it("returns an execution failure if the websocket closes before a status frame", async () => { it("returns an execution failure if the websocket closes before a status frame", async () => {
const ws = new EventEmitter(); const ws = new EventEmitter();
execMock.mockResolvedValue(ws); execMock.mockResolvedValue(ws);
@@ -75,6 +75,25 @@ describe("FastUploadInterceptor", () => {
expect(interceptor.pendingCount).toBe(0); expect(interceptor.pendingCount).toBe(0);
}); });
it("fails fast when init repeats for an in-progress upload", () => {
const interceptor = new FastUploadInterceptor();
const target = "/workspace/file.bin";
const initCommand =
`mkdir -p '/workspace' && rm -f '${target}.paperclip-upload.b64' && : > '${target}.paperclip-upload.b64'`;
expect(interceptor.decide(initCommand)).toMatchObject({ action: "ack" });
expect(
interceptor.decide(`printf '%s' 'aGVsbG8=' >> '${target}.paperclip-upload.b64'`),
).toMatchObject({ action: "ack" });
const decision = interceptor.decide(initCommand);
expect(decision).toMatchObject({
action: "error",
message: expect.stringContaining("Fast upload already in progress"),
});
expect(interceptor.pendingCount).toBe(0);
});
it("clears buffered uploads on reset", () => { it("clears buffered uploads on reset", () => {
const interceptor = new FastUploadInterceptor(); const interceptor = new FastUploadInterceptor();
const target = "/workspace/file.bin"; const target = "/workspace/file.bin";