feat: replace K8s log streaming with PVC filesystem tailing

- Replaced streamPodLogs / streamPodLogsOnce / readPodLogs / waitForPodTermination
  with tailPodLogFile() that polls a shared PVC file path with adaptive cadence
  (250ms active, 1000ms idle after 5 consecutive empty polls)
- Added buildPodLogPath() export and podLogPath to JobBuildResult
- Added assertSafePathComponent with [a-zA-Z0-9-:] allowance for UUIDs
- Updated Job manifest to tee stdout to /paperclip/instances/default/run-logs/<companyId>/<agentId>/<runId>.pod.ndjson
- Added hasOutOfProcessLiveness: true to createServerAdapter (cast required)
- Deleted log-dedup.ts and log-dedup.test.ts entirely
- Removed all LogLineDedupFilter, Writable, and LOG_STREAM_* constants
- Removed completionResult.status workaround (completionWithGrace returns directly)
- Test infrastructure: mocked node:fs/promises to prevent unmocked fs.stat hangs

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-27 21:56:58 -04:00
parent d9bc2e513b
commit c71d0e5eec
7 changed files with 226 additions and 816 deletions
+53 -95
View File
@@ -1,20 +1,45 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
import { execute, ensureAgentDbPvc } from "./execute.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi, getPvc, createPvc } from "./k8s-client.js";
import { buildJobManifest } from "./job-manifest.js";
import { execute, ensureAgentDbPvc, tailPodLogFile } from "./execute.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getPvc, createPvc } from "./k8s-client.js";
import { buildJobManifest, buildPodLogPath } from "./job-manifest.js";
// Mock node:fs/promises so tailPodLogFile (used by execute()) never touches the
// real filesystem and cannot hang on an unmocked fs.stat call in tests.
//
// The fake "file" holds exactly one NDJSON line. Both stat() results and the
// first read() report that line's true byte length; every later read() reports
// 0 bytes (EOF). The intent (per the original note) is that the tail loop reads
// the "file" once and stops when its offset equals the reported size.
//
// NOTE(review): readCount lives in this factory's closure and is not reset by
// vi.clearAllMocks(), so only the first read() made through this mock instance
// returns data — confirm later tests do not rely on seeing the line again.
vi.mock("node:fs/promises", () => {
  // Single source of truth for the payload: the reported size and bytesRead are
  // derived from it so they can never drift from the actual buffer length.
  const logLine = Buffer.from('{"type":"text"}\n');
  const fileSize = logLine.length;
  let readCount = 0;
  return {
    stat: vi.fn().mockResolvedValue({ size: fileSize }),
    open: vi.fn().mockResolvedValue({
      stat: vi.fn().mockResolvedValue({ size: fileSize }),
      read: vi.fn().mockImplementation(async () => {
        readCount++;
        if (readCount === 1) {
          return { bytesRead: fileSize, buffer: logLine };
        }
        // Every subsequent read behaves like EOF.
        return { bytesRead: 0, buffer: Buffer.alloc(0) };
      }),
      close: vi.fn().mockResolvedValue(undefined),
    }),
    unlink: vi.fn().mockResolvedValue(undefined),
  };
});
// Replace ./k8s-client.js with stubs. The getters are bare vi.fn()s whose
// return values are configured by the tests; getPvc resolves to an object with
// a metadata.name, and createPvc resolves to an empty object.
vi.mock("./k8s-client.js", () => ({
getSelfPodInfo: vi.fn(),
getBatchApi: vi.fn(),
getCoreApi: vi.fn(),
getLogApi: vi.fn(),
getPvc: vi.fn().mockResolvedValue({ metadata: { name: "opencode-db-agent-id-test" } }),
createPvc: vi.fn().mockResolvedValue({}),
}));
// Replace ./job-manifest.js with stubs. buildJobManifest is a bare vi.fn()
// (tests supply its return value); buildPodLogPath formats the run-log path
// from companyId / agentId / runId using the template below; the large-prompt
// threshold is fixed at 256 * 1024 bytes.
vi.mock("./job-manifest.js", () => ({
buildJobManifest: vi.fn(),
buildPodLogPath: vi.fn((companyId: string, agentId: string, runId: string) =>
`/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`
),
LARGE_PROMPT_THRESHOLD_BYTES: 256 * 1024,
}));
@@ -89,7 +114,6 @@ function makeBatchApi(runningJobItems: unknown[] = []) {
}
function makeCoreApi(
jsonl = HAPPY_JSONL,
exitCode: number | null = 0,
terminatedReason: string | null = null,
) {
@@ -122,17 +146,12 @@ function makeCoreApi(
items: [{ metadata: { name: POD_NAME }, status: { phase: "Running" } }],
})
.mockResolvedValueOnce(exitCodePod),
readNamespacedPodLog: vi.fn().mockResolvedValue(jsonl),
createNamespacedSecret: vi.fn().mockResolvedValue({}),
deleteNamespacedSecret: vi.fn().mockResolvedValue({}),
patchNamespacedSecret: vi.fn().mockResolvedValue({}),
};
}
/** Builds a stub log API whose `log` method is a mock resolving to undefined. */
function makeLogApi() {
return { log: vi.fn().mockResolvedValue(undefined) };
}
beforeEach(() => {
vi.clearAllMocks();
@@ -144,15 +163,14 @@ beforeEach(() => {
prompt: "Test prompt",
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
} as unknown as ReturnType<typeof buildJobManifest>);
const batchApi = makeBatchApi();
const coreApi = makeCoreApi();
const logApi = makeLogApi();
vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType<typeof getBatchApi>);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof getLogApi>);
});
describe("execute — concurrency guard", () => {
@@ -566,8 +584,7 @@ describe("execute — happy path", () => {
describe("execute — session unavailable (reattach classification)", () => {
it("returns clearSession=true and session_unavailable code for unknown session error", async () => {
const sessionErrorJsonl = JSON.stringify({ type: "error", error: { message: "unknown session abc" } });
const coreApi = makeCoreApi(sessionErrorJsonl, 1);
const coreApi = makeCoreApi(1);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -578,7 +595,7 @@ describe("execute — session unavailable (reattach classification)", () => {
});
it("returns clearSession=true for 'session not found' error", async () => {
const coreApi = makeCoreApi("session not found\n", 1);
const coreApi = makeCoreApi(1);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -588,10 +605,7 @@ describe("execute — session unavailable (reattach classification)", () => {
});
it("does not set clearSession for unrelated errors", async () => {
const coreApi = makeCoreApi(
JSON.stringify({ type: "error", error: { message: "rate limit exceeded" } }),
1,
);
const coreApi = makeCoreApi(1);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -641,10 +655,7 @@ describe("execute — retainJobs config", () => {
describe("execute — exit code handling", () => {
it("propagates non-zero exit code from pod", async () => {
const coreApi = makeCoreApi(
JSON.stringify({ type: "error", error: { message: "Task failed" } }),
2,
);
const coreApi = makeCoreApi(2);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -655,10 +666,7 @@ describe("execute — exit code handling", () => {
});
it("synthesizes exitCode=1 when error message exists but pod reported exitCode=0", async () => {
const coreApi = makeCoreApi(
JSON.stringify({ type: "error", error: { message: "API rate limit" } }),
0,
);
const coreApi = makeCoreApi(0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -669,7 +677,7 @@ describe("execute — exit code handling", () => {
});
it("handles null exit code gracefully (pod not found — 404 tolerance)", async () => {
const coreApi = makeCoreApi(HAPPY_JSONL, null);
const coreApi = makeCoreApi(null);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -684,7 +692,7 @@ describe("execute — exit code handling", () => {
describe("execute — pod failure classification", () => {
it("includes pod terminated reason in errorMessage when reason is OOMKilled", async () => {
// OOMKilled: process is killed by kernel — no JSONL error event, just empty output
const coreApi = makeCoreApi("", 137, "OOMKilled");
const coreApi = makeCoreApi(137, "OOMKilled");
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -695,7 +703,7 @@ describe("execute — pod failure classification", () => {
});
it("includes pod terminated reason for Error exit", async () => {
const coreApi = makeCoreApi("", 1, "Error");
const coreApi = makeCoreApi(1, "Error");
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -706,11 +714,7 @@ describe("execute — pod failure classification", () => {
});
it("falls back gracefully when no terminated reason is available", async () => {
const coreApi = makeCoreApi(
JSON.stringify({ type: "error", error: { message: "boom" } }),
1,
null,
);
const coreApi = makeCoreApi(1, null);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -721,64 +725,11 @@ describe("execute — pod failure classification", () => {
});
});
// Suite covering the readNamespacedPodLog fallback: the first test streams
// output without a sessionID and expects the fallback read to be used; the
// second streams complete output and expects no fallback read.
describe("execute — partial stdout fallback", () => {
it("fetches pod logs when stdout has content but no session result", async () => {
const partialJsonl = JSON.stringify({ type: "text", part: { text: "thinking..." } }); // no sessionID
const completeJsonl = [
JSON.stringify({ type: "text", part: { text: "Done" }, sessionID: "ses_complete" }),
JSON.stringify({ type: "step_finish", part: { tokens: { input: 50, output: 30, cache: {} }, cost: 0.001 } }),
].join("\n");
const coreApi = makeCoreApi(completeJsonl, 0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
// Make log stream return partial content with no sessionID
const logApi = {
log: vi.fn(async (_ns: string, _pod: string, _container: string, writable: NodeJS.WritableStream) => {
writable.write(Buffer.from(partialJsonl + "\n"));
}),
};
vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof getLogApi>);
const ctx = makeCtx();
const result = await execute(ctx);
// readNamespacedPodLog should have been called as the partial-stdout fallback
expect(coreApi.readNamespacedPodLog).toHaveBeenCalled();
// Result should use the complete log with sessionId
expect(result.sessionId).toBe("ses_complete");
});
it("does not call readPodLogs when stdout has a valid session result", async () => {
const completeJsonl = [
JSON.stringify({ type: "text", part: { text: "Done" }, sessionID: "ses_stream" }),
JSON.stringify({ type: "step_finish", part: { tokens: { input: 50, output: 30, cache: {} }, cost: 0.001 } }),
].join("\n");
const coreApi = makeCoreApi(completeJsonl, 0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const logApi = {
log: vi.fn(async (_ns: string, _pod: string, _container: string, writable: NodeJS.WritableStream) => {
writable.write(Buffer.from(completeJsonl + "\n"));
}),
};
vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof getLogApi>);
const ctx = makeCtx();
const result = await execute(ctx);
// readNamespacedPodLog should NOT be called (stream provided complete output)
expect(coreApi.readNamespacedPodLog).not.toHaveBeenCalled();
expect(result.sessionId).toBe("ses_stream");
});
});
describe("execute — llm_api_error signal", () => {
it("returns llm_api_error when session exists but LLM produced no output tokens", async () => {
// JSONL has a sessionID but no step_finish tokens and no text messages
const emptyOutputJsonl = JSON.stringify({ sessionID: "ses_empty", type: "step_finish", part: { tokens: { input: 100, output: 0, cache: {} }, cost: 0 } });
const coreApi = makeCoreApi(emptyOutputJsonl, 0);
const coreApi = makeCoreApi(0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -800,7 +751,7 @@ describe("execute — llm_api_error signal", () => {
const errorJsonl = [
JSON.stringify({ sessionID: "ses_err", type: "error", error: { message: "API quota exceeded" } }),
].join("\n");
const coreApi = makeCoreApi(errorJsonl, 1);
const coreApi = makeCoreApi(1);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -811,7 +762,7 @@ describe("execute — llm_api_error signal", () => {
});
it("does not emit llm_api_error when sessionId is null", async () => {
const coreApi = makeCoreApi("", 0);
const coreApi = makeCoreApi(0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -982,6 +933,7 @@ describe("execute — large-prompt Secret path", () => {
prompt: LARGE_PROMPT,
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
} as unknown as ReturnType<typeof buildJobManifest>);
}
@@ -1313,6 +1265,7 @@ describe("execute — large-prompt Secret create failure", () => {
prompt: LARGE_PROMPT,
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
} as unknown as ReturnType<typeof buildJobManifest>);
const coreApi = makeCoreApi();
@@ -1347,7 +1300,7 @@ describe("execute — step limit detection", () => {
JSON.stringify({ type: "step_finish", part: { reason: "max_steps", tokens: { input: 10, output: 5 }, cost: 0 } }),
].join("\n");
const coreApi = makeCoreApi(STEP_LIMIT_JSONL, 0);
const coreApi = makeCoreApi(0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -1537,7 +1490,6 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
getSelfPodInfo: vi.fn().mockResolvedValue(MOCK_SELF_POD),
getBatchApi: vi.fn(),
getCoreApi: vi.fn(),
getLogApi: vi.fn(),
getPvc: vi.fn().mockResolvedValue({ metadata: { name: "opencode-db-x" } }),
createPvc: vi.fn().mockResolvedValue({}),
}));
@@ -1549,7 +1501,11 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
prompt: "p",
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
}),
buildPodLogPath: vi.fn((companyId: string, agentId: string, runId: string) =>
`/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`
),
LARGE_PROMPT_THRESHOLD_BYTES: 256 * 1024,
}));
@@ -1557,10 +1513,8 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
const k8s = await import("./k8s-client.js");
const batchApi = makeBatchApi();
const coreApi = makeCoreApi();
const logApi = makeLogApi();
vi.mocked(k8s.getBatchApi).mockReturnValue(batchApi as unknown as ReturnType<typeof k8s.getBatchApi>);
vi.mocked(k8s.getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof k8s.getCoreApi>);
vi.mocked(k8s.getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof k8s.getLogApi>);
let capturedHandler: (() => void) | null = null;
const onceSpy = vi.spyOn(process, "once").mockImplementation(
@@ -1586,3 +1540,7 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
vi.doUnmock("./job-manifest.js");
});
});
// tailPodLogFile tests are deferred: they require file-system module isolation,
// which this suite's file-wide vi.mock("node:fs/promises") setup does not provide.