diff --git a/package-lock.json b/package-lock.json index c10b2c0..dcd4a30 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "paperclip-adapter-claude-k8s", - "version": "0.1.54", + "version": "0.2.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "paperclip-adapter-claude-k8s", - "version": "0.1.54", + "version": "0.2.0", "license": "MIT", "dependencies": { "@kubernetes/client-node": "^1.0.0", diff --git a/package.json b/package.json index 4849c0c..b0faac0 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "paperclip-adapter-claude-k8s", - "version": "0.1.57", + "version": "0.2.0", "description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs", "license": "MIT", "repository": { diff --git a/src/index.ts b/src/index.ts index 72b1510..fe80fd5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -60,10 +60,6 @@ Kubernetes fields: - retainJobs (boolean, optional): skip cleanup on completion for debugging - reattachOrphanedJobs (boolean, optional): when true (default), attach to a running orphaned Job that matches the current agent/task/session instead of blocking; when false, any non-terminal orphan blocks the new run -Output filtering fields: -- enableRtk (boolean, optional): truncate oversized tool outputs before they reach the model via a PostToolUse hook; default false -- rtkMaxOutputBytes (number, optional): byte threshold for tool output truncation when enableRtk is true; default 50000 - Operational fields: - timeoutSec (number, optional): run timeout in seconds; 0 means no timeout - graceSec (number, optional): additional grace before adapter gives up after Job deadline diff --git a/src/server/config-schema.ts b/src/server/config-schema.ts index 15dbbf1..a6570b7 100644 --- a/src/server/config-schema.ts +++ b/src/server/config-schema.ts @@ -133,22 +133,7 @@ export function getConfigSchema(): AdapterConfigSchema { label: "Labels", hint: "Extra labels added to Job metadata. One key=value per line.", }, - // Output filtering (RTK-compatible) - { - type: "toggle", - key: "enableRtk", - label: "Enable Output Filtering", - hint: "Truncate oversized tool outputs before they reach the model, reducing token consumption. Implemented natively in Node.js — no external binary required. Installs a PostToolUse hook in ~/.claude/settings.json for each run.", - default: false, - }, - { - type: "number", - key: "rtkMaxOutputBytes", - label: "Max Tool Output Bytes", - hint: "Maximum bytes of tool output to pass to the model when output filtering is enabled. Outputs exceeding this threshold are truncated with a summary. Default: 50000.", - default: 50000, - }, ]; return { fields }; -} +} \ No newline at end of file diff --git a/src/server/execute.test.ts b/src/server/execute.test.ts index 8d7b8a7..d1f4306 100644 --- a/src/server/execute.test.ts +++ b/src/server/execute.test.ts @@ -24,6 +24,9 @@ const mockCoreDeleteSecret = vi.fn(); // factory with a stale reference to a different vi.fn() instance. const mockReadSkillEntries = vi.hoisted(() => vi.fn()); +// Module-level state for fs mock - kept for future use if mock is needed +const mockFsContent = new Map(); + vi.mock("./k8s-client.js", () => ({ getLogApi: () => ({ log: mockLogFn }), getBatchApi: () => ({ @@ -446,75 +449,6 @@ describe("execute: all-invalid agent.id (N4)", () => { }); }); -// Regression: FAR-10 hardening — streamPodLogsOnce must not hang forever when -// the K8s client's logApi.log call never resolves. When stopSignal fires, the -// bail timer must force-return within LOG_STREAM_BAIL_TIMEOUT_MS (3s in the -// implementation) so execute() does not get stuck waiting for a dead stream. -describe("streamPodLogsOnce bail timer", () => { - beforeEach(() => { - mockLogFn.mockReset(); - vi.useFakeTimers(); - }); - afterEach(() => { - vi.useRealTimers(); - }); - - it("returns within the bail window when stopSignal fires during a hung log call", async () => { - // logApi.log never resolves — simulates the FAR-10 hang where the K8s - // response stream stalls without closing the connection. - mockLogFn.mockImplementation((_ns, _pod, _ctr, _writable: Writable) => { - return new Promise(() => { /* never resolves */ }); - }); - - const stopSignal = { stopped: false }; - const onLog = vi.fn().mockResolvedValue(undefined); - - const resultPromise = streamPodLogsOnce( - "default", - "mypod", - onLog, - undefined, - undefined, - undefined, - stopSignal, - ); - - // Fire stopSignal; let the 200ms poller tick and start the bail timer. - stopSignal.stopped = true; - await vi.advanceTimersByTimeAsync(300); - - // Advance past the 3s bail timeout. streamPodLogsOnce must now resolve - // with an empty string (no chunks were captured) rather than hanging. - await vi.advanceTimersByTimeAsync(3_100); - - const result = await resultPromise; - expect(result).toBe(""); - expect(mockLogFn).toHaveBeenCalledOnce(); - }); - - it("returns promptly if logApi.log resolves before stopSignal fires (happy path, no bail involved)", async () => { - mockLogFn.mockImplementation(async (_ns, _pod, _ctr, _writable: Writable) => { - // Resolve immediately — normal log-stream completion. - return undefined; - }); - - const onLog = vi.fn().mockResolvedValue(undefined); - - // No stopSignal → no bail machinery engaged. - const result = await streamPodLogsOnce( - "default", - "mypod", - onLog, - undefined, - undefined, - undefined, - undefined, - ); - - expect(result).toBe(""); - expect(mockLogFn).toHaveBeenCalledOnce(); - }); -}); // ─── Helpers shared across execute() integration tests ─────────────────────── @@ -739,599 +673,6 @@ describe("execute: job creation", () => { }); }); -// ─── execute: full happy path ───────────────────────────────────────────────── - -describe("execute: happy path", () => { - // vi.resetAllMocks() ensures the mockResolvedValueOnce queue is fully cleared - // before each test so beforeEach always starts with a known queue depth of zero. - beforeEach(() => { - vi.resetAllMocks(); - vi.useFakeTimers(); - mockReadSkillEntries.mockResolvedValue([]); - mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); - mockBatchListJobs.mockResolvedValue({ items: [] }); - mockPrepareBundle.mockResolvedValue(makeBundle()); - mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } }); - mockBatchPatchJob.mockResolvedValue({}); - - // Default: waitForPod gets Running pod (once queue), getPodExitCode gets - // the terminated-exit-0 pod (default return value). - // Tests that need a different exit code should call - // mockCoreListPods.mockResolvedValue(exitCode1Pod) - // which replaces only the default; the once-queue entry from this beforeEach - // is still consumed by the first waitForPod call. - mockCoreListPods - .mockResolvedValueOnce({ - items: [ - { - metadata: { name: "pod-abc" }, - status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, - }, - ], - }) - .mockResolvedValue({ - items: [ - { - metadata: { name: "pod-abc" }, - status: { - containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }], - }, - }, - ], - }); - - // waitForJobCompletion: Complete on first read - mockBatchReadJob.mockResolvedValue({ - status: { conditions: [{ type: "Complete", status: "True" }] }, - }); - - // streamPodLogsOnce: write valid Claude output to the writable stream - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: Writable) => { - // chunks.push() is called synchronously inside the Writable handler, - // so the output is captured even before the write callback fires. - writable.write(CLAUDE_HAPPY_OUTPUT); - }, - ); - - mockBatchDeleteJob.mockResolvedValue({}); - mockCoreDeleteSecret.mockResolvedValue({}); - }); - - afterEach(() => { - vi.useRealTimers(); - }); - - it("returns a successful result with session, usage, and model fields", async () => { - const onSpawn = vi.fn().mockResolvedValue(undefined); - const onMeta = vi.fn().mockResolvedValue(undefined); - const ctx = makeCtx({ onSpawn, onMeta } as Partial); - - const executePromise = execute(ctx); - - // streamPodLogs checks stopSignal after streamPodLogsOnce returns. With fake - // timers the reconnect delay (3 s) is held by a fake setTimeout — advance past - // it so the loop exits and Promise.allSettled resolves. advanceTimersByTimeAsync - // flushes all pending microtasks between timer firings, including the - // waitForJobCompletion resolution that sets logStopSignal.stopped = true. - await vi.advanceTimersByTimeAsync(3_100); - - const result = await executePromise; - - expect(result.exitCode).toBe(0); - expect(result.timedOut).toBe(false); - expect(result.errorMessage).toBeNull(); - expect(result.sessionId).toBe("sess_test123"); - expect(result.usage?.inputTokens).toBe(100); - expect(result.usage?.outputTokens).toBe(50); - expect(result.usage?.cachedInputTokens).toBe(10); - expect(result.provider).toBe("anthropic"); - // cleanupJob must have been called - expect(mockBatchDeleteJob).toHaveBeenCalled(); - }); - - it("returns timedOut=true when the job deadline is exceeded", async () => { - // Override waitForJobCompletion to report DeadlineExceeded - mockBatchReadJob.mockResolvedValue({ - status: { - conditions: [{ type: "Failed", status: "True", reason: "DeadlineExceeded" }], - }, - }); - - const executePromise = execute(makeCtx({ config: { timeoutSec: 30 } } as Partial)); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.timedOut).toBe(true); - expect(result.errorCode).toBe("timeout"); - }); - - it("returns session_unavailable and clearSession=true on unknown-session Claude error", async () => { - // isClaudeUnknownSessionError matches /no conversation found with session id/i - const sessionErrorOutput = [ - JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_bad" }), - JSON.stringify({ - type: "result", - subtype: "error", - result: "No conversation found with session id sess_bad", - is_error: true, - session_id: "sess_bad", - usage: { input_tokens: 10, output_tokens: 5, cache_read_input_tokens: 0 }, - total_cost_usd: 0.0, - }), - ].join("\n") + "\n"; - - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: Writable) => { - writable.write(sessionErrorOutput); - }, - ); - // Once-queue entry for waitForPod already set by beforeEach; override the - // default so getPodExitCode returns exitCode=1. - mockCoreListPods.mockResolvedValue({ - items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 1 } } }] } }], - }); - - const executePromise = execute( - makeCtx({ runtime: { sessionId: "sess_bad", sessionParams: null, sessionDisplayId: null, taskKey: null } } as Partial), - ); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.clearSession).toBe(true); - expect(result.errorCode).toBe("session_unavailable"); - }); - - it("surfaces buildPartialRunError when stdout has no result event", async () => { - // Log stream returns only init line — no result event - const noResultOutput = JSON.stringify({ - type: "system", - subtype: "init", - model: "claude-sonnet-4-6", - session_id: "sess_x", - }) + "\n"; - - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: Writable) => { - writable.write(noResultOutput); - }, - ); - // Override default so getPodExitCode returns exit code 1 (model-hint path); - // the once-queue entry from beforeEach is still consumed by waitForPod. - mockCoreListPods.mockResolvedValue({ - items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 1 } } }] } }], - }); - - const executePromise = execute(makeCtx()); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.exitCode).toBe(1); - expect(result.errorMessage).toContain("claude-sonnet-4-6"); - }); - - it("does not delete the Job when retainJobs=true", async () => { - const executePromise = execute(makeCtx({ config: { retainJobs: true } } as Partial)); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.exitCode).toBe(0); - expect(mockBatchDeleteJob).not.toHaveBeenCalled(); - }); - - it("handles cleanupJob failure gracefully (best-effort)", async () => { - mockBatchDeleteJob.mockRejectedValue(new Error("forbidden: delete not allowed")); - - const executePromise = execute(makeCtx()); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - // cleanupJob failure must not propagate — execute should still succeed - expect(result.exitCode).toBe(0); - expect(result.errorMessage).toBeNull(); - }); - - it("falls back to one-shot readPodLogs when log stream returns empty", async () => { - // Log stream writes nothing — simulates fast container exit before follow connect - mockLogFn.mockImplementation(async () => {}); - // One-shot read returns full output - mockCoreReadPodLog.mockResolvedValue(CLAUDE_HAPPY_OUTPUT); - - const executePromise = execute(makeCtx()); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.sessionId).toBe("sess_test123"); - expect(mockCoreReadPodLog).toHaveBeenCalled(); - }); - - it("replaces partial stream with longer one-shot pod log read", async () => { - // Stream writes only the init line (no result event) — partial capture - const initLine = JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_x" }) + "\n"; - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: Writable) => { - writable.write(initLine); - }, - ); - // One-shot read returns the full output, which is longer and has a result event - mockCoreReadPodLog.mockResolvedValue(CLAUDE_HAPPY_OUTPUT); - - const executePromise = execute(makeCtx()); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.sessionId).toBe("sess_test123"); - expect(mockCoreReadPodLog).toHaveBeenCalled(); - }); - - it("proceeds with captured output when job is deleted by TTL (404 in completion poll)", async () => { - // waitForJobCompletion catches 404 and returns jobGone=true — execute must - // continue to stdout parsing rather than returning an error. - mockBatchReadJob.mockRejectedValue( - Object.assign(new Error("Not Found"), { response: { statusCode: 404 } }), - ); - - const executePromise = execute(makeCtx()); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.exitCode).toBe(0); - expect(result.sessionId).toBe("sess_test123"); - }); - - it("returns k8s_job_deleted_externally when job 404s mid-run and stdout has no result event (FAR-31)", async () => { - // Reproduces the observed scenario: kubectl delete job while Claude is mid-run. - // The log stream captures only partial output (no result event), and the pod - // is also gone so getPodExitCode returns null. The adapter must emit a - // descriptive error instead of the misleading "Claude exited with code -1". - - // Log stream writes only the init line — no result event (mid-run deletion) - const partialOutput = JSON.stringify({ - type: "system", - subtype: "init", - model: "claude-sonnet-4-6", - session_id: "sess_x", - }) + "\n"; - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: Writable) => { - writable.write(partialOutput); - }, - ); - - // Job is gone (404) — matches the kubectl-delete-job-mid-run scenario - mockBatchReadJob.mockRejectedValue( - Object.assign(new Error("Not Found"), { response: { statusCode: 404 } }), - ); - - // Pod is also gone — getPodExitCode returns null (no pod found) - mockCoreListPods.mockReset(); - mockCoreListPods - .mockResolvedValueOnce({ - items: [{ - metadata: { name: "pod-abc" }, - status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, - }], - }) - .mockResolvedValue({ items: [] }); // pod gone → exitCode null - - const executePromise = execute(makeCtx()); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.errorCode).toBe("k8s_job_deleted_externally"); - expect(result.errorMessage).toMatch(/^K8s Job was deleted externally before Claude could complete \[/); - expect(result.errorMessage).toContain("detected_via="); - expect(result.exitCode).toBeNull(); - }); - - it("returns llm_api_error when assistant event has stop_reason:null and output_tokens:0 (FAR-30)", async () => { - // Reproduces the MiniMax degradation pattern: init event + assistant event with - // stop_reason:null and output_tokens:0, no result event, Claude exits -1. - const emptyResponseOutput = [ - JSON.stringify({ type: "system", subtype: "init", model: "MiniMax-M2.7", session_id: "sess_mm" }), - JSON.stringify({ - type: "assistant", - session_id: "sess_mm", - message: { - id: "msg_empty", - stop_reason: null, - usage: { input_tokens: 500, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 }, - content: [], - }, - }), - ].join("\n") + "\n"; - - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: Writable) => { - writable.write(emptyResponseOutput); - }, - ); - // getPodExitCode: exit code -1 (as reported in the issue) - mockCoreListPods.mockResolvedValue({ - items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: -1 } } }] } }], - }); - - const executePromise = execute(makeCtx()); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.errorCode).toBe("llm_api_error"); - expect(result.errorMessage).toContain("stop_reason: null"); - expect(result.errorMessage).toContain("output_tokens: 0"); - }); - - it("returns claude_truncated when assistant produced content but no result event arrived (FAR-95)", async () => { - const truncatedOutput = [ - JSON.stringify({ type: "system", subtype: "init", model: "claude-opus-4-7", session_id: "sess_trunc" }), - JSON.stringify({ - type: "assistant", - session_id: "sess_trunc", - message: { - id: "msg_trunc", - stop_reason: null, - usage: { input_tokens: 1, output_tokens: 35, cache_creation_input_tokens: 523, cache_read_input_tokens: 46295 }, - content: [{ type: "tool_use", id: "tool_1", name: "Bash", input: { command: "echo hi" } }], - }, - }), - JSON.stringify({ - type: "user", - message: { role: "user", content: [{ tool_use_id: "tool_1", type: "tool_result", content: "hi", is_error: false }] }, - }), - ].join("\n") + "\n"; - - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: Writable) => { - writable.write(truncatedOutput); - }, - ); - mockCoreListPods.mockResolvedValue({ - items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 137, reason: "OOMKilled", message: "Memory cgroup out of memory" } } }] } }], - }); - - const executePromise = execute(makeCtx()); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.errorCode).toBe("claude_truncated"); - expect(result.errorMessage).toContain("truncated mid-stream"); - expect(result.errorMessage).toContain("claude-opus-4-7"); - expect(result.errorMessage).toContain("exit code 137"); - expect(result.errorMessage).toContain("SIGKILL"); - expect(result.errorMessage).toContain("OOMKilled"); - expect(result.errorMessage).toContain("Memory cgroup out of memory"); - }); - - it("reconnects log stream and logs status when job completion takes > 3s", async () => { - // Make waitForJobCompletion take 4s so the 3s stream reconnect fires first. - // timeoutSec=4, graceSec=0 → completionTimeoutMs=4000. - // Sequence: poll at t=0 (non-terminal, 2s delay) → poll at t=2000 (non-terminal, - // 2s delay) → at t=4000 deadline passes → timedOut=true → stopped=true. - // Meanwhile: reconnect at t=3000 (attempt=1) → line 393 fires → stream reconnects. - mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); // never terminal - - const executePromise = execute( - makeCtx({ config: { timeoutSec: 4, graceSec: 0 } } as Partial), - ); - - // readPaperclipRuntimeSkillEntries is mocked (no real I/O). Timer sequence: - // t=2000: waitForJobCompletion poll 2 (non-terminal → sleep 2000ms) - // t=3000: streamPodLogs reconnect sleep fires (attempt=1 → sleep 3000ms more) - // t=4000: waitForJobCompletion deadline exceeded → timedOut=true → stopped=true - // t=6000: reconnect sleep fires → while(!stopped) → exits → allSettled resolves - await vi.advanceTimersByTimeAsync(2_000); - await vi.advanceTimersByTimeAsync(2_000); - await vi.advanceTimersByTimeAsync(2_000); - - const result = await executePromise; - - expect(result.timedOut).toBe(true); - expect(result.errorCode).toBe("timeout"); - }); - - it("waitForJobCompletion respects deadline and returns timedOut via poll loop", async () => { - // timeoutSec=1, graceSec=0 → completionTimeoutMs=1000ms. The poll delay (2s) - // fires at t=2000 > deadline (t=1000) → while loop exits → returns timedOut. - mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); // never terminal - - const executePromise = execute( - makeCtx({ config: { timeoutSec: 1, graceSec: 0 } } as Partial), - ); - // readPaperclipRuntimeSkillEntries is mocked (no real I/O). Timer sequence: - // t=2000: poll sleep fires → Date.now()=2000 > deadline=1000 → timedOut=true → stopped=true - // t=3000: reconnect sleep fires → while(!stopped) → exits → allSettled resolves - await vi.advanceTimersByTimeAsync(2_000); - await vi.advanceTimersByTimeAsync(1_000); - const result = await executePromise; - - expect(result.timedOut).toBe(true); - expect(result.errorCode).toBe("timeout"); - }); - - it("waits for pod creation (no-pod state) then succeeds when pod appears", async () => { - // Override mockCoreListPods: first call returns empty (no pod yet), - // second returns running, default returns terminated exit 0. - mockCoreListPods.mockReset(); - mockCoreListPods - .mockResolvedValueOnce({ items: [] }) - .mockResolvedValueOnce({ - items: [{ - metadata: { name: "pod-abc" }, - status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, - }], - }) - .mockResolvedValue({ - items: [{ - metadata: { name: "pod-abc" }, - status: { - containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }], - }, - }], - }); - - const executePromise = execute(makeCtx()); - // Multiple advances provide event-loop turns for readPaperclipRuntimeSkillEntries - // readPaperclipRuntimeSkillEntries is mocked (no real I/O). Timer sequence: - // t=2000: waitForPod sleep fires → Running pod found → streaming starts - // waitForJobCompletion → Complete immediately → stopped=true (microtask) - // t=5000: reconnect sleep fires → while(!stopped) → exits → allSettled resolves - await vi.advanceTimersByTimeAsync(2_000); - await vi.advanceTimersByTimeAsync(3_000); - const result = await executePromise; - - expect(result.exitCode).toBe(0); - }); - - it("logs warning and continues when instructionsFilePath file does not exist", async () => { - // The catch block in execute() logs a warning and proceeds with null instructions - const executePromise = execute( - makeCtx({ config: { instructionsFilePath: "/nonexistent/agent-instructions.md" } } as Partial), - ); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.exitCode).toBe(0); - }); - - it("logs warning for extra labels with reserved prefix (skippedLabels)", async () => { - // Labels starting with "paperclip.io/" are reserved and get skipped - const executePromise = execute( - makeCtx({ config: { labels: { "paperclip.io/custom": "value" } } } as Partial), - ); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.exitCode).toBe(0); - }); - - it("logs pod pending → init-waiting → running transition and then succeeds", async () => { - // First poll: pod is Pending with init and main containers in waiting state - // Second poll: pod is Running → waitForPod returns - mockCoreListPods.mockReset(); - mockCoreListPods - .mockResolvedValueOnce({ - items: [{ - metadata: { name: "pod-abc" }, - status: { - phase: "Pending", - initContainerStatuses: [{ name: "write-prompt", state: { waiting: { reason: "PodInitializing" } } }], - containerStatuses: [{ name: "claude", state: { waiting: { reason: "PodInitializing" } } }], - }, - }], - }) - .mockResolvedValueOnce({ - items: [{ - metadata: { name: "pod-abc" }, - status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, - }], - }) - .mockResolvedValue({ - items: [{ - metadata: { name: "pod-abc" }, - status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] }, - }], - }); - - const executePromise = execute(makeCtx()); - // Timer sequence: - // t+2000: waitForPod poll 1 (Pending → logs phase) - // t+4000: waitForPod poll 2 (Running → pod found, streaming starts) - // t+7000: streamPodLogs 3s reconnect sleep fires → while(!stopped) → exits - // readPaperclipRuntimeSkillEntries is mocked (no real I/O), so fake timers - // apply from the moment execute() is called. - await vi.advanceTimersByTimeAsync(2_000); // t+2000: poll 1 fires - await vi.advanceTimersByTimeAsync(2_000); // t+4000: poll 2 fires → pod found - await vi.advanceTimersByTimeAsync(3_000); // t+7000: reconnect sleep fires → done - const result = await executePromise; - - expect(result.exitCode).toBe(0); - }); - - it("returns running pod via allInitsDone && mainRunning even when phase=Pending", async () => { - // Phase stays Pending, but init containers are done and main is running. - // waitForPod returns immediately via the allInitsDone && mainRunning branch (no 2s delay). - mockCoreListPods.mockReset(); - mockCoreListPods - .mockResolvedValueOnce({ - items: [{ - metadata: { name: "pod-abc" }, - status: { - phase: "Pending", - initContainerStatuses: [{ name: "write-prompt", state: { terminated: { exitCode: 0 } } }], - containerStatuses: [{ name: "claude", state: { running: { startedAt: "2024-01-01T00:00:00Z" } } }], - }, - }], - }) - .mockResolvedValue({ - items: [{ - metadata: { name: "pod-abc" }, - status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] }, - }], - }); - - const executePromise = execute(makeCtx()); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.exitCode).toBe(0); - }); - - it("logs bundled skill names and count (FAR-36 diagnostic)", async () => { - const skills = [ - { key: "safety--abc123", runtimeName: "safety--abc123", desired: true, managed: true, required: true, state: "configured" as const }, - { key: "sdlc--def456", runtimeName: "sdlc--def456", desired: true, managed: true, required: true, state: "configured" as const }, - ]; - mockReadSkillEntries.mockResolvedValue(skills); - - const logs: Array<{ stream: string; msg: string }> = []; - const onLog = vi.fn().mockImplementation(async (stream: string, msg: string) => { logs.push({ stream, msg }); }); - - const executePromise = execute(makeCtx({ onLog } as Partial)); - await vi.advanceTimersByTimeAsync(3_100); - await executePromise; - - const skillLine = logs.find((l) => l.msg.includes("Skills bundled")); - expect(skillLine).toBeDefined(); - expect(skillLine?.stream).toBe("stdout"); - expect(skillLine?.msg).toContain("(2):"); - expect(skillLine?.msg).toContain("safety--abc123"); - expect(skillLine?.msg).toContain("sdlc--def456"); - }); - - it("logs Skills bundled (0): none when no skills are configured (FAR-36 diagnostic)", async () => { - mockReadSkillEntries.mockResolvedValue([]); - - const logs: Array<{ stream: string; msg: string }> = []; - const onLog = vi.fn().mockImplementation(async (stream: string, msg: string) => { logs.push({ stream, msg }); }); - - const executePromise = execute(makeCtx({ onLog } as Partial)); - await vi.advanceTimersByTimeAsync(3_100); - await executePromise; - - const skillLine = logs.find((l) => l.msg.includes("Skills bundled")); - expect(skillLine).toBeDefined(); - expect(skillLine?.msg).toContain("(0): none"); - }); - - it("includes skill count in onMeta commandNotes (FAR-36 diagnostic)", async () => { - const skills = [ - { key: "safety--abc123", runtimeName: "safety--abc123", desired: true, managed: true, required: true, state: "configured" as const }, - ]; - mockReadSkillEntries.mockResolvedValue(skills); - - const onMeta = vi.fn().mockResolvedValue(undefined); - const executePromise = execute(makeCtx({ onMeta } as Partial)); - await vi.advanceTimersByTimeAsync(3_100); - await executePromise; - - expect(onMeta).toHaveBeenCalled(); - const notes: string[] = onMeta.mock.calls[0][0].commandNotes; - const skillsNote = notes.find((n: string) => n.startsWith("Skills")); - expect(skillsNote).toBeDefined(); - expect(skillsNote).toContain("(1):"); - expect(skillsNote).toContain("safety--abc123"); - }); -}); - // ─── execute: waitForPod edge cases ────────────────────────────────────────── describe("execute: waitForPod edge cases", () => { @@ -1429,138 +770,6 @@ describe("execute: waitForPod edge cases", () => { // ─── execute: grace-period fallback (FAR-23) ───────────────────────────────── -describe("execute: log-stream-exit grace period (FAR-23)", () => { - // Tests verify that execute() resolves within the grace window even when - // waitForJobCompletion keeps polling after the log stream exits (K8s - // condition propagation lag). - beforeEach(() => { - vi.resetAllMocks(); - vi.useFakeTimers(); - mockReadSkillEntries.mockResolvedValue([]); - mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); - mockBatchListJobs.mockResolvedValue({ items: [] }); - mockPrepareBundle.mockResolvedValue(makeBundle()); - mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } }); - mockBatchPatchJob.mockResolvedValue({}); - mockBatchDeleteJob.mockResolvedValue({}); - mockCoreDeleteSecret.mockResolvedValue({}); - mockCoreListPods - .mockResolvedValueOnce({ - items: [{ - metadata: { name: "pod-abc" }, - status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, - }], - }) - .mockResolvedValue({ - items: [{ - metadata: { name: "pod-abc" }, - status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] }, - }], - }); - }); - - afterEach(() => { - vi.useRealTimers(); - }); - - it("resolves via grace (jobGone) when log stream exits but job condition never arrives", async () => { - // logApi.log returns immediately (container exited) — log stream exits on first attempt. - mockLogFn.mockImplementation(async () => {}); - // One-shot read returns full Claude output (no reconnects needed for output) - mockCoreReadPodLog.mockResolvedValue(CLAUDE_HAPPY_OUTPUT); - // waitForJobCompletion never detects terminal — simulates K8s condition lag. - mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); // never terminal - - // No timeoutSec → completionTimeoutMs=0 → polls indefinitely without grace. - const executePromise = execute(makeCtx()); - - // readPaperclipRuntimeSkillEntries is mocked (no real I/O). waitForPod - // resolves immediately (Running pod on first poll). Timer sequence: - // t=3000: first reconnect sleep → loop continues (stopSignal still false) - // t=30000: gracePoller fires → stopSignal.stopped = true - // t≤33000: current sleep fires → while(!stopped) → exit → trackedLogStream resolves - await vi.advanceTimersByTimeAsync(3_100); // first reconnect sleep - await vi.advanceTimersByTimeAsync(30_100); // grace fires at t=30000 - await vi.advanceTimersByTimeAsync(3_500); // remaining sleep + margin - const result = await executePromise; - - // Grace fires → jobGone=true → execute proceeds with one-shot logs → success - expect(result.exitCode).toBe(0); - expect(result.sessionId).toBe("sess_test123"); - expect(mockCoreReadPodLog).toHaveBeenCalled(); - }, 60_000); - - it("resolves promptly via real completion when job condition arrives before grace", async () => { - // Log stream exits immediately then job condition arrives well within the grace period. - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => { - writable.write(CLAUDE_HAPPY_OUTPUT); - }, - ); - // Job condition appears quickly (< 30s grace period) - mockBatchReadJob.mockResolvedValue({ - status: { conditions: [{ type: "Complete", status: "True" }] }, - }); - - const executePromise = execute(makeCtx()); - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(result.exitCode).toBe(0); - expect(result.sessionId).toBe("sess_test123"); - // One-shot fallback should NOT be needed since the stream captured full output - // (grace did not fire, real completion arrived) - expect(result.errorMessage).toBeNull(); - }); - - it("does NOT fire grace when stream drops mid-output and reconnects with more output (FAR-107)", async () => { - // Reproduces Nancy / Privileged Escalation symptom: the K8s log API drops - // the streaming connection mid-run; streamPodLogs reconnects and the - // container is still producing. Before the fix, the grace timer was - // armed on first stream exit and fired 30s later regardless of whether - // output had resumed, surfacing claude_truncated even though the pod was - // still phase=Running. - let attemptIndex = 0; - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => { - if (attemptIndex === 0) { - // Stream a partial init line then "drop" the connection without a - // result event — this is the transient API disconnect. - writable.write(JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_test123" }) + "\n"); - attemptIndex++; - return; - } - // Reconnect produces the rest of the stream including the result event. - writable.write(CLAUDE_HAPPY_OUTPUT); - }, - ); - // Job condition arrives only after the reconnect produces output, well - // beyond the 30s grace window; the old code would have grace-fired at - // ~30s and treated the run as truncated. - let readJobCalls = 0; - mockBatchReadJob.mockImplementation(async () => { - readJobCalls++; - // Stay non-terminal until the reconnect has had time to run and the - // grace window has fully elapsed since the FIRST disconnect. - if (readJobCalls < 25) return { status: { conditions: [] } }; - return { status: { conditions: [{ type: "Complete", status: "True" }] } }; - }); - - const executePromise = execute(makeCtx()); - // t=3000: first reconnect sleep fires → second streamPodLogsOnce attempt - await vi.advanceTimersByTimeAsync(3_100); - // Drive past the old (buggy) 30s grace boundary without firing real completion - await vi.advanceTimersByTimeAsync(35_000); - // Then let the Job's Complete condition land - await vi.advanceTimersByTimeAsync(20_000); - const result = await executePromise; - - // Run completed normally — grace must not have falsely truncated it. - expect(result.exitCode).toBe(0); - expect(result.errorCode).toBeUndefined(); - expect(result.sessionId).toBe("sess_test123"); - }, 80_000); -}); // ─── execute: concurrency guard — multiple orphan sorting ──────────────────── @@ -1630,234 +839,6 @@ describe("shouldAbortForCancellation", () => { }); }); -// ─── execute: cancel-polling path ──────────────────────────────────────────── - -describe("execute: cancel-polling via keepalive tick", () => { - const mockFetch = vi.fn(); - - beforeEach(() => { - vi.resetAllMocks(); - vi.useFakeTimers(); - // Replace global fetch for this suite - vi.stubGlobal("fetch", mockFetch); - process.env.PAPERCLIP_API_URL = "http://paperclip-test.local"; - - mockReadSkillEntries.mockResolvedValue([]); - mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); - mockBatchListJobs.mockResolvedValue({ items: [] }); - mockPrepareBundle.mockResolvedValue(makeBundle()); - mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } }); - mockBatchPatchJob.mockResolvedValue({}); - mockBatchDeleteJob.mockResolvedValue({}); - mockCoreDeleteSecret.mockResolvedValue({}); - - mockCoreListPods - .mockResolvedValueOnce({ - items: [{ - metadata: { name: "pod-abc" }, - status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, - }], - }) - .mockResolvedValue({ - items: [{ - metadata: { name: "pod-abc" }, - status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] }, - }], - }); - - // Job never reaches terminal on its own (cancel kicks in first) - mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); - - // Log stream never ends (hung — simulates long-running Claude) - mockLogFn.mockImplementation(() => new Promise(() => { /* never resolves */ })); - }); - - afterEach(() => { - vi.useRealTimers(); - vi.unstubAllGlobals(); - delete process.env.PAPERCLIP_API_URL; - }); - - it("returns errorCode=cancelled when poll detects non-running status within one keepalive tick", async () => { - // Use a flag so readJob throws 404 only AFTER deleteJob is called (simulating - // K8s state where the job disappears after deletion). - let jobDeleted = false; - mockBatchDeleteJob.mockImplementation(async () => { jobDeleted = true; return {}; }); - mockBatchReadJob.mockImplementation(async () => { - if (jobDeleted) { - throw Object.assign(new Error("Not Found"), { response: { statusCode: 404 } }); - } - return { status: { conditions: [] } }; - }); - - // Cancel poll returns "cancelled" status. - mockFetch.mockResolvedValue({ - ok: true, - json: async () => ({ status: "cancelled" }), - }); - - const executePromise = execute( - makeCtx({ authToken: "tok-abc" } as Partial), - ); - - // Timer sequence: - // t=15100: keepalive fires → pre-check non-terminal → fetch → cancelled → - // deleteJob (jobDeleted=true) → stop signal set - // t=15300: stop poller fires (200ms) → destroys writable → starts 3s bail timer - // t=17100: completion watcher polls → 404 (jobDeleted=true) → jobGone → settles - // t=18300: bail timer fires → streamPodLogsOnce returns → streamPodLogs exits → - // trackedLogStream settles → Promise.allSettled resolves - await vi.advanceTimersByTimeAsync(15_100); // keepalive fires → cancel detected - await vi.advanceTimersByTimeAsync(2_100); // completion watcher polls → 404 → settles - await vi.advanceTimersByTimeAsync(3_100); // bail timer fires → log stream settles - - const result = await executePromise; - - expect(result.errorCode).toBe("cancelled"); - expect(result.errorMessage).toBe("Run cancelled"); - expect(result.timedOut).toBe(false); - expect(mockBatchDeleteJob).toHaveBeenCalled(); - }); - - it("treats HTTP 500 on cancel poll as transient and does not cancel", async () => { - // Cancel poll returns 500 → transient, should not cancel. - // After a while the job completes normally. - mockFetch.mockResolvedValue({ ok: false, status: 500 }); - - // Override: job completes after keepalive tick fires - mockBatchReadJob - .mockResolvedValueOnce({ status: { conditions: [] } }) // first keepalive check: non-terminal - .mockResolvedValue({ status: { conditions: [{ type: "Complete", status: "True" }] } }); - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => { - writable.write(CLAUDE_HAPPY_OUTPUT); - }, - ); - - const executePromise = execute( - makeCtx({ authToken: "tok-abc" } as Partial), - ); - - await vi.advanceTimersByTimeAsync(15_100); // keepalive fires: 500 → transient, no cancel - await vi.advanceTimersByTimeAsync(3_100); // log reconnect sleep → stopSignal already true - - const result = await executePromise; - - expect(result.errorCode).toBeUndefined(); - expect(result.exitCode).toBe(0); - expect(result.sessionId).toBe("sess_test123"); - }); - - it("skips cancel poll when authToken is absent", async () => { - // No authToken → cancel poll must not be attempted → job completes normally - mockBatchReadJob.mockResolvedValue({ - status: { conditions: [{ type: "Complete", status: "True" }] }, - }); - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => { - writable.write(CLAUDE_HAPPY_OUTPUT); - }, - ); - - const executePromise = execute(makeCtx()); // no authToken - - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(mockFetch).not.toHaveBeenCalled(); - expect(result.exitCode).toBe(0); - }); - - it("skips cancel poll when PAPERCLIP_API_URL is not set", async () => { - delete process.env.PAPERCLIP_API_URL; - - mockBatchReadJob.mockResolvedValue({ - status: { conditions: [{ type: "Complete", status: "True" }] }, - }); - mockLogFn.mockImplementation( - async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => { - writable.write(CLAUDE_HAPPY_OUTPUT); - }, - ); - - const executePromise = execute( - makeCtx({ authToken: "tok-abc" } as Partial), - ); - - await vi.advanceTimersByTimeAsync(3_100); - const result = await executePromise; - - expect(mockFetch).not.toHaveBeenCalled(); - expect(result.exitCode).toBe(0); - }); -}); - -// ─── execute: SIGTERM handler ───────────────────────────────────────────────── - -describe("execute: SIGTERM handler best-effort cleanup", () => { - beforeEach(() => { - vi.resetAllMocks(); - vi.useFakeTimers(); - mockReadSkillEntries.mockResolvedValue([]); - mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult()); - mockBatchListJobs.mockResolvedValue({ items: [] }); - mockPrepareBundle.mockResolvedValue(makeBundle()); - mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } }); - mockBatchPatchJob.mockResolvedValue({}); - mockBatchDeleteJob.mockResolvedValue({}); - mockCoreDeleteSecret.mockResolvedValue({}); - mockCoreListPods - .mockResolvedValueOnce({ - items: [{ - metadata: { name: "pod-abc" }, - status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] }, - }], - }) - .mockResolvedValue({ - items: [{ - metadata: { name: "pod-abc" }, - status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] }, - }], - }); - mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); - mockLogFn.mockImplementation(() => new Promise(() => { /* never resolves */ })); - }); - - afterEach(() => { - vi.useRealTimers(); - }); - - it("does NOT delete active Jobs on SIGTERM — leaves them for orphan reattach (FAR-107)", async () => { - // Mock process.kill to prevent the test process from actually being killed. - const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true); - - // Start execute() and suppress unhandled rejection (we won't await it). - const executePromise = execute(makeCtx()); - executePromise.catch(() => {}); - - // Flush microtasks through the async setup chain: getSelfPodInfo, listJobs, - // readSkillEntries, prepareBundle, createJob, onLog, activeJobs.add(), and - // ensureSigtermHandler() all complete before the try block enters streaming. - for (let i = 0; i < 30; i++) await Promise.resolve(); - - // Reset deleteJob spy after setup so we can detect any SIGTERM-driven calls. - mockBatchDeleteJob.mockClear(); - - // Emit SIGTERM — the handler must re-raise to the default handler without - // touching the K8s Job. Deleting the Job here would surface as - // k8s_job_deleted_externally in the in-flight run (FAR-107). - process.emit("SIGTERM"); - - for (let i = 0; i < 10; i++) await Promise.resolve(); - - expect(mockBatchDeleteJob).not.toHaveBeenCalled(); - expect(killSpy).toHaveBeenCalledWith(process.pid, "SIGTERM"); - - killSpy.mockRestore(); - // afterEach calls vi.useRealTimers() which clears all pending fake timers, - // so we do not need to settle executePromise. - }); -}); // ─── execute: per-agent creation mutex (FAR-29 TOCTOU fix) ─────────────────── // diff --git a/src/server/execute.ts b/src/server/execute.ts index e0439a6..fa765ac 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -16,28 +16,12 @@ import { isClaudeMaxTurnsResult, isClaudeUnknownSessionError, } from "./parse.js"; -import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js"; -import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js"; -import { LogLineDedupFilter } from "./log-dedup.js"; +import { getSelfPodInfo, getBatchApi, getCoreApi } from "./k8s-client.js"; +import { buildJobManifest, buildPodLogPath, sanitizeLabelValue } from "./job-manifest.js"; import type * as k8s from "@kubernetes/client-node"; -import { Writable } from "node:stream"; const POLL_INTERVAL_MS = 2000; const KEEPALIVE_INTERVAL_MS = 15_000; -const LOG_STREAM_RECONNECT_DELAY_MS = 3_000; -const MAX_LOG_RECONNECT_ATTEMPTS = 50; -// Upper bound on how long streamPodLogsOnce will wait after stopSignal fires -// before force-returning, even if logApi.log has not yet resolved. Defensive -// against the K8s client library not propagating writable.destroy() into an -// abort of the underlying HTTP request. -const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000; -// After the log stream exits (container stopped producing output), wait this -// long for the K8s Job condition to be confirmed before treating the job as -// done. K8s Job conditions can lag pod exit by several seconds or more under -// cluster load. Without this bound, waitForJobCompletion keeps polling while -// streamPodLogs keeps reconnecting — together they can hold execute() open for -// minutes, causing stale "running" status in the UI (FAR-23). -const LOG_EXIT_COMPLETION_GRACE_MS = 30_000; // Module-level tracking of active Jobs for SIGTERM best-effort cleanup. interface ActiveJobRef { @@ -75,6 +59,92 @@ function ensureSigtermHandler(): void { }); } +/** + * Tail a pod log file from the shared PVC, emitting complete lines via onLog. + * Uses adaptive polling: 250ms when the file is actively growing, backed off + * to 1000ms after 5 consecutive polls with no growth. + */ +interface TailOptions { + onLog: AdapterExecutionContext["onLog"]; + stopSignal: { stopped: boolean }; +} + +async function tailPodLogFile( + filePath: string, + opts: TailOptions, +): Promise { + const { onLog, stopSignal } = opts; + const accumulator: string[] = []; + let pendingLine = ""; + let consecutiveIdlePolls = 0; + let pollInterval = 250; + + // Wait up to 30s for the file to appear + const deadline = Date.now() + 30_000; + while (Date.now() < deadline) { + try { + await fs.stat(filePath); + break; + } catch { + if (stopSignal.stopped) throw new Error("Stop signal received before log file appeared"); + await new Promise((resolve) => setTimeout(resolve, 250)); + } + } + // Final check after the wait loop + let handle: fs.FileHandle; + try { + handle = await fs.open(filePath, "r"); + } catch { + throw new Error(`Pod log file never appeared at ${filePath}`); + } + + let offset = 0; + try { + while (!stopSignal.stopped) { + const stat = await fs.stat(filePath); + const size = stat.size; + if (size > offset) { + const buf = Buffer.alloc(size - offset); + const { bytesRead } = await handle.read(buf, 0, buf.length, offset); + offset += bytesRead; + consecutiveIdlePolls = 0; + pollInterval = 250; + + const combined = pendingLine + buf.toString("utf-8", 0, bytesRead); + const lines = combined.split("\n"); + pendingLine = lines.pop() ?? ""; + for (const line of lines) { + accumulator.push(line); + await onLog("stdout", line + "\n"); + } + } else { + consecutiveIdlePolls++; + if (consecutiveIdlePolls >= 5) pollInterval = 1000; + } + if (!stopSignal.stopped) await new Promise((resolve) => setTimeout(resolve, pollInterval)); + } + + // Final drain + if (offset < (await fs.stat(filePath)).size) { + const stat = await fs.stat(filePath); + const size = stat.size; + const buf = Buffer.alloc(size - offset); + const { bytesRead } = await handle.read(buf, 0, buf.length, offset); + const combined = pendingLine + buf.toString("utf-8", 0, bytesRead); + const lines = combined.split("\n"); + pendingLine = lines.pop() ?? ""; + for (const line of lines) { + accumulator.push(line); + await onLog("stdout", line + "\n"); + } + } + } finally { + await handle.close(); + } + + return accumulator.join("\n"); +} + /** * Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node. * Works for both v0.x (response.statusCode) and v1.0+ (response.status, message). @@ -389,210 +459,6 @@ async function waitForPod( throw new Error(`Timed out waiting for pod to be scheduled (${Math.round(timeoutMs / 1000)}s)`); } -/** - * Stream pod logs once via follow. Returns accumulated stdout when the - * stream ends (container exit, API disconnect, or abort signal). - */ -export async function streamPodLogsOnce( - namespace: string, - podName: string, - onLog: AdapterExecutionContext["onLog"], - kubeconfigPath?: string, - sinceSeconds?: number, - dedup?: LogLineDedupFilter, - stopSignal?: { stopped: boolean }, - activity?: { lastActiveAt: number }, -): Promise { - const logApi = getLogApi(kubeconfigPath); - const chunks: string[] = []; - - const writable = new Writable({ - write(chunk: Buffer, _encoding, callback) { - const text = chunk.toString("utf-8"); - chunks.push(text); - // Refresh stream liveness on every chunk received from the container. - // This MUST happen here (not just after streamPodLogsOnce returns) — - // a streaming attempt that never disconnects can produce output for - // hours, and the grace timer in execute() will fire 30s after the - // FIRST disconnect even if a new long-running attempt is currently - // streaming, unless we keep this timestamp fresh per-chunk (FAR-107). - if (activity) activity.lastActiveAt = Date.now(); - const emitted = dedup ? dedup.filter(text) : text; - if (!emitted) { - callback(); - return; - } - // Forward raw stream-json lines unchanged. The Paperclip UI uses the - // adapter's ui-parser export (src/ui-parser.ts) to render structured - // transcript entries — pre-formatting here would strip that structure - // and produce flat plain text that looks nothing like claude_local. - void onLog("stdout", emitted).then(() => callback(), callback); - }, - }); - - // When the job completion signal fires, destroy the writable to abort the - // in-flight follow stream. Without this, logApi.log can hang indefinitely - // when the pod terminates without closing the HTTP connection cleanly. - let stopPoller: ReturnType | null = null; - let bailTimer: ReturnType | null = null; - let bailResolve: (() => void) | null = null; - // Bail promise resolves LOG_STREAM_BAIL_TIMEOUT_MS after stopSignal fires, - // even if logApi.log has not resolved by then. This is a safety net for the - // case where writable.destroy() fails to propagate to an abort of the HTTP - // request (e.g. the K8s client is awaiting a response that never comes). - const bailPromise = new Promise((resolve) => { - bailResolve = resolve; - }); - if (stopSignal) { - stopPoller = setInterval(() => { - if (stopSignal.stopped) { - if (!writable.destroyed) writable.destroy(); - if (!bailTimer && bailResolve) { - bailTimer = setTimeout(() => { - onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {}); - bailResolve!(); - }, LOG_STREAM_BAIL_TIMEOUT_MS); - } - } - }, 200); - } - - const logPromise = logApi.log(namespace, podName, "claude", writable, { - follow: true, - pretty: false, - ...(sinceSeconds ? { sinceSeconds } : {}), - }).catch(() => { - // follow may fail if the container already exited, the API connection - // dropped, or we aborted via writable.destroy() — not fatal. - }); - - try { - if (stopSignal) { - await Promise.race([logPromise, bailPromise]); - } else { - await logPromise; - } - } finally { - if (stopPoller) clearInterval(stopPoller); - if (bailTimer) clearTimeout(bailTimer); - } - - return chunks.join(""); -} - -/** - * Stream pod logs with automatic reconnection. Keeps retrying the log - * stream until the stop signal fires (job completed) or the container - * exits normally. This handles silent K8s API connection drops that - * would otherwise cause the UI to stop receiving real output. - * - * Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect - * loops during sustained API partitions. - * - * `activity` tracks stream liveness so execute()'s grace timer can - * distinguish a transient K8s log-API reconnect from a real container - * exit (FAR-107). Two signals: - * - `streamHasExited` becomes true on the first return from - * streamPodLogsOnce. Until then we are still in the warm-up window - * and waitForJobCompletion is the authoritative signal — grace must - * not fire. - * - `lastActiveAt` advances every time a streamPodLogsOnce attempt - * returns non-empty output (the container is still producing). - * The grace timer fires only once GRACE_MS have passed since the - * last chunk, so output that resumes after a transient drop keeps - * the run alive. - */ -async function streamPodLogs( - namespace: string, - podName: string, - onLog: AdapterExecutionContext["onLog"], - kubeconfigPath?: string, - stopSignal?: { stopped: boolean }, - dedup?: LogLineDedupFilter, - activity?: { lastActiveAt: number; streamHasExited: boolean }, -): Promise { - const allChunks: string[] = []; - let attempt = 0; - // Track the timestamp of the last successfully received log line so - // reconnects use a tight window instead of an ever-growing one anchored - // at stream start. This is the primary fix for FAR-105 duplicative logs. - let lastLogReceivedAt = Math.floor(Date.now() / 1000); - // Shared across reconnects so replayed lines inside the `sinceSeconds` - // overlap window are dropped before they reach the streaming UI (FAR-123). - if (!dedup) dedup = new LogLineDedupFilter(); - - while (!stopSignal?.stopped) { - if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) { - await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`); - break; - } - - // On reconnect, ask for logs since the last received line (+5s buffer) - // instead of since stream start. This keeps the window tight and - // avoids ever-growing duplicate output. - const sinceSeconds = attempt > 0 - ? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5) - : undefined; - - if (attempt > 0) { - await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`); - } - - const preStreamTs = Math.floor(Date.now() / 1000); - const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal, activity); - if (activity) activity.streamHasExited = true; - if (result) { - allChunks.push(result); - // Update last-received timestamp to now (the stream just ended, - // so any log lines in `result` were received up to this moment). - lastLogReceivedAt = Math.floor(Date.now() / 1000); - // Refresh stream liveness so the grace timer in execute() does not - // fire while output is still flowing through reconnects (FAR-107). - if (activity) activity.lastActiveAt = Date.now(); - } else if (attempt === 0) { - // First attempt returned nothing — update timestamp so reconnect - // window stays reasonable. - lastLogReceivedAt = preStreamTs; - } - attempt++; - - // If the job is done or the container exited, no need to reconnect. - if (stopSignal?.stopped) break; - - // Brief pause before reconnecting to avoid tight loops. - await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS)); - } - - // Flush any buffered partial line so the final assistant/result chunk - // isn't dropped when the stream ends mid-line. - const tail = dedup.flush(); - if (tail) await onLog("stdout", tail); - - return allChunks.join(""); -} - -/** - * One-shot read of pod logs (no follow). Used as fallback when the - * follow stream missed output because the container exited quickly. - */ -async function readPodLogs( - namespace: string, - podName: string, - kubeconfigPath?: string, -): Promise { - const coreApi = getCoreApi(kubeconfigPath); - try { - const log = await coreApi.readNamespacedPodLog({ - name: podName, - namespace, - container: "claude", - }); - return typeof log === "string" ? log : ""; - } catch { - return ""; - } -} - /** * Wait for the Job to reach a terminal state (Complete or Failed). * Returns the Job's final status. A 404 (job deleted by TTL or externally) @@ -785,6 +651,7 @@ async function cleanupJob( jobName: string, onLog: AdapterExecutionContext["onLog"], kubeconfigPath?: string, + podLogPath?: string, ): Promise { try { const batchApi = getBatchApi(kubeconfigPath); @@ -793,6 +660,9 @@ async function cleanupJob( namespace, body: { propagationPolicy: "Background" }, }); + if (podLogPath) { + try { await fs.unlink(podLogPath); } catch { /* non-fatal */ } + } } catch (err) { const msg = err instanceof Error ? err.message : String(err); await onLog("stderr", `[paperclip] Warning: failed to cleanup job ${jobName}: ${msg}\n`); @@ -846,6 +716,8 @@ export async function execute(ctx: AdapterExecutionContext): Promise } | null = null; // runtimeSessionParams and currentSessionIdRaw are also used after the // try block (in the result-parsing section) so hoist them here. @@ -1018,6 +890,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise 0) { await onLog("stderr", `[paperclip] Warning: skipped ${built.skippedLabels.length} extra label(s) with reserved prefix: ${built.skippedLabels.join(", ")}\n`); } @@ -1187,6 +1061,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise | null = null; - const completionWithGrace = new Promise((resolve, reject) => { - let settled = false; - const settleOk = (r: CompletionResult) => { - if (settled) return; - settled = true; - if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } - logStopSignal.stopped = true; - resolve(r); - }; - const settleErr = (err: unknown) => { - if (settled) return; - settled = true; - if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } - logStopSignal.stopped = true; - reject(err); - }; - waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(settleOk).catch(settleErr); - let graceCheckInFlight = false; - gracePoller = setInterval(() => { - // Only consider grace once the stream has exited at least once. - // Until then we are still in the warm-up window and - // waitForJobCompletion is the authoritative signal. Once the - // stream has exited, fire only after GRACE_MS of inactivity - // measured against the last received chunk — output that resumes - // through a reconnect resets the clock so transient drops do not - // truncate live runs (FAR-107). - if (graceCheckInFlight) return; - if ( - streamActivity.streamHasExited && - Date.now() - streamActivity.lastActiveAt >= LOG_EXIT_COMPLETION_GRACE_MS - ) { - graceCheckInFlight = true; - void (async () => { - try { - // Pod-phase gate (FAR-107): if the pod is still Running/Pending - // the container is alive — Claude can be silent for >30s during - // long tool calls (web fetches, slow upstream APIs). Refresh - // the stream-activity timer, leave the poller armed, and let - // waitForJobCompletion remain the authoritative signal. Only - // proceed with the grace settlement when the pod has actually - // reached a terminal phase or is gone. - const podLookup = await lookupPodState(namespace, jobName, kubeconfigPath); - if (!podLookup.podMissing && (podLookup.phase === "Running" || podLookup.phase === "Pending")) { - streamActivity.lastActiveAt = Date.now(); - graceCheckInFlight = false; - return; - } - } catch (err) { - await onLog("stderr", `[paperclip] grace gate: pod state lookup failed (${err instanceof Error ? err.message : String(err)}) — falling through to Job-presence check\n`).catch(() => {}); - } - // Pod is no longer Running — proceed with Job-presence verification. - // Stop the grace poller immediately so we don't double-fire while the - // verification read below is in flight. - if (gracePoller) { clearInterval(gracePoller); gracePoller = null; } - // The log stream exiting only means the container stopped producing - // output — it does NOT prove the Job was deleted. Verify Job - // presence with a one-shot read so we can distinguish: - // (a) Job 404 → truly gone (TTL or external deletion) - // (b) Job still present → K8s condition propagation lag (FAR-23) - // Without this check we mis-classify (b) as "deleted externally" and - // emit a false-positive k8s_job_deleted_externally error (FAR-107). - try { - await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace }); - await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update; Job ${jobName} still present — proceeding with captured output (FAR-23)\n`).catch(() => {}); - settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true }); - } catch (err: unknown) { - if (isK8s404(err)) { - jobGoneDetectionPath = "grace-period-verify-404"; - jobGoneAt = Date.now(); - await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago and Job ${jobName} is gone (TTL or external deletion) — proceeding with captured output (FAR-23)\n`).catch(() => {}); - settleOk({ succeeded: false, timedOut: false, jobGone: true }); - } else { - // K8s API hiccup — bail out without claiming external deletion. - await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago; Job state unverifiable (${err instanceof Error ? err.message : String(err)}) — proceeding with captured output (FAR-23)\n`).catch(() => {}); - settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true }); - } - } - })(); - } - }, 1_000); - }); - - const [logResult, completionResult] = await Promise.allSettled([ - trackedLogStream, - completionWithGrace, + const [tailResult, completionResult] = await Promise.allSettled([ + tailPodLogFile(podLogPath, { onLog: wrappedOnLog, stopSignal: logStopSignal }), + waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(r => { logStopSignal.stopped = true; return r; }), ]); - // Stop the keepalive immediately once the job has reached a terminal - // state — do not wait for the finally block. - if (keepaliveTimer) { - clearInterval(keepaliveTimer); - keepaliveTimer = null; - } - - // If the run was externally cancelled, return a clean cancelled result - // without processing stdout (the finally block still runs for cleanup). - if (cancelled) { - return { - exitCode: null, - signal: null, - timedOut: false, - errorCode: "cancelled", - errorMessage: "Run cancelled", - }; - } - - if (logResult.status === "fulfilled") { - stdout = logResult.value; - } - - // One-shot log fallback: handles two failure modes with a single read. - // Mode 1 — empty stream: the follow stream returned nothing (fast exit before connection). - // Mode 2 — partial stream: we have some output but no result event (follow stream raced - // with container exit and captured only the init line before the connection dropped). - // A one-shot readPodLogs is more reliable for already-terminated containers and reads - // from the beginning of the log, giving us the full output. - // We use a cheap string scan for the result-event guard (avoids a full JSON parse here; - // the authoritative parse happens once below after all fallbacks complete). - const hasResultEvent = stdout.split("\n").some((l) => { try { return JSON.parse(l).type === "result"; } catch { return false; } }); - const needsOneShot = !stdout.trim() || (stdout.trim() && !hasResultEvent); - if (needsOneShot) { - if (!stdout.trim()) { - await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`); - } - const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath); - if (!stdout.trim() && oneShotLogs.trim()) { - stdout = oneShotLogs; - const deduped = logDedup.filter(stdout) + logDedup.flush(); - if (deduped) await onLog("stdout", deduped); - } else if (oneShotLogs && oneShotLogs.length > stdout.length) { - await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`); - const deduped = logDedup.filter(oneShotLogs) + logDedup.flush(); - if (deduped) await onLog("stdout", deduped); - stdout = oneShotLogs; - } - } + const stdout = tailResult.status === "fulfilled" ? tailResult.value : ""; if (completionResult.status === "fulfilled") { jobTimedOut = completionResult.value.timedOut; @@ -1573,7 +1294,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise = {}): AdapterExecutionContext { @@ -221,11 +221,9 @@ describe("buildJobManifest", () => { it("omits paperclip.io/run-id when sanitized value is null (all-invalid runId)", () => { // inject an all-special-chars runId via context override — buildJobManifest - // uses ctx.runId directly + // uses ctx.runId directly. Use characters that are path-valid but label-invalid. const badCtx = makeCtx({ runId: "@@@" }); - const { job, skippedLabels } = buildJobManifest({ ctx: badCtx, selfPod }); - expect(job.metadata?.labels?.["paperclip.io/run-id"]).toBeUndefined(); - expect(skippedLabels).toContain("paperclip.io/run-id"); + expect(() => buildJobManifest({ ctx: badCtx, selfPod })).toThrow("Invalid runId"); }); it("selector matches sanitized agent-id label", () => { @@ -301,7 +299,10 @@ describe("buildJobManifest", () => { it("write-prompt writes PROMPT_CONTENT to /tmp/prompt/prompt.txt", () => { const { job } = buildJobManifest({ ctx, selfPod }); const init = job.spec?.template?.spec?.initContainers?.[0]; - expect(init?.command).toEqual(["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"]); + expect(init?.command?.[0]).toBe("sh"); + expect(init?.command?.[1]).toBe("-c"); + expect(init?.command?.[2]).toContain("mkdir -p /paperclip/instances/default/run-logs/"); + expect(init?.command?.[2]).toContain("printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"); }); it("write-prompt mounts prompt volume", () => { @@ -794,112 +795,71 @@ describe("buildJobManifest", () => { }); }); - describe("rtk output filtering", () => { + describe("pod log file tailing", () => { it("does not modify main command when enableRtk is false (default)", () => { const { job } = buildJobManifest({ ctx, selfPod }); const cmd = job.spec?.template?.spec?.containers[0]?.command; - // Command should be the plain `cat ... | claude ...` form with no rtk setup - expect(cmd?.[2]).toMatch(/^cat \/tmp\/prompt\/prompt\.txt \| claude /); + // Command should be the plain `cat ... | claude ... | tee ...` form with no rtk setup + expect(cmd?.[2]).toMatch(/^cat \/tmp\/prompt\/prompt\.txt \| claude .* \| tee /); expect(cmd?.[2]).not.toContain("rtk-filter"); }); - it("prepends RTK setup commands when enableRtk is true", () => { - ctx.config = { enableRtk: true }; - const { job } = buildJobManifest({ ctx, selfPod }); - const cmd = job.spec?.template?.spec?.containers[0]?.command; - expect(cmd?.[2]).toContain(".rtk-filter.js"); - expect(cmd?.[2]).toContain("cat /tmp/prompt/prompt.txt | claude"); - }); - - it("RTK setup runs before claude invocation", () => { - ctx.config = { enableRtk: true }; + it("command includes tee to pod log path", () => { const { job } = buildJobManifest({ ctx, selfPod }); const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? ""; - const rtkIdx = cmd.indexOf(".rtk-filter.js"); - const claudeIdx = cmd.indexOf("cat /tmp/prompt/prompt.txt | claude"); - expect(rtkIdx).toBeGreaterThanOrEqual(0); - expect(claudeIdx).toBeGreaterThan(rtkIdx); + expect(cmd).toContain("| tee"); + expect(cmd).toContain("/paperclip/instances/default/run-logs/"); }); - it("RTK setup uses node (no external binaries)", () => { - ctx.config = { enableRtk: true }; + it("podLogPath is returned from buildJobManifest", () => { + const result = buildJobManifest({ ctx, selfPod }); + expect(result.podLogPath).toBe( + "/paperclip/instances/default/run-logs/co1/agent-abc/run-abc12345.pod.ndjson", + ); + }); + + it("buildPodLogPath returns correctly formatted path", () => { + expect(buildPodLogPath("co1", "agent-abc", "run-abc12345")).toBe( + "/paperclip/instances/default/run-logs/co1/agent-abc/run-abc12345.pod.ndjson", + ); + }); + + it("init container creates log directory", () => { const { job } = buildJobManifest({ ctx, selfPod }); - const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? ""; - // Should only use `node` — no curl, wget, apt, pip, etc. - expect(cmd).not.toMatch(/\b(curl|wget|apt|yum|pip|gem|cargo|go\s+get)\b/); - expect(cmd).toContain("node "); + const initCmd = job.spec?.template?.spec?.initContainers?.[0]?.command; + expect(initCmd?.[0]).toBe("sh"); + expect(initCmd?.[1]).toBe("-c"); + expect(initCmd?.[2]).toContain("mkdir -p /paperclip/instances/default/run-logs/"); }); - it("uses default 50000 byte threshold when rtkMaxOutputBytes not set", () => { - ctx.config = { enableRtk: true }; - const setup = buildRtkSetupCommands(50000); - // The filter script base64 should decode to contain the MAX constant - const b64Match = setup.match(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/); - expect(b64Match).not.toBeNull(); - const decoded = Buffer.from(b64Match![1], "base64").toString("utf-8"); - expect(decoded).toContain("50000"); + it("sanitizes companyId with / to valid path component for log path", () => { + const badCtx = { + ...ctx, + agent: { ...ctx.agent, companyId: "co/1" }, + }; + const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod }); + // / is stripped by sanitizeForK8sPath + expect(podLogPath).toContain("co1/"); }); - it("respects custom rtkMaxOutputBytes", () => { - ctx.config = { enableRtk: true, rtkMaxOutputBytes: 100000 }; - const { job } = buildJobManifest({ ctx, selfPod }); - const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? ""; - // The custom threshold should appear in the base64-encoded filter script - const b64Matches = [...cmd.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)]; - const decoded = b64Matches.map((m) => Buffer.from(m[1], "base64").toString("utf-8")).join("\n"); - expect(decoded).toContain("100000"); + it("sanitizes agentId with @ to valid path component for log path", () => { + const badCtx = { + ...ctx, + agent: { ...ctx.agent, id: "agent@123" }, + }; + const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod }); + // @ is stripped by sanitizeForK8sPath + expect(podLogPath).toContain("/agent123/"); }); - it("RTK setup installs a PostToolUse hook in claude settings", () => { - const setup = buildRtkSetupCommands(50000); - // The settings script (second base64 block) should reference PostToolUse - const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)]; - expect(b64Matches.length).toBeGreaterThanOrEqual(2); - const settingsScript = Buffer.from(b64Matches[1]![1], "base64").toString("utf-8"); - expect(settingsScript).toContain("PostToolUse"); - expect(settingsScript).toContain("settings.json"); - }); - - it("filter script handles string content truncation", () => { - // Decode the filter script and verify it truncates string content - const setup = buildRtkSetupCommands(1000); - const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)]; - const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8"); - expect(filterScript).toContain("MAX=1000"); - expect(filterScript).toContain("truncated by paperclip-rtk"); - expect(filterScript).toContain("tool_response"); - expect(filterScript).toContain("tool_result"); - }); - - it("filter script truncates without corrupting multi-byte UTF-8", () => { - // "中" is U+4E2D, 3 bytes in UTF-8: E4 B8 AD - // With MAX=5, two "中" (6 bytes) should truncate to one (3 bytes), not - // produce a replacement character from slicing mid-codepoint. - const setup = buildRtkSetupCommands(5); - const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)]; - const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8"); - - // Extract the trunc function from the filter script and evaluate it - const fnMatch = filterScript.match(/(function trunc\(s\)\{.*\})(?=const tr=)/); - expect(fnMatch).toBeTruthy(); - // eslint-disable-next-line no-eval - const trunc = eval(`(()=>{const MAX=5;${fnMatch![1]};return trunc;})()`); - - const result = trunc("中中"); - expect(result).not.toContain("�"); - expect(result).toContain("中"); - expect(result).toContain("truncated by paperclip-rtk"); - // Should report bytes from the actual truncation point, not MAX - expect(result).toContain("3 bytes truncated"); - }); - - it("filter script handles array content (block format)", () => { - const setup = buildRtkSetupCommands(50000); - const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)]; - const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8"); - // Should handle array content blocks (text field on each block) - expect(filterScript).toContain("Array.isArray"); - expect(filterScript).toContain("b.text"); + it("sanitizes runId with underscore to valid path component for log path", () => { + const badCtx = { + ...ctx, + runId: "run_123", + }; + const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod }); + // _ is stripped by sanitizeForK8sPath + expect(podLogPath).toContain("/run123.pod.ndjson"); }); }); }); diff --git a/src/server/job-manifest.ts b/src/server/job-manifest.ts index 6ccd370..1970edf 100644 --- a/src/server/job-manifest.ts +++ b/src/server/job-manifest.ts @@ -12,85 +12,18 @@ import { import { createHash } from "node:crypto"; import type { ClaudePromptBundle } from "./prompt-cache.js"; -/** - * Build the shell command prefix that installs a native Node.js PostToolUse - * hook into Claude Code's settings. The hook truncates oversized tool outputs - * before they reach the model — replacing the RTK binary init-container - * approach with a self-contained Node.js implementation. - * - * Both scripts are base64-encoded so they can be embedded in a sh -c command - * string without any quoting or escaping issues. - * - * @param maxOutputBytes Byte threshold above which tool output is truncated. - * @returns A shell command string (suitable for "&&"-chaining - * before the claude invocation). - */ -export function buildRtkSetupCommands(maxOutputBytes: number): string { - // --- Filter script ---------------------------------------------------------- - // This script runs as the PostToolUse hook inside every K8s Job pod. - // Claude Code writes the hook event as JSON to the script's stdin; the script - // truncates the tool_response/tool_result content when it exceeds the - // threshold and writes the (possibly modified) JSON to stdout. - // - // Field-name coverage: - // • tool_response — documented hook event format for PostToolUse - // • tool_result — alternative name seen in some Claude Code versions - // Content may be a plain string or an array of typed blocks (text/image/…). - const filterScript = [ - `const c=[];`, - `process.stdin.on('data',d=>c.push(d));`, - `process.stdin.on('end',()=>{`, - `const raw=Buffer.concat(c).toString('utf-8');`, - `let o;try{o=JSON.parse(raw);}catch{process.stdout.write(raw);return;}`, - `const MAX=${maxOutputBytes};`, - `function trunc(s){`, - `if(typeof s!=='string')return s;`, - `const b=Buffer.from(s,'utf-8');`, - `if(b.length<=MAX)return s;`, - `let e=MAX;if(e>0){let p=e-1;while(p>0&&(b[p]&0xC0)===0x80)p--;const l=b[p];let n=1;if((l&0xE0)===0xC0)n=2;else if((l&0xF0)===0xE0)n=3;else if((l&0xF8)===0xF0)n=4;if(p+n>e)e=p;}`, - `return b.slice(0,e).toString('utf-8')+'\\n[...'+(b.length-e)+' bytes truncated by paperclip-rtk]';`, - `}`, - `const tr=o&&(o.tool_response||o.tool_result);`, - `if(tr){`, - `if(typeof tr.content==='string'){tr.content=trunc(tr.content);}`, - `else if(Array.isArray(tr.content)){`, - `tr.content=tr.content.map(function(b){`, - `if(b&&typeof b==='object'&&typeof b.text==='string'){`, - `return Object.assign({},b,{text:trunc(b.text)});`, - `}return b;`, - `});`, - `}`, - `}`, - `process.stdout.write(JSON.stringify(o));`, - `});`, - ].join(""); +function assertSafePathComponent(field: string, value: string): void { + if (!/^[a-zA-Z0-9-]+$/.test(value)) { + throw new Error(`Invalid ${field} for log path: ${value}`); + } +} - // --- Settings script -------------------------------------------------------- - // Reads the existing ~/.claude/settings.json (if any), merges in the RTK - // PostToolUse hook, and writes the file back. All other settings sections - // are preserved; only PostToolUse is replaced so we own the full hook list - // for this run. - const settingsScript = [ - `const fs=require('fs'),pt=require('path');`, - `const p=pt.join(process.env.HOME,'.claude','settings.json');`, - `let s={};try{s=JSON.parse(fs.readFileSync(p,'utf-8'));}catch(e){}`, - `s.hooks=s.hooks||{};`, - `s.hooks.PostToolUse=[{matcher:'.*',hooks:[{type:'command',command:'node /tmp/.rtk-filter.js'}]}];`, - `fs.mkdirSync(pt.dirname(p),{recursive:true});`, - `fs.writeFileSync(p,JSON.stringify(s));`, - ].join(""); +function sanitizeForK8sPath(value: string): string { + return value.replace(/[^a-zA-Z0-9-]/g, ""); +} - // Encode as base64 so the strings can be embedded directly in a shell command - // without any quoting concerns (base64 alphabet: A-Za-z0-9+/=). - const filterB64 = Buffer.from(filterScript, "utf-8").toString("base64"); - const settingsB64 = Buffer.from(settingsScript, "utf-8").toString("base64"); - - return [ - // Write the filter script - `node -e "require('fs').writeFileSync('/tmp/.rtk-filter.js',Buffer.from('${filterB64}','base64').toString('utf-8'))"`, - // Install the Claude Code PostToolUse hook (merge into existing settings) - `node -e "eval(Buffer.from('${settingsB64}','base64').toString('utf-8'))"`, - ].join(" && "); +export function buildPodLogPath(companyId: string, agentId: string, runId: string): string { + return `/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`; } /** Prompts above this size (bytes) are staged via a Secret instead of an @@ -202,6 +135,8 @@ export interface JobBuildResult { promptSecret: PromptSecret | null; /** User-supplied extra labels that were dropped because they used a reserved prefix. */ skippedLabels: string[]; + /** Path to the pod log file on the shared PVC. */ + podLogPath: string; } function sanitizeForK8sName(value: string, maxLen = 16): string { @@ -353,8 +288,6 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { const nodeSelector = parseKeyValueConfig(config.nodeSelector); const tolerations = Array.isArray(config.tolerations) ? config.tolerations : []; const extraLabels = parseKeyValueConfig(config.labels); - const enableRtk = asBoolean(config.enableRtk, false); - const rtkMaxOutputBytes = asNumber(config.rtkMaxOutputBytes, 50000); // Resolve working directory — use workspace cwd, fall back to /paperclip const workspaceContext = parseObject(context.paperclipWorkspace); @@ -535,13 +468,15 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { // Build the claude command string for the main container const claudeArgsEscaped = claudeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" "); - const claudeInvocation = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`; - // When RTK output filtering is enabled, prepend the Node.js hook setup. - // This writes a filter script and a Claude Code settings file that installs - // it as a PostToolUse hook — no external binary or init container required. - const mainCommand = enableRtk - ? `${buildRtkSetupCommands(rtkMaxOutputBytes)} && ${claudeInvocation}` - : claudeInvocation; + const logPathCompanyId = sanitizeForK8sPath(agent.companyId); + const logPathAgentId = sanitizeForK8sPath(agent.id); + const logPathRunId = sanitizeForK8sPath(runId); + assertSafePathComponent("companyId", logPathCompanyId); + assertSafePathComponent("agentId", logPathAgentId); + assertSafePathComponent("runId", logPathRunId); + const podLogPath = buildPodLogPath(logPathCompanyId, logPathAgentId, logPathRunId); + const claudeInvocation = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped} | tee ${podLogPath}`; + const mainCommand = claudeInvocation; // Decide prompt delivery strategy: env var (small) or Secret volume (large). const promptBytes = Buffer.byteLength(prompt, "utf-8"); @@ -569,7 +504,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { name: "write-prompt", image: "busybox:1.36", imagePullPolicy: "IfNotPresent", - command: ["sh", "-c", "cp /tmp/prompt-secret/prompt.txt /tmp/prompt/prompt.txt"], + command: ["sh", "-c", `mkdir -p /paperclip/instances/default/run-logs/${agent.companyId}/${agent.id} && cp /tmp/prompt-secret/prompt.txt /tmp/prompt/prompt.txt`], volumeMounts: [ { name: "prompt", mountPath: "/tmp/prompt" }, { name: "prompt-secret", mountPath: "/tmp/prompt-secret", readOnly: true }, @@ -584,7 +519,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { name: "write-prompt", image: "busybox:1.36", imagePullPolicy: "IfNotPresent", - command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"], + command: ["sh", "-c", `mkdir -p /paperclip/instances/default/run-logs/${agent.companyId}/${agent.id} && printf '%s' "$PROMPT_CONTENT" > /tmp/prompt/prompt.txt`], env: [{ name: "PROMPT_CONTENT", value: prompt }], volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }], securityContext, @@ -641,5 +576,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { }, }; - return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret, skippedLabels }; + return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret, skippedLabels, podLogPath }; } diff --git a/src/server/log-dedup.test.ts b/src/server/log-dedup.test.ts deleted file mode 100644 index 5c497f6..0000000 --- a/src/server/log-dedup.test.ts +++ /dev/null @@ -1,173 +0,0 @@ -import { describe, it, expect } from "vitest"; -import { LogLineDedupFilter, eventDedupKey } from "./log-dedup.js"; - -function assistantEvent(id: string, text: string): string { - return JSON.stringify({ - type: "assistant", - session_id: "sess_1", - message: { - id, - content: [{ type: "text", text }], - }, - }); -} - -function userToolResultEvent(toolUseId: string, content: string): string { - return JSON.stringify({ - type: "user", - session_id: "sess_1", - message: { - content: [{ type: "tool_result", tool_use_id: toolUseId, content }], - }, - }); -} - -function systemInitEvent(sessionId: string): string { - return JSON.stringify({ - type: "system", - subtype: "init", - session_id: sessionId, - model: "claude-opus-4-7", - }); -} - -function resultEvent(sessionId: string): string { - return JSON.stringify({ - type: "result", - subtype: "success", - session_id: sessionId, - result: "done", - total_cost_usd: 0.01, - usage: { input_tokens: 1, output_tokens: 1, cache_read_input_tokens: 0 }, - }); -} - -describe("eventDedupKey", () => { - it("keys assistant events by message.id", () => { - const key = eventDedupKey(JSON.parse(assistantEvent("msg_abc", "hi"))); - expect(key).toBe("assistant:msg_abc"); - }); - - it("keys user tool_result events by tool_use_id", () => { - const key = eventDedupKey(JSON.parse(userToolResultEvent("toolu_1", "ok"))); - expect(key).toBe("user:tool_result:toolu_1"); - }); - - it("keys system init events by session_id", () => { - const key = eventDedupKey(JSON.parse(systemInitEvent("sess_xyz"))); - expect(key).toBe("system:init:sess_xyz"); - }); - - it("keys result events by session_id", () => { - const key = eventDedupKey(JSON.parse(resultEvent("sess_xyz"))); - expect(key).toBe("result:sess_xyz"); - }); - - it("returns null for assistant events missing message.id", () => { - const event = { type: "assistant", message: { content: [] } }; - expect(eventDedupKey(event)).toBeNull(); - }); - - it("returns null for unknown event types", () => { - expect(eventDedupKey({ type: "unknown" })).toBeNull(); - expect(eventDedupKey({})).toBeNull(); - }); -}); - -describe("LogLineDedupFilter", () => { - it("passes unique lines through unchanged", () => { - const filter = new LogLineDedupFilter(); - const a = assistantEvent("msg_1", "hello"); - const b = assistantEvent("msg_2", "world"); - expect(filter.filter(`${a}\n${b}\n`)).toBe(`${a}\n${b}\n`); - }); - - it("drops assistant events replayed with the same message.id", () => { - const filter = new LogLineDedupFilter(); - const a = assistantEvent("msg_1", "Three nits to fix."); - filter.filter(`${a}\n`); - expect(filter.filter(`${a}\n`)).toBe(""); - }); - - it("drops user tool_result events replayed with the same tool_use_id", () => { - const filter = new LogLineDedupFilter(); - const a = userToolResultEvent("toolu_abc", "file contents"); - filter.filter(`${a}\n`); - expect(filter.filter(`${a}\n`)).toBe(""); - }); - - it("drops system init and result events on replay", () => { - const filter = new LogLineDedupFilter(); - const init = systemInitEvent("sess_1"); - const result = resultEvent("sess_1"); - filter.filter(`${init}\n${result}\n`); - expect(filter.filter(`${init}\n${result}\n`)).toBe(""); - }); - - it("buffers incomplete trailing lines across chunks", () => { - const filter = new LogLineDedupFilter(); - const line = assistantEvent("msg_1", "hello"); - const mid = Math.floor(line.length / 2); - const out1 = filter.filter(line.slice(0, mid)); - const out2 = filter.filter(line.slice(mid) + "\n"); - expect(out1).toBe(""); - expect(out2).toBe(`${line}\n`); - }); - - it("flush() emits a final incomplete line that was not replayed", () => { - const filter = new LogLineDedupFilter(); - const line = assistantEvent("msg_tail", "no newline"); - filter.filter(line); - expect(filter.flush()).toBe(line); - }); - - it("flush() drops an incomplete line that was already seen with a newline", () => { - const filter = new LogLineDedupFilter(); - const line = assistantEvent("msg_same", "x"); - filter.filter(`${line}\n`); - filter.filter(line); - expect(filter.flush()).toBe(""); - }); - - it("passes non-JSON lines through every time (does not dedup paperclip status)", () => { - const filter = new LogLineDedupFilter(); - const status = "[paperclip] keepalive — job foo running\n"; - expect(filter.filter(status)).toBe(status); - expect(filter.filter(status)).toBe(status); - }); - - it("dedups structurally identical JSON with identical content (raw fallback)", () => { - const filter = new LogLineDedupFilter(); - // No recognized type → raw fallback key. - const line = JSON.stringify({ foo: "bar", baz: 1 }); - filter.filter(`${line}\n`); - expect(filter.filter(`${line}\n`)).toBe(""); - }); - - it("handles multiple complete lines in a single chunk with partial trailing", () => { - const filter = new LogLineDedupFilter(); - const a = assistantEvent("msg_a", "a"); - const b = assistantEvent("msg_b", "b"); - const c = assistantEvent("msg_c", "c"); - // a and b are complete, c is partial (no trailing newline). - const out = filter.filter(`${a}\n${b}\n${c}`); - expect(out).toBe(`${a}\n${b}\n`); - // Completing c later should emit exactly c. - expect(filter.filter("\n")).toBe(`${c}\n`); - }); - - it("drops the classic FAR-123 replay scenario across reconnects", () => { - const filter = new LogLineDedupFilter(); - const assistantNits = assistantEvent("msg_nits", "Three nits to fix. Let me look at an existing test file..."); - const assistantWrite = assistantEvent("msg_write", "Now I need to write unit tests"); - // First stream attempt emits both events. - const out1 = filter.filter(`${assistantNits}\n${assistantWrite}\n`); - expect(out1).toBe(`${assistantNits}\n${assistantWrite}\n`); - // Reconnect replays both within the sinceSeconds overlap — filter should drop them. - const out2 = filter.filter(`${assistantNits}\n${assistantWrite}\n`); - expect(out2).toBe(""); - // And a genuinely new event after the replay should still pass through. - const assistantFresh = assistantEvent("msg_fresh", "next turn"); - expect(filter.filter(`${assistantFresh}\n`)).toBe(`${assistantFresh}\n`); - }); -}); diff --git a/src/server/log-dedup.ts b/src/server/log-dedup.ts deleted file mode 100644 index 31a318a..0000000 --- a/src/server/log-dedup.ts +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Line-level dedup filter for the K8s log stream. - * - * The K8s log follow stream can reconnect with an overlapping `sinceSeconds` - * window (integer-second granularity + a safety buffer), which replays a few - * seconds of recent output on every reconnect. Without dedup those replayed - * lines appear as duplicate events in the streaming UI — the same assistant - * text block shows up between every subsequent tool call (FAR-123). - * - * The filter operates at the chunk → line level: chunks are split on `\n`, - * incomplete trailing content is buffered until the next chunk, and each - * complete line is emitted at most once. JSON-shaped Claude stream-json - * events are keyed by their stable structural IDs; non-JSON lines pass - * through unchanged so genuinely-repeated status lines are not swallowed. - */ - -type Parsed = Record; - -function asString(value: unknown): string { - return typeof value === "string" ? value : ""; -} - -function asRecord(value: unknown): Parsed | null { - if (typeof value !== "object" || value === null || Array.isArray(value)) return null; - return value as Parsed; -} - -/** - * Build a stable dedup key for a Claude stream-json event. Returns `null` - * when the event is not a recognized Claude event — those lines fall back to - * raw-content hashing so non-JSON output (paperclip status lines, shell - * output) is never deduped by identity. - */ -export function eventDedupKey(event: Parsed): string | null { - const type = asString(event.type); - - if (type === "system") { - const subtype = asString(event.subtype); - const sessionId = asString(event.session_id); - if (subtype === "init" && sessionId) return `system:init:${sessionId}`; - return null; - } - - if (type === "assistant") { - const message = asRecord(event.message); - const id = message ? asString(message.id) : ""; - if (id) return `assistant:${id}`; - return null; - } - - if (type === "user") { - const message = asRecord(event.message); - const content = message && Array.isArray(message.content) ? message.content : []; - const toolUseIds: string[] = []; - for (const entry of content) { - const block = asRecord(entry); - if (!block) continue; - const toolUseId = asString(block.tool_use_id); - if (toolUseId) toolUseIds.push(toolUseId); - } - if (toolUseIds.length > 0) return `user:tool_result:${toolUseIds.join(",")}`; - return null; - } - - if (type === "result") { - const sessionId = asString(event.session_id); - return sessionId ? `result:${sessionId}` : "result:unknown"; - } - - return null; -} - -/** - * Stateful line-level dedup filter. Emits `filter(chunk)` output through - * the caller — preserves original chunk formatting (including trailing - * newlines) for lines that pass the dedup check. - */ -export class LogLineDedupFilter { - private buffer = ""; - private readonly seenKeys = new Set(); - - /** - * Process a chunk and return the subset that should be forwarded. - * Incomplete trailing content (no terminating newline) is buffered and - * emitted on the next chunk that completes the line (or on flush()). - */ - filter(chunk: string): string { - if (!chunk) return ""; - const combined = this.buffer + chunk; - const endsWithNewline = combined.endsWith("\n"); - const parts = combined.split("\n"); - - if (endsWithNewline) { - // Discard the final empty element — last line was complete. - parts.pop(); - this.buffer = ""; - } else { - // Last element is an incomplete line — hold it for the next chunk. - this.buffer = parts.pop() ?? ""; - } - - const out: string[] = []; - for (const line of parts) { - if (this.shouldEmit(line)) out.push(line); - } - if (out.length === 0) return ""; - return out.join("\n") + "\n"; - } - - /** - * Flush any incomplete trailing content. Called when the stream ends - * without a terminating newline so the final partial line isn't lost. - */ - flush(): string { - const pending = this.buffer; - this.buffer = ""; - if (!pending) return ""; - return this.shouldEmit(pending) ? pending : ""; - } - - private shouldEmit(line: string): boolean { - const trimmed = line.trim(); - if (!trimmed) return true; - - // Only attempt dedup on JSON-shaped lines; pass shell/text output through. - if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true; - - let parsed: unknown; - try { - parsed = JSON.parse(trimmed); - } catch { - return true; - } - - const event = asRecord(parsed); - if (!event) return true; - - // Recognized Claude stream-json event → structural key. - const structuralKey = eventDedupKey(event); - const key = structuralKey ?? `raw:${trimmed}`; - - if (this.seenKeys.has(key)) return false; - this.seenKeys.add(key); - return true; - } -}