fix(server): make tailPodLogFile exit on job completion + port c8429cf

- Run tailPodLogFile and waitForJobCompletion in parallel via Promise.allSettled;
  completion sets stopSignal.stopped so the tail loop drains and exits. Without
  this, tailPodLogFile loops forever — the only natural exit was fh.stat()
  throwing on file removal, which never happened during normal job completion.
- Restructure tail loop to read-then-sleep, with a final drain after stopSignal
  is set to capture bytes written between the last poll and terminal state.
- Port the c8429cf fix from paperclip-adapter-claude-k8s:
  * buildPodLogPath now writes to /paperclip/instances/default/data/run-logs/...
    to match the server PVC layout (the /data/ segment was missing).
  * Drop the mkdir -p ... && from both init container command variants — the
    PVC isn't mounted in the init container, so the mkdir was failing with
    exit code 1 and the && short-circuit prevented the prompt copy.
- Test infrastructure:
  * Hoisted fs/promises mock now uses importOriginal so readFile (used for
    skill bundle loading) hits the real implementation.
  * setMockJsonl() lets individual tests inject specific JSONL into the tail's
    read buffer (previously dead constants in the test file).
  * fh.read mock now writes into the caller's buffer instead of returning a
    separate one.
- Add src/server/test.test.ts covering testEnvironment (was 0% → 98.5% stmts).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-30 14:57:40 -04:00
parent bc340bfcc9
commit e3af8aa83b
6 changed files with 296 additions and 66 deletions
+47 -27
View File
@@ -4,33 +4,47 @@ import { execute, ensureAgentDbPvc, tailPodLogFile } from "./execute.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getPvc, createPvc } from "./k8s-client.js";
import { buildJobManifest, buildPodLogPath } from "./job-manifest.js";
// Mock node:fs/promises to prevent tailPodLogFile (used by execute()) from
// hanging on unmocked fs.stat calls in test environment.
// vi.hoisted creates shared module-level state; beforeEach resets it so every
// test gets a clean first-read-success.
const { readMock, resetFsMocks } = vi.hoisted(() => {
// Mock node:fs/promises so tailPodLogFile (used by execute()) reads a
// configurable JSONL payload and returns. Individual tests override the
// payload via setMockJsonl(...) before calling execute().
const { readMock, statMock, fhStatMock, resetFsMocks, setMockJsonl } = vi.hoisted(() => {
const HAPPY = [
JSON.stringify({ type: "text", part: { text: "Task complete" }, sessionID: "ses_happy" }),
JSON.stringify({ type: "step_finish", part: { tokens: { input: 100, output: 50, cache: { read: 20 } }, cost: 0.002 } }),
].join("\n");
let payload = HAPPY;
let buffer = Buffer.from(payload);
let readOffset = 0;
const apply = (next: string) => { payload = next; buffer = Buffer.from(payload); readOffset = 0; };
return {
readMock: vi.fn().mockImplementation(async () => {
if (readOffset === 0) {
readOffset = 17;
return { bytesRead: 17, buffer: Buffer.from('{"type":"text"}\n') };
}
return { bytesRead: 0, buffer: Buffer.alloc(0) };
readMock: vi.fn().mockImplementation(async (buf: Buffer, off: number, len: number, _pos: number) => {
if (readOffset >= buffer.byteLength) return { bytesRead: 0, buffer: buf };
const remaining = buffer.byteLength - readOffset;
const toRead = Math.min(len, remaining);
buffer.copy(buf, off, readOffset, readOffset + toRead);
readOffset += toRead;
return { bytesRead: toRead, buffer: buf };
}),
resetFsMocks: () => { readOffset = 0; },
statMock: vi.fn().mockImplementation(async () => ({ size: buffer.byteLength })),
fhStatMock: vi.fn().mockImplementation(async () => ({ size: buffer.byteLength })),
resetFsMocks: () => { apply(HAPPY); },
setMockJsonl: (jsonl: string) => { apply(jsonl); },
};
});
vi.mock("node:fs/promises", () => ({
stat: vi.fn().mockResolvedValue({ size: 17 }),
open: vi.fn().mockResolvedValue({
stat: vi.fn().mockResolvedValue({ size: 17 }),
read: readMock,
close: vi.fn().mockResolvedValue(undefined),
}),
unlink: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("node:fs/promises", async (importOriginal) => {
const actual = await importOriginal<typeof import("node:fs/promises")>();
return {
...actual,
stat: statMock,
open: vi.fn().mockResolvedValue({
stat: fhStatMock,
read: readMock,
close: vi.fn().mockResolvedValue(undefined),
}),
unlink: vi.fn().mockResolvedValue(undefined),
};
});
vi.mock("./k8s-client.js", () => ({
getSelfPodInfo: vi.fn(),
@@ -43,7 +57,7 @@ vi.mock("./k8s-client.js", () => ({
vi.mock("./job-manifest.js", () => ({
buildJobManifest: vi.fn(),
buildPodLogPath: vi.fn((companyId: string, agentId: string, runId: string) =>
`/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`
`/paperclip/instances/default/data/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`
),
LARGE_PROMPT_THRESHOLD_BYTES: 256 * 1024,
}));
@@ -169,7 +183,7 @@ beforeEach(() => {
prompt: "Test prompt",
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
} as unknown as ReturnType<typeof buildJobManifest>);
const batchApi = makeBatchApi();
@@ -590,6 +604,7 @@ describe("execute — happy path", () => {
describe("execute — session unavailable (reattach classification)", () => {
it("returns clearSession=true and session_unavailable code for unknown session error", async () => {
setMockJsonl(JSON.stringify({ type: "error", error: { message: "Unknown session ses_xxx" } }));
const coreApi = makeCoreApi(1);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
@@ -601,6 +616,7 @@ describe("execute — session unavailable (reattach classification)", () => {
});
it("returns clearSession=true for 'session not found' error", async () => {
setMockJsonl(JSON.stringify({ type: "error", error: { message: "Session ses_xxx not found" } }));
const coreApi = makeCoreApi(1);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
@@ -672,6 +688,7 @@ describe("execute — exit code handling", () => {
});
it("synthesizes exitCode=1 when error message exists but pod reported exitCode=0", async () => {
setMockJsonl(JSON.stringify({ type: "error", error: { message: "something went wrong" } }));
const coreApi = makeCoreApi(0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
@@ -735,6 +752,7 @@ describe("execute — llm_api_error signal", () => {
it("returns llm_api_error when session exists but LLM produced no output tokens", async () => {
// JSONL has a sessionID but no step_finish tokens and no text messages
const emptyOutputJsonl = JSON.stringify({ sessionID: "ses_empty", type: "step_finish", part: { tokens: { input: 100, output: 0, cache: {} }, cost: 0 } });
setMockJsonl(emptyOutputJsonl);
const coreApi = makeCoreApi(0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
@@ -757,6 +775,7 @@ describe("execute — llm_api_error signal", () => {
const errorJsonl = [
JSON.stringify({ sessionID: "ses_err", type: "error", error: { message: "API quota exceeded" } }),
].join("\n");
setMockJsonl(errorJsonl);
const coreApi = makeCoreApi(1);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
@@ -939,7 +958,7 @@ describe("execute — large-prompt Secret path", () => {
prompt: LARGE_PROMPT,
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
} as unknown as ReturnType<typeof buildJobManifest>);
}
@@ -1271,7 +1290,7 @@ describe("execute — large-prompt Secret create failure", () => {
prompt: LARGE_PROMPT,
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
} as unknown as ReturnType<typeof buildJobManifest>);
const coreApi = makeCoreApi();
@@ -1305,6 +1324,7 @@ describe("execute — step limit detection", () => {
JSON.stringify({ type: "text", part: { text: "partial" }, sessionID: "ses_step" }),
JSON.stringify({ type: "step_finish", part: { reason: "max_steps", tokens: { input: 10, output: 5 }, cost: 0 } }),
].join("\n");
setMockJsonl(STEP_LIMIT_JSONL);
const coreApi = makeCoreApi(0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
@@ -1507,10 +1527,10 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
prompt: "p",
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
}),
buildPodLogPath: vi.fn((companyId: string, agentId: string, runId: string) =>
`/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`
`/paperclip/instances/default/data/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`
),
LARGE_PROMPT_THRESHOLD_BYTES: 256 * 1024,
}));