diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts index eac2e91..2d76a6c 100644 --- a/src/server/execute.test.ts +++ b/src/server/execute.test.ts @@ -1,20 +1,51 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; import type * as k8s from "@kubernetes/client-node"; import type { Writable } from "node:stream"; +import type { AdapterExecutionContext } from "@paperclipai/adapter-utils"; -// Mock the K8s client before importing execute so streamPodLogsOnce picks up -// the mocked getLogApi. The mock's logApi.log never resolves, simulating the -// FAR-10 hang: K8s API drops the connection but the client awaits forever. +// All K8s API mock functions. Vitest hoists vi.mock() above these declarations; the +// factory can still reference them because it only runs when ./k8s-client.js is first +// loaded, i.e. during the dynamic import of ./execute.js below. mockLogFn is a bare vi.fn(): tests simulating the FAR-10 hang (K8s API drops the connection but the client awaits forever) must give it a never-resolving implementation. const mockLogFn = vi.fn(); +const mockGetSelfPodInfo = vi.fn(); +const mockBatchListJobs = vi.fn(); +const mockBatchCreateJob = vi.fn(); +const mockBatchReadJob = vi.fn(); +const mockBatchDeleteJob = vi.fn(); +const mockBatchPatchJob = vi.fn(); +const mockCoreListPods = vi.fn(); +const mockCoreReadPodLog = vi.fn(); +const mockCoreCreateSecret = vi.fn(); +const mockCorePatchSecret = vi.fn(); +const mockCoreDeleteSecret = vi.fn(); + vi.mock("./k8s-client.js", () => ({ getLogApi: () => ({ log: mockLogFn }), - getBatchApi: () => ({}), - getCoreApi: () => ({}), + getBatchApi: () => ({ + listNamespacedJob: mockBatchListJobs, + createNamespacedJob: mockBatchCreateJob, + readNamespacedJob: mockBatchReadJob, + deleteNamespacedJob: mockBatchDeleteJob, + patchNamespacedJob: mockBatchPatchJob, + }), + getCoreApi: () => ({ + listNamespacedPod: mockCoreListPods, + readNamespacedPodLog: mockCoreReadPodLog, + createNamespacedSecret: mockCoreCreateSecret, + patchNamespacedSecret: mockCorePatchSecret, + deleteNamespacedSecret: mockCoreDeleteSecret, + }), getAuthzApi: () => ({}), - getSelfPodInfo: 
vi.fn(), + getSelfPodInfo: mockGetSelfPodInfo, resetCache: vi.fn(), })); +const mockPrepareBundle = vi.fn(); +vi.mock("./prompt-cache.js", () => ({ + prepareClaudePromptBundle: mockPrepareBundle, +})); + + const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, streamPodLogsOnce, execute } = await import("./execute.js"); function makeJob(opts: { @@ -349,3 +380,752 @@ describe("streamPodLogsOnce bail timer", () => { expect(mockLogFn).toHaveBeenCalledOnce(); }); }); + +// ─── Helpers shared across execute() integration tests ─────────────────────── + +function makeCtx(overrides: Partial = {}): AdapterExecutionContext { + return { + runId: "run-test-001", + agent: { + id: "agent-abc", + companyId: "co1", + name: "Test Agent", + adapterType: "claude_k8s", + adapterConfig: {}, + }, + runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null }, + config: {}, + context: {}, + onLog: vi.fn().mockResolvedValue(undefined), + ...overrides, + } as unknown as AdapterExecutionContext; +} + +function makeSelfPodResult() { + return { + namespace: "paperclip", + image: "paperclipai/paperclip:latest", + imagePullSecrets: [], + dnsConfig: undefined, + pvcClaimName: "paperclip-data", + secretVolumes: [], + inheritedEnv: {}, + inheritedEnvValueFrom: [], + inheritedEnvFrom: [], + }; +} + +function makeBundle() { + return { + bundleKey: "test-bundle", + rootDir: "/tmp/test-bundle", + addDir: "/tmp/test-bundle", + instructionsFilePath: null, + }; +} + +// Valid minimal Claude stream-json output used in happy-path tests. 
+const CLAUDE_HAPPY_OUTPUT = [ + JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_test123" }), + JSON.stringify({ + type: "result", + subtype: "success", + result: "Done.", + session_id: "sess_test123", + usage: { input_tokens: 100, output_tokens: 50, cache_read_input_tokens: 10 }, + total_cost_usd: 0.001, + }), +].join("\n") + "\n"; + +// ─── execute: concurrency guard paths ──────────────────────────────────────── + +describe("execute: concurrency guard", () => { + beforeEach(() => { + vi.clearAllMocks(); + mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); + }); + + it("returns k8s_concurrency_guard_unreachable when listNamespacedJob throws", async () => { + mockBatchListJobs.mockRejectedValue(new Error("K8s API unavailable")); + const result = await execute(makeCtx()); + expect(result.errorCode).toBe("k8s_concurrency_guard_unreachable"); + expect(result.errorMessage).toContain("K8s API unavailable"); + }); + + it("returns k8s_concurrent_run_blocked when reattach disabled and orphan is running", async () => { + const orphan = makeJob({ runId: "prior-run", agentId: "agent-abc", terminal: false }); + mockBatchListJobs.mockResolvedValue({ items: [orphan] }); + const result = await execute(makeCtx({ config: { reattachOrphanedJobs: false } } as Partial)); + expect(result.errorCode).toBe("k8s_concurrent_run_blocked"); + expect(result.errorMessage).toContain("reattach disabled"); + }); + + it("returns k8s_orphan_task_unknown when orphan has no task label", async () => { + // No taskId on the orphan job and no taskId in context → block_task_unknown + const orphan = makeJob({ runId: "prior-run", agentId: "agent-abc" }); // no taskId label + mockBatchListJobs.mockResolvedValue({ items: [orphan] }); + // context.taskId absent → currentTaskLabel = null → block_task_unknown + const result = await execute(makeCtx()); + expect(result.errorCode).toBe("k8s_orphan_task_unknown"); + }); + + it("returns 
k8s_concurrent_run_blocked when orphan task-id mismatches current task", async () => { + const orphan = makeJob({ runId: "prior-run", agentId: "agent-abc", taskId: "task-other" }); + mockBatchListJobs.mockResolvedValue({ items: [orphan] }); + const result = await execute( + makeCtx({ context: { taskId: "task-current" } } as Partial), + ); + expect(result.errorCode).toBe("k8s_concurrent_run_blocked"); + expect(result.errorMessage).toContain("different task"); + }); + + it("returns k8s_orphan_session_mismatch when task matches but session differs", async () => { + const orphan = makeJob({ + runId: "prior-run", + agentId: "agent-abc", + taskId: "task-match", + sessionId: "sess-other", + }); + mockBatchListJobs.mockResolvedValue({ items: [orphan] }); + const result = await execute( + makeCtx({ + context: { taskId: "task-match" }, + runtime: { sessionId: "sess-current", sessionParams: null, sessionDisplayId: null, taskKey: null }, + } as Partial), + ); + expect(result.errorCode).toBe("k8s_orphan_session_mismatch"); + expect(result.errorMessage).toContain("mismatched session"); + }); + + it("returns k8s_concurrent_run_blocked when same-run job is still running", async () => { + // runId matches → sameRun.length > 0 → blocked + const sameRunJob = makeJob({ runId: "run-test-001", agentId: "agent-abc", terminal: false }); + mockBatchListJobs.mockResolvedValue({ items: [sameRunJob] }); + const result = await execute(makeCtx()); + expect(result.errorCode).toBe("k8s_concurrent_run_blocked"); + expect(result.errorMessage).toContain("still running for this agent"); + }); + + it("reattaches to a matching orphan and returns k8s_pod_reattach_failed when pod is missing", async () => { + // Orphan with matching taskId and sessionId → reattach classification → reattachTarget is set + const orphan = makeJob({ + runId: "prior-run", + agentId: "agent-abc", + taskId: "task-match", + sessionId: "sess-match", + }); + mockBatchListJobs.mockResolvedValue({ items: [orphan] }); + 
mockBatchPatchJob.mockResolvedValue({}); + mockPrepareBundle.mockResolvedValue(makeBundle()); + // Pod lookup finds nothing → reattach pod-not-found error + mockCoreListPods.mockResolvedValue({ items: [] }); + + const result = await execute( + makeCtx({ + context: { taskId: "task-match" }, + runtime: { sessionId: "sess-match", sessionParams: null, sessionDisplayId: null, taskKey: null }, + } as Partial), + ); + + expect(result.errorCode).toBe("k8s_pod_reattach_failed"); + expect(result.errorMessage).toContain("no pod"); + }); +}); + +// ─── execute: job creation paths ───────────────────────────────────────────── + +describe("execute: job creation", () => { + beforeEach(() => { + vi.resetAllMocks(); + mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); + mockBatchListJobs.mockResolvedValue({ items: [] }); // no concurrent jobs + mockPrepareBundle.mockResolvedValue(makeBundle()); + mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } }); + mockBatchDeleteJob.mockResolvedValue({}); + }); + + it("returns k8s_job_create_failed when createNamespacedJob throws", async () => { + mockBatchCreateJob.mockRejectedValue(new Error("quota exceeded")); + const result = await execute(makeCtx()); + expect(result.errorCode).toBe("k8s_job_create_failed"); + expect(result.errorMessage).toContain("quota exceeded"); + }); + + it("returns k8s_pod_schedule_failed when pod scheduling times out", async () => { + mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "uid-1" } }); + mockBatchDeleteJob.mockResolvedValue({}); + // A real scheduling timeout (pod never appears → waitForPod times out) would mean + // a 2-minute wait, so this test does not exercise it. Instead, listNamespacedPod + // immediately returns a pod whose PodScheduled condition is Unschedulable. 
+ mockCoreListPods.mockResolvedValue({ + items: [ + { + metadata: { name: "pod-xyz" }, + status: { + phase: "Pending", + conditions: [ + { type: "PodScheduled", status: "False", reason: "Unschedulable", message: "no nodes available" }, + ], + containerStatuses: [], + initContainerStatuses: [], + }, + }, + ], + }); + + const result = await execute(makeCtx()); + + expect(result.errorCode).toBe("k8s_pod_schedule_failed"); + expect(result.errorMessage).toContain("unschedulable"); + }); +}); + +// ─── execute: full happy path ───────────────────────────────────────────────── + +describe("execute: happy path", () => { + // vi.resetAllMocks() ensures the mockResolvedValueOnce queue is fully cleared + // before each test so beforeEach always starts with a known queue depth of zero. + beforeEach(() => { + vi.resetAllMocks(); + vi.useFakeTimers(); + mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); + mockBatchListJobs.mockResolvedValue({ items: [] }); + mockPrepareBundle.mockResolvedValue(makeBundle()); + mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } }); + mockBatchPatchJob.mockResolvedValue({}); + + // Default: waitForPod gets Running pod (once queue), getPodExitCode gets + // the terminated-exit-0 pod (default return value). + // Tests that need a different exit code should call + // mockCoreListPods.mockResolvedValue(exitCode1Pod) + // which replaces only the default; the once-queue entry from this beforeEach + // is still consumed by the first waitForPod call. 
+ mockCoreListPods + .mockResolvedValueOnce({ + items: [ + { + metadata: { name: "pod-abc" }, + status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, + }, + ], + }) + .mockResolvedValue({ + items: [ + { + metadata: { name: "pod-abc" }, + status: { + containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }], + }, + }, + ], + }); + + // waitForJobCompletion: Complete on first read + mockBatchReadJob.mockResolvedValue({ + status: { conditions: [{ type: "Complete", status: "True" }] }, + }); + + // streamPodLogsOnce: write valid Claude output to the writable stream + mockLogFn.mockImplementation( + async (_ns: string, _pod: string, _ctr: string, writable: Writable) => { + // chunks.push() is called synchronously inside the Writable handler, + // so the output is captured even before the write callback fires. + writable.write(CLAUDE_HAPPY_OUTPUT); + }, + ); + + mockBatchDeleteJob.mockResolvedValue({}); + mockCoreDeleteSecret.mockResolvedValue({}); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("returns a successful result with session, usage, and model fields", async () => { + const onSpawn = vi.fn().mockResolvedValue(undefined); + const onMeta = vi.fn().mockResolvedValue(undefined); + const ctx = makeCtx({ onSpawn, onMeta } as Partial); + + const executePromise = execute(ctx); + + // streamPodLogs checks stopSignal after streamPodLogsOnce returns. With fake + // timers the reconnect delay (3 s) is held by a fake setTimeout — advance past + // it so the loop exits and Promise.allSettled resolves. advanceTimersByTimeAsync + // flushes all pending microtasks between timer firings, including the + // waitForJobCompletion resolution that sets logStopSignal.stopped = true. 
+ await vi.advanceTimersByTimeAsync(3_100); + + const result = await executePromise; + + expect(result.exitCode).toBe(0); + expect(result.timedOut).toBe(false); + expect(result.errorMessage).toBeNull(); + expect(result.sessionId).toBe("sess_test123"); + expect(result.usage?.inputTokens).toBe(100); + expect(result.usage?.outputTokens).toBe(50); + expect(result.usage?.cachedInputTokens).toBe(10); + expect(result.provider).toBe("anthropic"); + // cleanupJob must have been called + expect(mockBatchDeleteJob).toHaveBeenCalled(); + }); + + it("returns timedOut=true when the job deadline is exceeded", async () => { + // Override waitForJobCompletion to report DeadlineExceeded + mockBatchReadJob.mockResolvedValue({ + status: { + conditions: [{ type: "Failed", status: "True", reason: "DeadlineExceeded" }], + }, + }); + + const executePromise = execute(makeCtx({ config: { timeoutSec: 30 } } as Partial)); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.timedOut).toBe(true); + expect(result.errorCode).toBe("timeout"); + }); + + it("returns session_unavailable and clearSession=true on unknown-session Claude error", async () => { + // isClaudeUnknownSessionError matches /no conversation found with session id/i + const sessionErrorOutput = [ + JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_bad" }), + JSON.stringify({ + type: "result", + subtype: "error", + result: "No conversation found with session id sess_bad", + is_error: true, + session_id: "sess_bad", + usage: { input_tokens: 10, output_tokens: 5, cache_read_input_tokens: 0 }, + total_cost_usd: 0.0, + }), + ].join("\n") + "\n"; + + mockLogFn.mockImplementation( + async (_ns: string, _pod: string, _ctr: string, writable: Writable) => { + writable.write(sessionErrorOutput); + }, + ); + // Once-queue entry for waitForPod already set by beforeEach; override the + // default so getPodExitCode returns exitCode=1. 
+ mockCoreListPods.mockResolvedValue({ + items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 1 } } }] } }], + }); + + const executePromise = execute( + makeCtx({ runtime: { sessionId: "sess_bad", sessionParams: null, sessionDisplayId: null, taskKey: null } } as Partial), + ); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.clearSession).toBe(true); + expect(result.errorCode).toBe("session_unavailable"); + }); + + it("surfaces buildPartialRunError when stdout has no result event", async () => { + // Log stream returns only init line — no result event + const noResultOutput = JSON.stringify({ + type: "system", + subtype: "init", + model: "claude-sonnet-4-6", + session_id: "sess_x", + }) + "\n"; + + mockLogFn.mockImplementation( + async (_ns: string, _pod: string, _ctr: string, writable: Writable) => { + writable.write(noResultOutput); + }, + ); + // Override default so getPodExitCode returns exit code 1 (model-hint path); + // the once-queue entry from beforeEach is still consumed by waitForPod. 
+ mockCoreListPods.mockResolvedValue({ + items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 1 } } }] } }], + }); + + const executePromise = execute(makeCtx()); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.exitCode).toBe(1); + expect(result.errorMessage).toContain("claude-sonnet-4-6"); + }); + + it("does not delete the Job when retainJobs=true", async () => { + const executePromise = execute(makeCtx({ config: { retainJobs: true } } as Partial)); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.exitCode).toBe(0); + expect(mockBatchDeleteJob).not.toHaveBeenCalled(); + }); + + it("handles cleanupJob failure gracefully (best-effort)", async () => { + mockBatchDeleteJob.mockRejectedValue(new Error("forbidden: delete not allowed")); + + const executePromise = execute(makeCtx()); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + // cleanupJob failure must not propagate — execute should still succeed + expect(result.exitCode).toBe(0); + expect(result.errorMessage).toBeNull(); + }); + + it("falls back to one-shot readPodLogs when log stream returns empty", async () => { + // Log stream writes nothing — simulates fast container exit before follow connect + mockLogFn.mockImplementation(async () => {}); + // One-shot read returns full output + mockCoreReadPodLog.mockResolvedValue(CLAUDE_HAPPY_OUTPUT); + + const executePromise = execute(makeCtx()); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.sessionId).toBe("sess_test123"); + expect(mockCoreReadPodLog).toHaveBeenCalled(); + }); + + it("replaces partial stream with longer one-shot pod log read", async () => { + // Stream writes only the init line (no result event) — partial capture + const initLine = JSON.stringify({ type: "system", subtype: "init", model: 
"claude-sonnet-4-6", session_id: "sess_x" }) + "\n"; + mockLogFn.mockImplementation( + async (_ns: string, _pod: string, _ctr: string, writable: Writable) => { + writable.write(initLine); + }, + ); + // One-shot read returns the full output, which is longer and has a result event + mockCoreReadPodLog.mockResolvedValue(CLAUDE_HAPPY_OUTPUT); + + const executePromise = execute(makeCtx()); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.sessionId).toBe("sess_test123"); + expect(mockCoreReadPodLog).toHaveBeenCalled(); + }); + + it("proceeds with captured output when job is deleted by TTL (404 in completion poll)", async () => { + // waitForJobCompletion catches 404 and returns jobGone=true — execute must + // continue to stdout parsing rather than returning an error. + mockBatchReadJob.mockRejectedValue( + Object.assign(new Error("Not Found"), { response: { statusCode: 404 } }), + ); + + const executePromise = execute(makeCtx()); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.exitCode).toBe(0); + expect(result.sessionId).toBe("sess_test123"); + }); + + it("reconnects log stream and logs status when job completion takes > 3s", async () => { + // Make waitForJobCompletion take 4s so the 3s stream reconnect fires first. + // timeoutSec=4, graceSec=0 → completionTimeoutMs=4000. + // Sequence: poll at t=0 (non-terminal, 2s delay) → poll at t=2000 (non-terminal, + // 2s delay) → at t=4000 deadline passes → timedOut=true → stopped=true. + // Meanwhile: reconnect at t=3000 (attempt=1) → streamPodLogs takes its reconnect path (execute.ts line 393 at the time of writing) → stream reconnects. 
+ mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); // never terminal + + const executePromise = execute( + makeCtx({ config: { timeoutSec: 4, graceSec: 0 } } as Partial), + ); + + // Timer sequence (all times relative to when execute() registers fake timers, + // which is after readPaperclipRuntimeSkillEntries real I/O completes): + // t+2000: waitForJobCompletion poll (non-terminal → sleep 2000ms more) + // t+3000: streamPodLogs reconnect (attempt=1 → sleep 3000ms more) + // t+4000: waitForJobCompletion deadline exceeded → timedOut=true → stopped=true + // t+6000: streamPodLogs sleep ends → while(!stopped) → exits → allSettled resolves + // + // readPaperclipRuntimeSkillEntries does real fs.stat I/O under fake timers. + // Each advanceTimersByTimeAsync call gives one real event-loop turn for that + // I/O to complete. Multiple small advances ensure I/O drains before fake timers + // fire, and the total (~12200ms) covers the I/O delay plus the 6000ms sequence. + await vi.advanceTimersByTimeAsync(2_100); + await vi.advanceTimersByTimeAsync(1_100); + await vi.advanceTimersByTimeAsync(1_000); + await vi.advanceTimersByTimeAsync(2_000); + await vi.advanceTimersByTimeAsync(3_000); + await vi.advanceTimersByTimeAsync(3_000); + + const result = await executePromise; + + expect(result.timedOut).toBe(true); + expect(result.errorCode).toBe("timeout"); + }, 15_000); + + it("waitForJobCompletion respects deadline and returns timedOut via poll loop", async () => { + // timeoutSec=1, graceSec=0 → completionTimeoutMs=1000ms. The poll delay (2s) + // fires at t=2000 > deadline (t=1000) → while loop exits → returns timedOut. + mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); // never terminal + + const executePromise = execute( + makeCtx({ config: { timeoutSec: 1, graceSec: 0 } } as Partial), + ); + // Multiple advances provide event-loop turns for readPaperclipRuntimeSkillEntries + // I/O to drain before fake timers fire. 
Total ~8400ms covers the I/O delay + // (~4200ms observed in isolation) plus the 3000ms timer sequence needed: + // t+2000: poll 2 fires, while(Date.now() < deadline) = false → timedOut=true → stopped=true + // t+3000: reconnect sleep fires → while(!stopped) → exits → allSettled resolves + await vi.advanceTimersByTimeAsync(2_100); + await vi.advanceTimersByTimeAsync(1_100); + await vi.advanceTimersByTimeAsync(1_000); + await vi.advanceTimersByTimeAsync(2_100); + await vi.advanceTimersByTimeAsync(2_100); + const result = await executePromise; + + expect(result.timedOut).toBe(true); + expect(result.errorCode).toBe("timeout"); + }, 15_000); + + it("waits for pod creation (no-pod state) then succeeds when pod appears", async () => { + // Override mockCoreListPods: first call returns empty (no pod yet), + // second returns running, default returns terminated exit 0. + mockCoreListPods.mockReset(); + mockCoreListPods + .mockResolvedValueOnce({ items: [] }) + .mockResolvedValueOnce({ + items: [{ + metadata: { name: "pod-abc" }, + status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, + }], + }) + .mockResolvedValue({ + items: [{ + metadata: { name: "pod-abc" }, + status: { + containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }], + }, + }], + }); + + const executePromise = execute(makeCtx()); + // Multiple advances provide event-loop turns for readPaperclipRuntimeSkillEntries + // I/O to drain. Total ~9400ms covers I/O delay (~4200ms) plus the 5000ms + // sequence: waitForPod sleep (2000ms) → pod found → allSettled starts → + // waitForJobCompletion Complete (immediate, stopped=true) → streamPodLogs + // reconnect sleep (3000ms) → while(!stopped) → exits. 
+ await vi.advanceTimersByTimeAsync(2_100); + await vi.advanceTimersByTimeAsync(1_100); + await vi.advanceTimersByTimeAsync(1_000); + await vi.advanceTimersByTimeAsync(2_100); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.exitCode).toBe(0); + }, 15_000); + + it("logs warning and continues when instructionsFilePath file does not exist", async () => { + // The catch block in execute() logs a warning and proceeds with null instructions + const executePromise = execute( + makeCtx({ config: { instructionsFilePath: "/nonexistent/agent-instructions.md" } } as Partial), + ); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.exitCode).toBe(0); + }); + + it("logs warning for extra labels with reserved prefix (skippedLabels)", async () => { + // Labels starting with "paperclip.io/" are reserved and get skipped + const executePromise = execute( + makeCtx({ config: { labels: { "paperclip.io/custom": "value" } } } as Partial), + ); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.exitCode).toBe(0); + }); + + it("logs pod pending → init-waiting → running transition and then succeeds", async () => { + // First poll: pod is Pending with init and main containers in waiting state + // Second poll: pod is Running → waitForPod returns + mockCoreListPods.mockReset(); + mockCoreListPods + .mockResolvedValueOnce({ + items: [{ + metadata: { name: "pod-abc" }, + status: { + phase: "Pending", + initContainerStatuses: [{ name: "write-prompt", state: { waiting: { reason: "PodInitializing" } } }], + containerStatuses: [{ name: "claude", state: { waiting: { reason: "PodInitializing" } } }], + }, + }], + }) + .mockResolvedValueOnce({ + items: [{ + metadata: { name: "pod-abc" }, + status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, + }], + }) + .mockResolvedValue({ + items: [{ + metadata: { name: "pod-abc" }, + 
status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] }, + }], + }); + + const executePromise = execute(makeCtx()); + // First poll: Pending → logs phase, schedules next poll at t=2000 + // Second poll: Running → waitForPod returns, then log streaming begins + // Third advance: fires streamPodLogs 3s reconnect timer + await vi.advanceTimersByTimeAsync(2_100); // t=0→2100: first waitForPod poll (Pending) + await vi.advanceTimersByTimeAsync(2_100); // t=2100→4200: second waitForPod poll (Running) + await vi.advanceTimersByTimeAsync(3_100); // t=4200→7300: streamPodLogs reconnect + const result = await executePromise; + + expect(result.exitCode).toBe(0); + }, 15_000); + + it("returns running pod via allInitsDone && mainRunning even when phase=Pending", async () => { + // Phase stays Pending, but init containers are done and main is running. + // waitForPod returns immediately via the allInitsDone && mainRunning branch (no 2s delay). + mockCoreListPods.mockReset(); + mockCoreListPods + .mockResolvedValueOnce({ + items: [{ + metadata: { name: "pod-abc" }, + status: { + phase: "Pending", + initContainerStatuses: [{ name: "write-prompt", state: { terminated: { exitCode: 0 } } }], + containerStatuses: [{ name: "claude", state: { running: { startedAt: "2024-01-01T00:00:00Z" } } }], + }, + }], + }) + .mockResolvedValue({ + items: [{ + metadata: { name: "pod-abc" }, + status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] }, + }], + }); + + const executePromise = execute(makeCtx()); + await vi.advanceTimersByTimeAsync(3_100); + const result = await executePromise; + + expect(result.exitCode).toBe(0); + }); +}); + +// ─── execute: waitForPod edge cases ────────────────────────────────────────── + +describe("execute: waitForPod edge cases", () => { + beforeEach(() => { + vi.resetAllMocks(); + mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); + mockBatchListJobs.mockResolvedValue({ items: [] }); + 
mockPrepareBundle.mockResolvedValue(makeBundle()); + mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "uid-1" } }); + mockBatchDeleteJob.mockResolvedValue({}); + }); + + it("throws k8s_pod_schedule_failed when pod reaches phase=Failed immediately", async () => { + mockCoreListPods.mockResolvedValue({ + items: [{ + metadata: { name: "pod-fail" }, + status: { + phase: "Failed", + containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 137, reason: "OOMKilled" } } }], + initContainerStatuses: [], + }, + }], + }); + + const result = await execute(makeCtx()); + + expect(result.errorCode).toBe("k8s_pod_schedule_failed"); + expect(result.errorMessage).toContain("OOMKilled"); + }); + + it("throws k8s_pod_schedule_failed when init container exits non-zero", async () => { + mockCoreListPods.mockResolvedValue({ + items: [{ + metadata: { name: "pod-x" }, + status: { + phase: "Pending", + initContainerStatuses: [{ + name: "write-prompt", + state: { terminated: { exitCode: 1, reason: "Error" } }, + }], + containerStatuses: [], + }, + }], + }); + + const result = await execute(makeCtx()); + + expect(result.errorCode).toBe("k8s_pod_schedule_failed"); + expect(result.errorMessage).toContain("write-prompt"); + }); + + it("throws k8s_pod_schedule_failed when init container has ImagePullBackOff", async () => { + mockCoreListPods.mockResolvedValue({ + items: [{ + metadata: { name: "pod-x" }, + status: { + phase: "Pending", + initContainerStatuses: [{ + name: "write-prompt", + state: { waiting: { reason: "ImagePullBackOff", message: "pull failed" } }, + }], + containerStatuses: [], + }, + }], + }); + + const result = await execute(makeCtx()); + + expect(result.errorCode).toBe("k8s_pod_schedule_failed"); + expect(result.errorMessage).toContain("image pull"); + }); + + it("throws k8s_pod_schedule_failed when main container has CrashLoopBackOff", async () => { + mockCoreListPods.mockResolvedValue({ + items: [{ + metadata: { name: "pod-x" }, + status: { + phase: 
"Pending", + initContainerStatuses: [], + containerStatuses: [{ + name: "claude", + state: { waiting: { reason: "CrashLoopBackOff" } }, + }], + }, + }], + }); + + const result = await execute(makeCtx()); + + expect(result.errorCode).toBe("k8s_pod_schedule_failed"); + expect(result.errorMessage).toContain("crash loop"); + }); +}); + +// ─── execute: concurrency guard — multiple orphan sorting ──────────────────── + +describe("execute: concurrency guard — multiple orphans", () => { + beforeEach(() => { + vi.clearAllMocks(); + mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); + }); + + it("sorts multiple orphans newest-first and processes them in that order", async () => { + // orphanNew has a newer timestamp and a mismatching task → block_task_mismatch + // orphanOld has an older timestamp and a matching task → would reattach + // The newest-first sort in execute.ts (lines 603-605 at the time of writing) must put orphanNew first so it is the one classified. + const orphanOld = makeJob({ runId: "prior-1", agentId: "agent-abc", taskId: "task-match" }); + orphanOld.metadata!.creationTimestamp = new Date("2024-01-01T00:00:00Z") as unknown as Date; + const orphanNew = makeJob({ runId: "prior-2", agentId: "agent-abc", taskId: "task-other" }); + orphanNew.metadata!.creationTimestamp = new Date("2024-01-02T00:00:00Z") as unknown as Date; + + mockBatchListJobs.mockResolvedValue({ items: [orphanOld, orphanNew] }); + const result = await execute( + makeCtx({ context: { taskId: "task-match" } } as Partial), + ); + + // Newest orphan (task-other) is classified first → block_task_mismatch + expect(result.errorCode).toBe("k8s_concurrent_run_blocked"); + expect(result.errorMessage).toContain("different task"); + }); +});