diff --git a/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts b/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts index a529e961..750d2814 100644 --- a/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts +++ b/packages/plugins/sandbox-providers/kubernetes/src/plugin.ts @@ -182,6 +182,10 @@ const plugin = definePlugin({ const adapterDefaultsForWarn = getAdapterDefaults(config.adapterType); const totalFqdnsForWarn = [...adapterDefaultsForWarn.allowFqdns, ...config.egressAllowFqdns]; if (config.egressMode === "standard" && totalFqdnsForWarn.length > 0) { + // The SDK does not currently thread ctx.logger into environment hooks. + // Keep this explicit so operators still see the standard-mode egress + // trade-off in raw worker logs. + // eslint-disable-next-line no-console console.warn( `[plugin-kubernetes] egressMode=standard cannot enforce FQDN-based egress rules for ${totalFqdnsForWarn.join(", ")}. Agent pods will get public IPv4 HTTPS egress with private/link-local ranges excluded. Switch egressMode to "cilium" for exact FQDN enforcement.`, ); diff --git a/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts b/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts index b036780d..a8c49de2 100644 --- a/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts +++ b/packages/plugins/sandbox-providers/kubernetes/src/pod-exec.ts @@ -44,7 +44,22 @@ export async function execInPod( return await new Promise<{ exitCode: number; stdout: string; stderr: string }>( (resolve, reject) => { - exec + let settled = false; + const finish = (result: { exitCode: number; stdout: string; stderr: string }) => { + if (settled) return; + settled = true; + resolve(result); + }; + const finishWithTransportFailure = (message: string) => { + const separator = stderrData.length > 0 && !stderrData.endsWith("\n") ? "\n" : ""; + finish({ + exitCode: 1, + stdout: stdoutData, + stderr: `${stderrData}${separator}${message}`, + }); + }; + + const websocketPromise = exec .exec( namespace, podName, @@ -57,7 +72,7 @@ export async function execInPod( (status) => { // status.status is "Success" | "Failure" if (status.status === "Success") { - resolve({ exitCode: 0, stdout: stdoutData, stderr: stderrData }); + finish({ exitCode: 0, stdout: stdoutData, stderr: stderrData }); return; } // On failure, the exit code surfaces via @@ -70,10 +85,26 @@ export async function execInPod( const exitCode = exitCodeCause?.message ? Number(exitCodeCause.message) : 1; - resolve({ exitCode, stdout: stdoutData, stderr: stderrData }); + finish({ exitCode, stdout: stdoutData, stderr: stderrData }); }, - ) - .catch(reject); + ); + + websocketPromise + .then((ws) => { + ws.on("close", (code: number, reason: Buffer) => { + if (settled) return; + const reasonText = reason.length > 0 ? `: ${reason.toString("utf-8")}` : ""; + finishWithTransportFailure(`Kubernetes exec websocket closed before status frame (${code})${reasonText}`); + }); + ws.on("error", (err: Error) => { + if (settled) return; + finishWithTransportFailure(`Kubernetes exec websocket failed before status frame: ${err.message}`); + }); + }) + .catch((err) => { + if (settled) return; + reject(err); + }); }, ); } diff --git a/packages/plugins/sandbox-providers/kubernetes/src/tenant-orchestrator.ts b/packages/plugins/sandbox-providers/kubernetes/src/tenant-orchestrator.ts index e69f522e..072d0ed2 100644 --- a/packages/plugins/sandbox-providers/kubernetes/src/tenant-orchestrator.ts +++ b/packages/plugins/sandbox-providers/kubernetes/src/tenant-orchestrator.ts @@ -47,22 +47,26 @@ async function ensureNamespace(clients: KubeClients, input: EnsureTenantInput): } catch (err) { if (!isNotFound(err)) throw err; } - await clients.core.createNamespace({ - body: { - apiVersion: "v1", - kind: "Namespace", - metadata: { - name: input.namespace, - labels: { - "paperclip.io/company-id": input.companyId, - "paperclip.io/managed-by": "paperclip-k8s-plugin", - "pod-security.kubernetes.io/enforce": "restricted", - "pod-security.kubernetes.io/audit": "restricted", - "pod-security.kubernetes.io/warn": "restricted", + try { + await clients.core.createNamespace({ + body: { + apiVersion: "v1", + kind: "Namespace", + metadata: { + name: input.namespace, + labels: { + "paperclip.io/company-id": input.companyId, + "paperclip.io/managed-by": "paperclip-k8s-plugin", + "pod-security.kubernetes.io/enforce": "restricted", + "pod-security.kubernetes.io/audit": "restricted", + "pod-security.kubernetes.io/warn": "restricted", + }, }, }, - }, - }); + }); + } catch (err) { + if (!isAlreadyExists(err)) throw err; + } } async function ensureServiceAccount(clients: KubeClients, input: EnsureTenantInput): Promise { @@ -87,7 +91,17 @@ async function ensureServiceAccount(clients: KubeClients, input: EnsureTenantInp } catch (err) { if (!isNotFound(err)) throw err; } - await clients.core.createNamespacedServiceAccount({ namespace: input.namespace, body: manifest }); + try { + await clients.core.createNamespacedServiceAccount({ namespace: input.namespace, body: manifest }); + } catch (err) { + if (!isAlreadyExists(err)) throw err; + const existing = await clients.core.readNamespacedServiceAccount({ name: SERVICE_ACCOUNT_NAME, namespace: input.namespace }); + await clients.core.replaceNamespacedServiceAccount({ + name: SERVICE_ACCOUNT_NAME, + namespace: input.namespace, + body: withResourceVersion(manifest, existing) as never, + }); + } } async function ensureRole(clients: KubeClients, input: EnsureTenantInput): Promise { @@ -110,7 +124,17 @@ async function ensureRole(clients: KubeClients, input: EnsureTenantInput): Promi } catch (err) { if (!isNotFound(err)) throw err; } - await clients.rbac.createNamespacedRole({ namespace: input.namespace, body: manifest }); + try { + await clients.rbac.createNamespacedRole({ namespace: input.namespace, body: manifest }); + } catch (err) { + if (!isAlreadyExists(err)) throw err; + const existing = await clients.rbac.readNamespacedRole({ name: ROLE_NAME, namespace: input.namespace }); + await clients.rbac.replaceNamespacedRole({ + name: ROLE_NAME, + namespace: input.namespace, + body: withResourceVersion(manifest, existing) as never, + }); + } } async function ensureRoleBinding(clients: KubeClients, input: EnsureTenantInput): Promise { @@ -132,7 +156,17 @@ async function ensureRoleBinding(clients: KubeClients, input: EnsureTenantInput) } catch (err) { if (!isNotFound(err)) throw err; } - await clients.rbac.createNamespacedRoleBinding({ namespace: input.namespace, body: manifest }); + try { + await clients.rbac.createNamespacedRoleBinding({ namespace: input.namespace, body: manifest }); + } catch (err) { + if (!isAlreadyExists(err)) throw err; + const existing = await clients.rbac.readNamespacedRoleBinding({ name: ROLE_BINDING_NAME, namespace: input.namespace }); + await clients.rbac.replaceNamespacedRoleBinding({ + name: ROLE_BINDING_NAME, + namespace: input.namespace, + body: withResourceVersion(manifest, existing) as never, + }); + } } async function ensureResourceQuota(clients: KubeClients, input: EnsureTenantInput): Promise { @@ -161,7 +195,17 @@ async function ensureResourceQuota(clients: KubeClients, input: EnsureTenantInpu } catch (err) { if (!isNotFound(err)) throw err; } - await clients.core.createNamespacedResourceQuota({ namespace: input.namespace, body: manifest }); + try { + await clients.core.createNamespacedResourceQuota({ namespace: input.namespace, body: manifest }); + } catch (err) { + if (!isAlreadyExists(err)) throw err; + const existing = await clients.core.readNamespacedResourceQuota({ name: RESOURCE_QUOTA_NAME, namespace: input.namespace }); + await clients.core.replaceNamespacedResourceQuota({ + name: RESOURCE_QUOTA_NAME, + namespace: input.namespace, + body: withResourceVersion(manifest, existing) as never, + }); + } } async function ensureLimitRange(clients: KubeClients, input: EnsureTenantInput): Promise { @@ -195,10 +239,20 @@ async function ensureLimitRange(clients: KubeClients, input: EnsureTenantInput): } catch (err) { if (!isNotFound(err)) throw err; } - await clients.core.createNamespacedLimitRange({ - namespace: input.namespace, - body: manifest as never, - }); + try { + await clients.core.createNamespacedLimitRange({ + namespace: input.namespace, + body: manifest as never, + }); + } catch (err) { + if (!isAlreadyExists(err)) throw err; + const existing = await clients.core.readNamespacedLimitRange({ name: LIMIT_RANGE_NAME, namespace: input.namespace }); + await clients.core.replaceNamespacedLimitRange({ + name: LIMIT_RANGE_NAME, + namespace: input.namespace, + body: withResourceVersion(manifest, existing) as never, + }); + } } async function ensureNetworkPolicies(clients: KubeClients, input: EnsureTenantInput): Promise { @@ -243,7 +297,17 @@ async function ensureNetworkPolicy( } catch (err) { if (!isNotFound(err)) throw err; } - await clients.networking.createNamespacedNetworkPolicy({ namespace, body: manifest as never }); + try { + await clients.networking.createNamespacedNetworkPolicy({ namespace, body: manifest as never }); + } catch (err) { + if (!isAlreadyExists(err)) throw err; + const existing = await clients.networking.readNamespacedNetworkPolicy({ name, namespace }); + await clients.networking.replaceNamespacedNetworkPolicy({ + name, + namespace, + body: withResourceVersion(manifest, existing) as never, + }); + } } async function ensureCiliumNetworkPolicy( @@ -272,13 +336,32 @@ async function ensureCiliumNetworkPolicy( } catch (err) { if (!isNotFound(err)) throw err; } - await clients.custom.createNamespacedCustomObject({ - group: "cilium.io", - version: "v2", - namespace, - plural: "ciliumnetworkpolicies", - body: manifest, - }); + try { + await clients.custom.createNamespacedCustomObject({ + group: "cilium.io", + version: "v2", + namespace, + plural: "ciliumnetworkpolicies", + body: manifest, + }); + } catch (err) { + if (!isAlreadyExists(err)) throw err; + const existing = await clients.custom.getNamespacedCustomObject({ + group: "cilium.io", + version: "v2", + namespace, + plural: "ciliumnetworkpolicies", + name, + }); + await clients.custom.replaceNamespacedCustomObject({ + group: "cilium.io", + version: "v2", + namespace, + plural: "ciliumnetworkpolicies", + name, + body: withResourceVersion(manifest, existing), + }); + } } async function deleteNetworkPolicyIfExists(clients: KubeClients, namespace: string, name: string): Promise { @@ -320,3 +403,9 @@ function isNotFound(err: unknown): boolean { const e = err as { code?: number; statusCode?: number }; return e.code === 404 || e.statusCode === 404; } + +function isAlreadyExists(err: unknown): boolean { + if (typeof err !== "object" || err === null) return false; + const e = err as { code?: number; statusCode?: number }; + return e.code === 409 || e.statusCode === 409; +} diff --git a/packages/plugins/sandbox-providers/kubernetes/src/utils.ts b/packages/plugins/sandbox-providers/kubernetes/src/utils.ts index 07e80ece..63ca8b28 100644 --- a/packages/plugins/sandbox-providers/kubernetes/src/utils.ts +++ b/packages/plugins/sandbox-providers/kubernetes/src/utils.ts @@ -1,3 +1,5 @@ +import { randomBytes } from "node:crypto"; + const ULID_ALPHABET = "0123456789abcdefghjkmnpqrstvwxyz"; export function deriveCompanySlug(input: string): string { @@ -22,8 +24,9 @@ export function newRunUlidDns(now: () => number = Date.now): string { out = ULID_ALPHABET[t & 0x1f] + out; t = Math.floor(t / 32); } + const randBytes = randomBytes(16); for (let i = 0; i < 16; i++) { - out += ULID_ALPHABET[Math.floor(Math.random() * 32)]; + out += ULID_ALPHABET[randBytes[i] & 0x1f]; } return out; } diff --git a/packages/plugins/sandbox-providers/kubernetes/test/unit/pod-exec.test.ts b/packages/plugins/sandbox-providers/kubernetes/test/unit/pod-exec.test.ts new file mode 100644 index 00000000..0ecd5720 --- /dev/null +++ b/packages/plugins/sandbox-providers/kubernetes/test/unit/pod-exec.test.ts @@ -0,0 +1,41 @@ +import { EventEmitter } from "node:events"; +import { describe, it, expect, vi, beforeEach } from "vitest"; + +const execMock = vi.fn(); + +vi.mock("@kubernetes/client-node", () => ({ + Exec: vi.fn().mockImplementation(() => ({ exec: execMock })), +})); + +const { execInPod } = await import("../../src/pod-exec.js"); + +describe("execInPod", () => { + beforeEach(() => { + execMock.mockReset(); + }); + + it("returns success when the Kubernetes exec status callback reports success", async () => { + execMock.mockImplementation((_namespace, _pod, _container, _command, stdout, _stderr, _stdin, _tty, statusCallback) => { + stdout.write("ok\n"); + statusCallback({ status: "Success" }); + return Promise.resolve(new EventEmitter()); + }); + + const result = await execInPod({} as never, "ns", "pod-1", "agent", ["echo", "ok"]); + expect(result).toEqual({ exitCode: 0, stdout: "ok\n", stderr: "" }); + }); + + it("returns an execution failure if the websocket closes before a status frame", async () => { + const ws = new EventEmitter(); + execMock.mockResolvedValue(ws); + + const resultPromise = execInPod({} as never, "ns", "pod-1", "agent", ["sleep", "1"]); + await Promise.resolve(); + ws.emit("close", 1006, Buffer.from("connection lost")); + + await expect(resultPromise).resolves.toMatchObject({ + exitCode: 1, + stderr: expect.stringContaining("websocket closed before status frame"), + }); + }); +}); diff --git a/packages/plugins/sandbox-providers/kubernetes/test/unit/tenant-orchestrator.test.ts b/packages/plugins/sandbox-providers/kubernetes/test/unit/tenant-orchestrator.test.ts index d61b278e..6b412c76 100644 --- a/packages/plugins/sandbox-providers/kubernetes/test/unit/tenant-orchestrator.test.ts +++ b/packages/plugins/sandbox-providers/kubernetes/test/unit/tenant-orchestrator.test.ts @@ -150,4 +150,25 @@ describe("ensureTenant", () => { name: "paperclip-egress-allow", }); }); + + it("handles concurrent first-run create conflicts by rereading and replacing managed resources", async () => { + const clients = makeMockClients(); + const existing = { metadata: { resourceVersion: "rv-race" } }; + clients.core.createNamespace.mockRejectedValueOnce({ code: 409 }); + clients.core.readNamespacedServiceAccount + .mockRejectedValueOnce({ code: 404 }) + .mockResolvedValue(existing); + clients.core.createNamespacedServiceAccount.mockRejectedValueOnce({ code: 409 }); + + await ensureTenant(clients as never, baseInput); + + expect(clients.core.createNamespace).toHaveBeenCalled(); + expect(clients.core.replaceNamespacedServiceAccount).toHaveBeenCalledWith( + expect.objectContaining({ + body: expect.objectContaining({ + metadata: expect.objectContaining({ resourceVersion: "rv-race" }), + }), + }), + ); + }); }); diff --git a/packages/plugins/sandbox-providers/kubernetes/test/unit/utils.test.ts b/packages/plugins/sandbox-providers/kubernetes/test/unit/utils.test.ts index 8ead8684..fd844a06 100644 --- a/packages/plugins/sandbox-providers/kubernetes/test/unit/utils.test.ts +++ b/packages/plugins/sandbox-providers/kubernetes/test/unit/utils.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect } from "vitest"; +import { describe, it, expect, vi } from "vitest"; import { deriveCompanySlug, deriveNamespaceName, newRunUlidDns, paperclipLabels } from "../../src/utils.js"; describe("deriveCompanySlug", () => { @@ -28,6 +28,13 @@ describe("newRunUlidDns", () => { const id = newRunUlidDns(); expect(id).toMatch(/^[a-z0-9]{26}$/); }); + + it("does not use Math.random for the random suffix", () => { + const spy = vi.spyOn(Math, "random"); + newRunUlidDns(() => 1); + expect(spy).not.toHaveBeenCalled(); + spy.mockRestore(); + }); }); describe("paperclipLabels", () => {