Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| abdce817f3 | |||
| f9d8a2e0ce | |||
| a7dfd5d502 |
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.35",
|
||||
"version": "0.1.36",
|
||||
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
|
||||
"license": "MIT",
|
||||
"repository": {
|
||||
|
||||
+875
-6
@@ -1,20 +1,51 @@
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
import type * as k8s from "@kubernetes/client-node";
|
||||
import type { Writable } from "node:stream";
|
||||
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
|
||||
|
||||
// Mock the K8s client before importing execute so streamPodLogsOnce picks up
|
||||
// the mocked getLogApi. The mock's logApi.log never resolves, simulating the
|
||||
// FAR-10 hang: K8s API drops the connection but the client awaits forever.
|
||||
// All K8s API mock functions — declared before vi.mock() so the factory can
|
||||
// reference them. The mock's logApi.log default is a never-resolving promise,
|
||||
// simulating the FAR-10 hang where K8s API drops the connection indefinitely.
|
||||
const mockLogFn = vi.fn();
|
||||
const mockGetSelfPodInfo = vi.fn();
|
||||
const mockBatchListJobs = vi.fn();
|
||||
const mockBatchCreateJob = vi.fn();
|
||||
const mockBatchReadJob = vi.fn();
|
||||
const mockBatchDeleteJob = vi.fn();
|
||||
const mockBatchPatchJob = vi.fn();
|
||||
const mockCoreListPods = vi.fn();
|
||||
const mockCoreReadPodLog = vi.fn();
|
||||
const mockCoreCreateSecret = vi.fn();
|
||||
const mockCorePatchSecret = vi.fn();
|
||||
const mockCoreDeleteSecret = vi.fn();
|
||||
|
||||
vi.mock("./k8s-client.js", () => ({
|
||||
getLogApi: () => ({ log: mockLogFn }),
|
||||
getBatchApi: () => ({}),
|
||||
getCoreApi: () => ({}),
|
||||
getBatchApi: () => ({
|
||||
listNamespacedJob: mockBatchListJobs,
|
||||
createNamespacedJob: mockBatchCreateJob,
|
||||
readNamespacedJob: mockBatchReadJob,
|
||||
deleteNamespacedJob: mockBatchDeleteJob,
|
||||
patchNamespacedJob: mockBatchPatchJob,
|
||||
}),
|
||||
getCoreApi: () => ({
|
||||
listNamespacedPod: mockCoreListPods,
|
||||
readNamespacedPodLog: mockCoreReadPodLog,
|
||||
createNamespacedSecret: mockCoreCreateSecret,
|
||||
patchNamespacedSecret: mockCorePatchSecret,
|
||||
deleteNamespacedSecret: mockCoreDeleteSecret,
|
||||
}),
|
||||
getAuthzApi: () => ({}),
|
||||
getSelfPodInfo: vi.fn(),
|
||||
getSelfPodInfo: mockGetSelfPodInfo,
|
||||
resetCache: vi.fn(),
|
||||
}));
|
||||
|
||||
const mockPrepareBundle = vi.fn();
|
||||
vi.mock("./prompt-cache.js", () => ({
|
||||
prepareClaudePromptBundle: mockPrepareBundle,
|
||||
}));
|
||||
|
||||
|
||||
const { isK8s404, buildPartialRunError, classifyOrphan, describePodTerminatedError, streamPodLogsOnce, execute } = await import("./execute.js");
|
||||
|
||||
function makeJob(opts: {
|
||||
@@ -349,3 +380,841 @@ describe("streamPodLogsOnce bail timer", () => {
|
||||
expect(mockLogFn).toHaveBeenCalledOnce();
|
||||
});
|
||||
});
|
||||
|
||||
// ─── Helpers shared across execute() integration tests ───────────────────────
|
||||
|
||||
function makeCtx(overrides: Partial<AdapterExecutionContext> = {}): AdapterExecutionContext {
|
||||
return {
|
||||
runId: "run-test-001",
|
||||
agent: {
|
||||
id: "agent-abc",
|
||||
companyId: "co1",
|
||||
name: "Test Agent",
|
||||
adapterType: "claude_k8s",
|
||||
adapterConfig: {},
|
||||
},
|
||||
runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null },
|
||||
config: {},
|
||||
context: {},
|
||||
onLog: vi.fn().mockResolvedValue(undefined),
|
||||
...overrides,
|
||||
} as unknown as AdapterExecutionContext;
|
||||
}
|
||||
|
||||
function makeSelfPodResult() {
|
||||
return {
|
||||
namespace: "paperclip",
|
||||
image: "paperclipai/paperclip:latest",
|
||||
imagePullSecrets: [],
|
||||
dnsConfig: undefined,
|
||||
pvcClaimName: "paperclip-data",
|
||||
secretVolumes: [],
|
||||
inheritedEnv: {},
|
||||
inheritedEnvValueFrom: [],
|
||||
inheritedEnvFrom: [],
|
||||
};
|
||||
}
|
||||
|
||||
function makeBundle() {
|
||||
return {
|
||||
bundleKey: "test-bundle",
|
||||
rootDir: "/tmp/test-bundle",
|
||||
addDir: "/tmp/test-bundle",
|
||||
instructionsFilePath: null,
|
||||
};
|
||||
}
|
||||
|
||||
// Valid minimal Claude stream-json output used in happy-path tests.
|
||||
const CLAUDE_HAPPY_OUTPUT = [
|
||||
JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_test123" }),
|
||||
JSON.stringify({
|
||||
type: "result",
|
||||
subtype: "success",
|
||||
result: "Done.",
|
||||
session_id: "sess_test123",
|
||||
usage: { input_tokens: 100, output_tokens: 50, cache_read_input_tokens: 10 },
|
||||
total_cost_usd: 0.001,
|
||||
}),
|
||||
].join("\n") + "\n";
|
||||
|
||||
// ─── execute: concurrency guard paths ────────────────────────────────────────
|
||||
|
||||
describe("execute: concurrency guard", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
|
||||
});
|
||||
|
||||
it("returns k8s_concurrency_guard_unreachable when listNamespacedJob throws", async () => {
|
||||
mockBatchListJobs.mockRejectedValue(new Error("K8s API unavailable"));
|
||||
const result = await execute(makeCtx());
|
||||
expect(result.errorCode).toBe("k8s_concurrency_guard_unreachable");
|
||||
expect(result.errorMessage).toContain("K8s API unavailable");
|
||||
});
|
||||
|
||||
it("returns k8s_concurrent_run_blocked when reattach disabled and orphan is running", async () => {
|
||||
const orphan = makeJob({ runId: "prior-run", agentId: "agent-abc", terminal: false });
|
||||
mockBatchListJobs.mockResolvedValue({ items: [orphan] });
|
||||
const result = await execute(makeCtx({ config: { reattachOrphanedJobs: false } } as Partial<AdapterExecutionContext>));
|
||||
expect(result.errorCode).toBe("k8s_concurrent_run_blocked");
|
||||
expect(result.errorMessage).toContain("reattach disabled");
|
||||
});
|
||||
|
||||
it("returns k8s_orphan_task_unknown when orphan has no task label", async () => {
|
||||
// No taskId on the orphan job and no taskId in context → block_task_unknown
|
||||
const orphan = makeJob({ runId: "prior-run", agentId: "agent-abc" }); // no taskId label
|
||||
mockBatchListJobs.mockResolvedValue({ items: [orphan] });
|
||||
// context.taskId absent → currentTaskLabel = null → block_task_unknown
|
||||
const result = await execute(makeCtx());
|
||||
expect(result.errorCode).toBe("k8s_orphan_task_unknown");
|
||||
});
|
||||
|
||||
it("returns k8s_concurrent_run_blocked when orphan task-id mismatches current task", async () => {
|
||||
const orphan = makeJob({ runId: "prior-run", agentId: "agent-abc", taskId: "task-other" });
|
||||
mockBatchListJobs.mockResolvedValue({ items: [orphan] });
|
||||
const result = await execute(
|
||||
makeCtx({ context: { taskId: "task-current" } } as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
expect(result.errorCode).toBe("k8s_concurrent_run_blocked");
|
||||
expect(result.errorMessage).toContain("different task");
|
||||
});
|
||||
|
||||
it("returns k8s_orphan_session_mismatch when task matches but session differs", async () => {
|
||||
const orphan = makeJob({
|
||||
runId: "prior-run",
|
||||
agentId: "agent-abc",
|
||||
taskId: "task-match",
|
||||
sessionId: "sess-other",
|
||||
});
|
||||
mockBatchListJobs.mockResolvedValue({ items: [orphan] });
|
||||
const result = await execute(
|
||||
makeCtx({
|
||||
context: { taskId: "task-match" },
|
||||
runtime: { sessionId: "sess-current", sessionParams: null, sessionDisplayId: null, taskKey: null },
|
||||
} as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
expect(result.errorCode).toBe("k8s_orphan_session_mismatch");
|
||||
expect(result.errorMessage).toContain("mismatched session");
|
||||
});
|
||||
|
||||
it("returns k8s_concurrent_run_blocked when same-run job is still running", async () => {
|
||||
// runId matches → samRun.length > 0 → blocked
|
||||
const sameRunJob = makeJob({ runId: "run-test-001", agentId: "agent-abc", terminal: false });
|
||||
mockBatchListJobs.mockResolvedValue({ items: [sameRunJob] });
|
||||
const result = await execute(makeCtx());
|
||||
expect(result.errorCode).toBe("k8s_concurrent_run_blocked");
|
||||
expect(result.errorMessage).toContain("still running for this agent");
|
||||
});
|
||||
|
||||
it("reattaches to a matching orphan and returns k8s_pod_reattach_failed when pod is missing", async () => {
|
||||
// Orphan with matching taskId and sessionId → reattach classification → reattachTarget is set
|
||||
const orphan = makeJob({
|
||||
runId: "prior-run",
|
||||
agentId: "agent-abc",
|
||||
taskId: "task-match",
|
||||
sessionId: "sess-match",
|
||||
});
|
||||
mockBatchListJobs.mockResolvedValue({ items: [orphan] });
|
||||
mockBatchPatchJob.mockResolvedValue({});
|
||||
mockPrepareBundle.mockResolvedValue(makeBundle());
|
||||
// Pod lookup finds nothing → reattach pod-not-found error
|
||||
mockCoreListPods.mockResolvedValue({ items: [] });
|
||||
|
||||
const result = await execute(
|
||||
makeCtx({
|
||||
context: { taskId: "task-match" },
|
||||
runtime: { sessionId: "sess-match", sessionParams: null, sessionDisplayId: null, taskKey: null },
|
||||
} as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
|
||||
expect(result.errorCode).toBe("k8s_pod_reattach_failed");
|
||||
expect(result.errorMessage).toContain("no pod");
|
||||
});
|
||||
});
|
||||
|
||||
// ─── execute: job creation paths ─────────────────────────────────────────────
|
||||
|
||||
describe("execute: job creation", () => {
|
||||
beforeEach(() => {
|
||||
vi.resetAllMocks();
|
||||
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
|
||||
mockBatchListJobs.mockResolvedValue({ items: [] }); // no concurrent jobs
|
||||
mockPrepareBundle.mockResolvedValue(makeBundle());
|
||||
mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } });
|
||||
mockBatchDeleteJob.mockResolvedValue({});
|
||||
});
|
||||
|
||||
it("returns k8s_job_create_failed when createNamespacedJob throws", async () => {
|
||||
mockBatchCreateJob.mockRejectedValue(new Error("quota exceeded"));
|
||||
const result = await execute(makeCtx());
|
||||
expect(result.errorCode).toBe("k8s_job_create_failed");
|
||||
expect(result.errorMessage).toContain("quota exceeded");
|
||||
});
|
||||
|
||||
it("returns k8s_pod_schedule_failed when pod scheduling times out", async () => {
|
||||
mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "uid-1" } });
|
||||
mockBatchDeleteJob.mockResolvedValue({});
|
||||
// Pod never appears → waitForPod eventually times out.
|
||||
// Provide a config with very short timeout to avoid a real 2-minute wait.
|
||||
// Instead, make listNamespacedPod return an unschedulable condition immediately.
|
||||
mockCoreListPods.mockResolvedValue({
|
||||
items: [
|
||||
{
|
||||
metadata: { name: "pod-xyz" },
|
||||
status: {
|
||||
phase: "Pending",
|
||||
conditions: [
|
||||
{ type: "PodScheduled", status: "False", reason: "Unschedulable", message: "no nodes available" },
|
||||
],
|
||||
containerStatuses: [],
|
||||
initContainerStatuses: [],
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const result = await execute(makeCtx());
|
||||
|
||||
expect(result.errorCode).toBe("k8s_pod_schedule_failed");
|
||||
expect(result.errorMessage).toContain("unschedulable");
|
||||
});
|
||||
});
|
||||
|
||||
// ─── execute: full happy path ─────────────────────────────────────────────────
|
||||
|
||||
describe("execute: happy path", () => {
|
||||
// vi.resetAllMocks() ensures the mockResolvedValueOnce queue is fully cleared
|
||||
// before each test so beforeEach always starts with a known queue depth of zero.
|
||||
beforeEach(() => {
|
||||
vi.resetAllMocks();
|
||||
vi.useFakeTimers();
|
||||
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
|
||||
mockBatchListJobs.mockResolvedValue({ items: [] });
|
||||
mockPrepareBundle.mockResolvedValue(makeBundle());
|
||||
mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } });
|
||||
mockBatchPatchJob.mockResolvedValue({});
|
||||
|
||||
// Default: waitForPod gets Running pod (once queue), getPodExitCode gets
|
||||
// the terminated-exit-0 pod (default return value).
|
||||
// Tests that need a different exit code should call
|
||||
// mockCoreListPods.mockResolvedValue(exitCode1Pod)
|
||||
// which replaces only the default; the once-queue entry from this beforeEach
|
||||
// is still consumed by the first waitForPod call.
|
||||
mockCoreListPods
|
||||
.mockResolvedValueOnce({
|
||||
items: [
|
||||
{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
|
||||
},
|
||||
],
|
||||
})
|
||||
.mockResolvedValue({
|
||||
items: [
|
||||
{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: {
|
||||
containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }],
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
// waitForJobCompletion: Complete on first read
|
||||
mockBatchReadJob.mockResolvedValue({
|
||||
status: { conditions: [{ type: "Complete", status: "True" }] },
|
||||
});
|
||||
|
||||
// streamPodLogsOnce: write valid Claude output to the writable stream
|
||||
mockLogFn.mockImplementation(
|
||||
async (_ns: string, _pod: string, _ctr: string, writable: Writable) => {
|
||||
// chunks.push() is called synchronously inside the Writable handler,
|
||||
// so the output is captured even before the write callback fires.
|
||||
writable.write(CLAUDE_HAPPY_OUTPUT);
|
||||
},
|
||||
);
|
||||
|
||||
mockBatchDeleteJob.mockResolvedValue({});
|
||||
mockCoreDeleteSecret.mockResolvedValue({});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("returns a successful result with session, usage, and model fields", async () => {
|
||||
const onSpawn = vi.fn().mockResolvedValue(undefined);
|
||||
const onMeta = vi.fn().mockResolvedValue(undefined);
|
||||
const ctx = makeCtx({ onSpawn, onMeta } as Partial<AdapterExecutionContext>);
|
||||
|
||||
const executePromise = execute(ctx);
|
||||
|
||||
// streamPodLogs checks stopSignal after streamPodLogsOnce returns. With fake
|
||||
// timers the reconnect delay (3 s) is held by a fake setTimeout — advance past
|
||||
// it so the loop exits and Promise.allSettled resolves. advanceTimersByTimeAsync
|
||||
// flushes all pending microtasks between timer firings, including the
|
||||
// waitForJobCompletion resolution that sets logStopSignal.stopped = true.
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(result.timedOut).toBe(false);
|
||||
expect(result.errorMessage).toBeNull();
|
||||
expect(result.sessionId).toBe("sess_test123");
|
||||
expect(result.usage?.inputTokens).toBe(100);
|
||||
expect(result.usage?.outputTokens).toBe(50);
|
||||
expect(result.usage?.cachedInputTokens).toBe(10);
|
||||
expect(result.provider).toBe("anthropic");
|
||||
// cleanupJob must have been called
|
||||
expect(mockBatchDeleteJob).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("returns timedOut=true when the job deadline is exceeded", async () => {
|
||||
// Override waitForJobCompletion to report DeadlineExceeded
|
||||
mockBatchReadJob.mockResolvedValue({
|
||||
status: {
|
||||
conditions: [{ type: "Failed", status: "True", reason: "DeadlineExceeded" }],
|
||||
},
|
||||
});
|
||||
|
||||
const executePromise = execute(makeCtx({ config: { timeoutSec: 30 } } as Partial<AdapterExecutionContext>));
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.timedOut).toBe(true);
|
||||
expect(result.errorCode).toBe("timeout");
|
||||
});
|
||||
|
||||
it("returns session_unavailable and clearSession=true on unknown-session Claude error", async () => {
|
||||
// isClaudeUnknownSessionError matches /no conversation found with session id/i
|
||||
const sessionErrorOutput = [
|
||||
JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_bad" }),
|
||||
JSON.stringify({
|
||||
type: "result",
|
||||
subtype: "error",
|
||||
result: "No conversation found with session id sess_bad",
|
||||
is_error: true,
|
||||
session_id: "sess_bad",
|
||||
usage: { input_tokens: 10, output_tokens: 5, cache_read_input_tokens: 0 },
|
||||
total_cost_usd: 0.0,
|
||||
}),
|
||||
].join("\n") + "\n";
|
||||
|
||||
mockLogFn.mockImplementation(
|
||||
async (_ns: string, _pod: string, _ctr: string, writable: Writable) => {
|
||||
writable.write(sessionErrorOutput);
|
||||
},
|
||||
);
|
||||
// Once-queue entry for waitForPod already set by beforeEach; override the
|
||||
// default so getPodExitCode returns exitCode=1.
|
||||
mockCoreListPods.mockResolvedValue({
|
||||
items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 1 } } }] } }],
|
||||
});
|
||||
|
||||
const executePromise = execute(
|
||||
makeCtx({ runtime: { sessionId: "sess_bad", sessionParams: null, sessionDisplayId: null, taskKey: null } } as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.clearSession).toBe(true);
|
||||
expect(result.errorCode).toBe("session_unavailable");
|
||||
});
|
||||
|
||||
it("surfaces buildPartialRunError when stdout has no result event", async () => {
|
||||
// Log stream returns only init line — no result event
|
||||
const noResultOutput = JSON.stringify({
|
||||
type: "system",
|
||||
subtype: "init",
|
||||
model: "claude-sonnet-4-6",
|
||||
session_id: "sess_x",
|
||||
}) + "\n";
|
||||
|
||||
mockLogFn.mockImplementation(
|
||||
async (_ns: string, _pod: string, _ctr: string, writable: Writable) => {
|
||||
writable.write(noResultOutput);
|
||||
},
|
||||
);
|
||||
// Override default so getPodExitCode returns exit code 1 (model-hint path);
|
||||
// the once-queue entry from beforeEach is still consumed by waitForPod.
|
||||
mockCoreListPods.mockResolvedValue({
|
||||
items: [{ metadata: { name: "pod-abc" }, status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 1 } } }] } }],
|
||||
});
|
||||
|
||||
const executePromise = execute(makeCtx());
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.exitCode).toBe(1);
|
||||
expect(result.errorMessage).toContain("claude-sonnet-4-6");
|
||||
});
|
||||
|
||||
it("does not delete the Job when retainJobs=true", async () => {
|
||||
const executePromise = execute(makeCtx({ config: { retainJobs: true } } as Partial<AdapterExecutionContext>));
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(mockBatchDeleteJob).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("handles cleanupJob failure gracefully (best-effort)", async () => {
|
||||
mockBatchDeleteJob.mockRejectedValue(new Error("forbidden: delete not allowed"));
|
||||
|
||||
const executePromise = execute(makeCtx());
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
// cleanupJob failure must not propagate — execute should still succeed
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(result.errorMessage).toBeNull();
|
||||
});
|
||||
|
||||
it("falls back to one-shot readPodLogs when log stream returns empty", async () => {
|
||||
// Log stream writes nothing — simulates fast container exit before follow connect
|
||||
mockLogFn.mockImplementation(async () => {});
|
||||
// One-shot read returns full output
|
||||
mockCoreReadPodLog.mockResolvedValue(CLAUDE_HAPPY_OUTPUT);
|
||||
|
||||
const executePromise = execute(makeCtx());
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.sessionId).toBe("sess_test123");
|
||||
expect(mockCoreReadPodLog).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("replaces partial stream with longer one-shot pod log read", async () => {
|
||||
// Stream writes only the init line (no result event) — partial capture
|
||||
const initLine = JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_x" }) + "\n";
|
||||
mockLogFn.mockImplementation(
|
||||
async (_ns: string, _pod: string, _ctr: string, writable: Writable) => {
|
||||
writable.write(initLine);
|
||||
},
|
||||
);
|
||||
// One-shot read returns the full output, which is longer and has a result event
|
||||
mockCoreReadPodLog.mockResolvedValue(CLAUDE_HAPPY_OUTPUT);
|
||||
|
||||
const executePromise = execute(makeCtx());
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.sessionId).toBe("sess_test123");
|
||||
expect(mockCoreReadPodLog).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("proceeds with captured output when job is deleted by TTL (404 in completion poll)", async () => {
|
||||
// waitForJobCompletion catches 404 and returns jobGone=true — execute must
|
||||
// continue to stdout parsing rather than returning an error.
|
||||
mockBatchReadJob.mockRejectedValue(
|
||||
Object.assign(new Error("Not Found"), { response: { statusCode: 404 } }),
|
||||
);
|
||||
|
||||
const executePromise = execute(makeCtx());
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(result.sessionId).toBe("sess_test123");
|
||||
});
|
||||
|
||||
it("reconnects log stream and logs status when job completion takes > 3s", async () => {
|
||||
// Make waitForJobCompletion take 4s so the 3s stream reconnect fires first.
|
||||
// timeoutSec=4, graceSec=0 → completionTimeoutMs=4000.
|
||||
// Sequence: poll at t=0 (non-terminal, 2s delay) → poll at t=2000 (non-terminal,
|
||||
// 2s delay) → at t=4000 deadline passes → timedOut=true → stopped=true.
|
||||
// Meanwhile: reconnect at t=3000 (attempt=1) → line 393 fires → stream reconnects.
|
||||
mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); // never terminal
|
||||
|
||||
const executePromise = execute(
|
||||
makeCtx({ config: { timeoutSec: 4, graceSec: 0 } } as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
|
||||
// Timer sequence (all times relative to when execute() registers fake timers,
|
||||
// which is after readPaperclipRuntimeSkillEntries real I/O completes):
|
||||
// t+2000: waitForJobCompletion poll (non-terminal → sleep 2000ms more)
|
||||
// t+3000: streamPodLogs reconnect (attempt=1 → sleep 3000ms more)
|
||||
// t+4000: waitForJobCompletion deadline exceeded → timedOut=true → stopped=true
|
||||
// t+6000: streamPodLogs sleep ends → while(!stopped) → exits → allSettled resolves
|
||||
//
|
||||
// readPaperclipRuntimeSkillEntries does real fs.stat I/O under fake timers.
|
||||
// Each advanceTimersByTimeAsync call gives one real event-loop turn for that
|
||||
// I/O to complete. Multiple small advances ensure I/O drains before fake timers
|
||||
// fire, and the total (~12200ms) covers the I/O delay plus the 6000ms sequence.
|
||||
await vi.advanceTimersByTimeAsync(2_100);
|
||||
await vi.advanceTimersByTimeAsync(1_100);
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
await vi.advanceTimersByTimeAsync(3_000);
|
||||
await vi.advanceTimersByTimeAsync(3_000);
|
||||
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.timedOut).toBe(true);
|
||||
expect(result.errorCode).toBe("timeout");
|
||||
}, 15_000);
|
||||
|
||||
it("waitForJobCompletion respects deadline and returns timedOut via poll loop", async () => {
|
||||
// timeoutSec=1, graceSec=0 → completionTimeoutMs=1000ms. The poll delay (2s)
|
||||
// fires at t=2000 > deadline (t=1000) → while loop exits → returns timedOut.
|
||||
mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); // never terminal
|
||||
|
||||
const executePromise = execute(
|
||||
makeCtx({ config: { timeoutSec: 1, graceSec: 0 } } as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
// Multiple advances provide event-loop turns for readPaperclipRuntimeSkillEntries
|
||||
// I/O to drain before fake timers fire. Total ~8400ms covers the I/O delay
|
||||
// (~4200ms observed in isolation) plus the 3000ms timer sequence needed:
|
||||
// t+2000: poll 2 fires, while(Date.now() < deadline) = false → timedOut=true → stopped=true
|
||||
// t+3000: reconnect sleep fires → while(!stopped) → exits → allSettled resolves
|
||||
await vi.advanceTimersByTimeAsync(2_100);
|
||||
await vi.advanceTimersByTimeAsync(1_100);
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
await vi.advanceTimersByTimeAsync(2_100);
|
||||
await vi.advanceTimersByTimeAsync(2_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.timedOut).toBe(true);
|
||||
expect(result.errorCode).toBe("timeout");
|
||||
}, 15_000);
|
||||
|
||||
it("waits for pod creation (no-pod state) then succeeds when pod appears", async () => {
|
||||
// Override mockCoreListPods: first call returns empty (no pod yet),
|
||||
// second returns running, default returns terminated exit 0.
|
||||
mockCoreListPods.mockReset();
|
||||
mockCoreListPods
|
||||
.mockResolvedValueOnce({ items: [] })
|
||||
.mockResolvedValueOnce({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
|
||||
}],
|
||||
})
|
||||
.mockResolvedValue({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: {
|
||||
containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }],
|
||||
},
|
||||
}],
|
||||
});
|
||||
|
||||
const executePromise = execute(makeCtx());
|
||||
// Multiple advances provide event-loop turns for readPaperclipRuntimeSkillEntries
|
||||
// I/O to drain. Total ~9400ms covers I/O delay (~4200ms) plus the 5000ms
|
||||
// sequence: waitForPod sleep (2000ms) → pod found → allSettled starts →
|
||||
// waitForJobCompletion Complete (immediate, stopped=true) → streamPodLogs
|
||||
// reconnect sleep (3000ms) → while(!stopped) → exits.
|
||||
await vi.advanceTimersByTimeAsync(2_100);
|
||||
await vi.advanceTimersByTimeAsync(1_100);
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
await vi.advanceTimersByTimeAsync(2_100);
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.exitCode).toBe(0);
|
||||
}, 15_000);
|
||||
|
||||
it("logs warning and continues when instructionsFilePath file does not exist", async () => {
|
||||
// The catch block in execute() logs a warning and proceeds with null instructions
|
||||
const executePromise = execute(
|
||||
makeCtx({ config: { instructionsFilePath: "/nonexistent/agent-instructions.md" } } as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.exitCode).toBe(0);
|
||||
});
|
||||
|
||||
it("logs warning for extra labels with reserved prefix (skippedLabels)", async () => {
|
||||
// Labels starting with "paperclip.io/" are reserved and get skipped
|
||||
const executePromise = execute(
|
||||
makeCtx({ config: { labels: { "paperclip.io/custom": "value" } } } as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.exitCode).toBe(0);
|
||||
});
|
||||
|
||||
it("logs pod pending → init-waiting → running transition and then succeeds", async () => {
|
||||
// First poll: pod is Pending with init and main containers in waiting state
|
||||
// Second poll: pod is Running → waitForPod returns
|
||||
mockCoreListPods.mockReset();
|
||||
mockCoreListPods
|
||||
.mockResolvedValueOnce({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: {
|
||||
phase: "Pending",
|
||||
initContainerStatuses: [{ name: "write-prompt", state: { waiting: { reason: "PodInitializing" } } }],
|
||||
containerStatuses: [{ name: "claude", state: { waiting: { reason: "PodInitializing" } } }],
|
||||
},
|
||||
}],
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
|
||||
}],
|
||||
})
|
||||
.mockResolvedValue({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] },
|
||||
}],
|
||||
});
|
||||
|
||||
const executePromise = execute(makeCtx());
|
||||
// First poll: Pending → logs phase, schedules next poll at t=2000
|
||||
// Second poll: Running → waitForPod returns, then log streaming begins
|
||||
// Third advance: fires streamPodLogs 3s reconnect timer
|
||||
await vi.advanceTimersByTimeAsync(2_100); // t=0→2100: first waitForPod poll (Pending)
|
||||
await vi.advanceTimersByTimeAsync(2_100); // t=2100→4200: second waitForPod poll (Running)
|
||||
await vi.advanceTimersByTimeAsync(3_100); // t=4200→7300: streamPodLogs reconnect
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.exitCode).toBe(0);
|
||||
}, 15_000);
|
||||
|
||||
it("returns running pod via allInitsDone && mainRunning even when phase=Pending", async () => {
|
||||
// Phase stays Pending, but init containers are done and main is running.
|
||||
// waitForPod returns immediately via the allInitsDone && mainRunning branch (no 2s delay).
|
||||
mockCoreListPods.mockReset();
|
||||
mockCoreListPods
|
||||
.mockResolvedValueOnce({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: {
|
||||
phase: "Pending",
|
||||
initContainerStatuses: [{ name: "write-prompt", state: { terminated: { exitCode: 0 } } }],
|
||||
containerStatuses: [{ name: "claude", state: { running: { startedAt: "2024-01-01T00:00:00Z" } } }],
|
||||
},
|
||||
}],
|
||||
})
|
||||
.mockResolvedValue({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] },
|
||||
}],
|
||||
});
|
||||
|
||||
const executePromise = execute(makeCtx());
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.exitCode).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
// ─── execute: waitForPod edge cases ──────────────────────────────────────────
|
||||
|
||||
describe("execute: waitForPod edge cases", () => {
|
||||
beforeEach(() => {
|
||||
vi.resetAllMocks();
|
||||
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
|
||||
mockBatchListJobs.mockResolvedValue({ items: [] });
|
||||
mockPrepareBundle.mockResolvedValue(makeBundle());
|
||||
mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "uid-1" } });
|
||||
mockBatchDeleteJob.mockResolvedValue({});
|
||||
});
|
||||
|
||||
it("throws k8s_pod_schedule_failed when pod reaches phase=Failed immediately", async () => {
|
||||
mockCoreListPods.mockResolvedValue({
|
||||
items: [{
|
||||
metadata: { name: "pod-fail" },
|
||||
status: {
|
||||
phase: "Failed",
|
||||
containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 137, reason: "OOMKilled" } } }],
|
||||
initContainerStatuses: [],
|
||||
},
|
||||
}],
|
||||
});
|
||||
|
||||
const result = await execute(makeCtx());
|
||||
|
||||
expect(result.errorCode).toBe("k8s_pod_schedule_failed");
|
||||
expect(result.errorMessage).toContain("OOMKilled");
|
||||
});
|
||||
|
||||
it("throws k8s_pod_schedule_failed when init container exits non-zero", async () => {
|
||||
mockCoreListPods.mockResolvedValue({
|
||||
items: [{
|
||||
metadata: { name: "pod-x" },
|
||||
status: {
|
||||
phase: "Pending",
|
||||
initContainerStatuses: [{
|
||||
name: "write-prompt",
|
||||
state: { terminated: { exitCode: 1, reason: "Error" } },
|
||||
}],
|
||||
containerStatuses: [],
|
||||
},
|
||||
}],
|
||||
});
|
||||
|
||||
const result = await execute(makeCtx());
|
||||
|
||||
expect(result.errorCode).toBe("k8s_pod_schedule_failed");
|
||||
expect(result.errorMessage).toContain("write-prompt");
|
||||
});
|
||||
|
||||
it("throws k8s_pod_schedule_failed when init container has ImagePullBackOff", async () => {
|
||||
mockCoreListPods.mockResolvedValue({
|
||||
items: [{
|
||||
metadata: { name: "pod-x" },
|
||||
status: {
|
||||
phase: "Pending",
|
||||
initContainerStatuses: [{
|
||||
name: "write-prompt",
|
||||
state: { waiting: { reason: "ImagePullBackOff", message: "pull failed" } },
|
||||
}],
|
||||
containerStatuses: [],
|
||||
},
|
||||
}],
|
||||
});
|
||||
|
||||
const result = await execute(makeCtx());
|
||||
|
||||
expect(result.errorCode).toBe("k8s_pod_schedule_failed");
|
||||
expect(result.errorMessage).toContain("image pull");
|
||||
});
|
||||
|
||||
it("throws k8s_pod_schedule_failed when main container has CrashLoopBackOff", async () => {
|
||||
mockCoreListPods.mockResolvedValue({
|
||||
items: [{
|
||||
metadata: { name: "pod-x" },
|
||||
status: {
|
||||
phase: "Pending",
|
||||
initContainerStatuses: [],
|
||||
containerStatuses: [{
|
||||
name: "claude",
|
||||
state: { waiting: { reason: "CrashLoopBackOff" } },
|
||||
}],
|
||||
},
|
||||
}],
|
||||
});
|
||||
|
||||
const result = await execute(makeCtx());
|
||||
|
||||
expect(result.errorCode).toBe("k8s_pod_schedule_failed");
|
||||
expect(result.errorMessage).toContain("crash loop");
|
||||
});
|
||||
});
|
||||
|
||||
// ─── execute: grace-period fallback (FAR-23) ─────────────────────────────────
|
||||
|
||||
describe("execute: log-stream-exit grace period (FAR-23)", () => {
|
||||
// Tests verify that execute() resolves within the grace window even when
|
||||
// waitForJobCompletion keeps polling after the log stream exits (K8s
|
||||
// condition propagation lag).
|
||||
beforeEach(() => {
|
||||
vi.resetAllMocks();
|
||||
vi.useFakeTimers();
|
||||
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
|
||||
mockBatchListJobs.mockResolvedValue({ items: [] });
|
||||
mockPrepareBundle.mockResolvedValue(makeBundle());
|
||||
mockBatchCreateJob.mockResolvedValue({ metadata: { uid: "job-uid-1" } });
|
||||
mockBatchPatchJob.mockResolvedValue({});
|
||||
mockBatchDeleteJob.mockResolvedValue({});
|
||||
mockCoreDeleteSecret.mockResolvedValue({});
|
||||
mockCoreListPods
|
||||
.mockResolvedValueOnce({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: { phase: "Running", containerStatuses: [], initContainerStatuses: [] },
|
||||
}],
|
||||
})
|
||||
.mockResolvedValue({
|
||||
items: [{
|
||||
metadata: { name: "pod-abc" },
|
||||
status: { containerStatuses: [{ name: "claude", state: { terminated: { exitCode: 0 } } }] },
|
||||
}],
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("resolves via grace (jobGone) when log stream exits but job condition never arrives", async () => {
|
||||
// logApi.log returns immediately (container exited) — log stream exits on first attempt.
|
||||
mockLogFn.mockImplementation(async () => {});
|
||||
// One-shot read returns full Claude output (no reconnects needed for output)
|
||||
mockCoreReadPodLog.mockResolvedValue(CLAUDE_HAPPY_OUTPUT);
|
||||
// waitForJobCompletion never detects terminal — simulates K8s condition lag.
|
||||
mockBatchReadJob.mockResolvedValue({ status: { conditions: [] } }); // never terminal
|
||||
|
||||
// No timeoutSec → completionTimeoutMs=0 → polls indefinitely without grace.
|
||||
const executePromise = execute(makeCtx());
|
||||
|
||||
// Advance past:
|
||||
// ~4200ms of readPaperclipRuntimeSkillEntries real I/O (multiple small advances)
|
||||
// 3100ms: streamPodLogs reconnect sleep (attempt=1, stopSignal still false at entry)
|
||||
// + 30100ms: grace period (LOG_EXIT_COMPLETION_GRACE_MS = 30s)
|
||||
// + 3100ms: streamPodLogs bail timer (stopSignal was set by grace → bail fires)
|
||||
await vi.advanceTimersByTimeAsync(2_100);
|
||||
await vi.advanceTimersByTimeAsync(1_100);
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
await vi.advanceTimersByTimeAsync(3_100); // reconnect sleep
|
||||
await vi.advanceTimersByTimeAsync(30_100); // grace fires
|
||||
await vi.advanceTimersByTimeAsync(3_500); // bail timer + margin
|
||||
const result = await executePromise;
|
||||
|
||||
// Grace fires → jobGone=true → execute proceeds with one-shot logs → success
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(result.sessionId).toBe("sess_test123");
|
||||
expect(mockCoreReadPodLog).toHaveBeenCalled();
|
||||
}, 60_000);
|
||||
|
||||
it("resolves promptly via real completion when job condition arrives before grace", async () => {
|
||||
// Log stream exits immediately then job condition arrives well within the grace period.
|
||||
mockLogFn.mockImplementation(
|
||||
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
|
||||
writable.write(CLAUDE_HAPPY_OUTPUT);
|
||||
},
|
||||
);
|
||||
// Job condition appears quickly (< 30s grace period)
|
||||
mockBatchReadJob.mockResolvedValue({
|
||||
status: { conditions: [{ type: "Complete", status: "True" }] },
|
||||
});
|
||||
|
||||
const executePromise = execute(makeCtx());
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
const result = await executePromise;
|
||||
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(result.sessionId).toBe("sess_test123");
|
||||
// One-shot fallback should NOT be needed since the stream captured full output
|
||||
// (grace did not fire, real completion arrived)
|
||||
expect(result.errorMessage).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
// ─── execute: concurrency guard — multiple orphan sorting ────────────────────
|
||||
|
||||
describe("execute: concurrency guard — multiple orphans", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mockGetSelfPodInfo.mockResolvedValue(makeSelfPodResult());
|
||||
});
|
||||
|
||||
it("sorts multiple orphans newest-first and processes them in that order", async () => {
|
||||
// orphanNew has a newer timestamp and a mismatching task → block_task_mismatch
|
||||
// orphanOld has an older timestamp and a matching task → would reattach
|
||||
// The sort (lines 603-605) must put orphanNew first so it is the one classified.
|
||||
const orphanOld = makeJob({ runId: "prior-1", agentId: "agent-abc", taskId: "task-match" });
|
||||
orphanOld.metadata!.creationTimestamp = new Date("2024-01-01T00:00:00Z") as unknown as Date;
|
||||
const orphanNew = makeJob({ runId: "prior-2", agentId: "agent-abc", taskId: "task-other" });
|
||||
orphanNew.metadata!.creationTimestamp = new Date("2024-01-02T00:00:00Z") as unknown as Date;
|
||||
|
||||
mockBatchListJobs.mockResolvedValue({ items: [orphanOld, orphanNew] });
|
||||
const result = await execute(
|
||||
makeCtx({ context: { taskId: "task-match" } } as Partial<AdapterExecutionContext>),
|
||||
);
|
||||
|
||||
// Newest orphan (task-other) is classified first → block_task_mismatch
|
||||
expect(result.errorCode).toBe("k8s_concurrent_run_blocked");
|
||||
expect(result.errorMessage).toContain("different task");
|
||||
});
|
||||
});
|
||||
|
||||
+65
-5
@@ -35,6 +35,13 @@ const POST_TERMINAL_KEEPALIVE_MS = 90_000;
|
||||
// against the K8s client library not propagating writable.destroy() into an
|
||||
// abort of the underlying HTTP request.
|
||||
const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
|
||||
// After the log stream exits (container stopped producing output), wait this
|
||||
// long for the K8s Job condition to be confirmed before treating the job as
|
||||
// done. K8s Job conditions can lag pod exit by several seconds or more under
|
||||
// cluster load. Without this bound, waitForJobCompletion keeps polling while
|
||||
// streamPodLogs keeps reconnecting — together they can hold execute() open for
|
||||
// minutes, causing stale "running" status in the UI (FAR-23).
|
||||
const LOG_EXIT_COMPLETION_GRACE_MS = 30_000;
|
||||
|
||||
/**
|
||||
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
|
||||
@@ -357,6 +364,11 @@ export async function streamPodLogsOnce(
|
||||
*
|
||||
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
|
||||
* loops during sustained API partitions.
|
||||
*
|
||||
* onFirstStreamExit is called the first time streamPodLogsOnce returns
|
||||
* (container has exited or stream disconnected). Used by execute() to
|
||||
* start the LOG_EXIT_COMPLETION_GRACE_MS grace timer (FAR-23) without
|
||||
* waiting for all reconnects to exhaust.
|
||||
*/
|
||||
async function streamPodLogs(
|
||||
namespace: string,
|
||||
@@ -365,6 +377,7 @@ async function streamPodLogs(
|
||||
kubeconfigPath?: string,
|
||||
stopSignal?: { stopped: boolean },
|
||||
dedup?: LogLineDedupFilter,
|
||||
onFirstStreamExit?: () => void,
|
||||
): Promise<string> {
|
||||
const allChunks: string[] = [];
|
||||
let attempt = 0;
|
||||
@@ -395,6 +408,9 @@ async function streamPodLogs(
|
||||
|
||||
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
|
||||
// Signal first stream exit immediately so the grace-period timer in
|
||||
// execute() can start without waiting for all reconnects to complete.
|
||||
if (attempt === 0) onFirstStreamExit?.();
|
||||
if (result) {
|
||||
allChunks.push(result);
|
||||
// Update last-received timestamp to now (the stream just ended,
|
||||
@@ -1063,12 +1079,56 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
|
||||
const logDedup = new LogLineDedupFilter();
|
||||
|
||||
const [logResult, completionResult] = await Promise.allSettled([
|
||||
streamPodLogs(namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup),
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then((r) => {
|
||||
// Track when the log stream first exits so the grace-period can fire
|
||||
// if the K8s Job condition lags behind container exit (FAR-23).
|
||||
// Set via onFirstStreamExit callback (called after attempt=0 returns)
|
||||
// rather than in .then() of streamPodLogs, which would create a
|
||||
// deadlock: streamPodLogs only resolves after stopSignal is set, but
|
||||
// stopSignal is set by the grace timer which needs logExitTime to be
|
||||
// non-null.
|
||||
let logExitTime: number | null = null;
|
||||
const trackedLogStream = streamPodLogs(
|
||||
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
|
||||
() => { logExitTime = Date.now(); },
|
||||
);
|
||||
|
||||
// completionWithGrace races waitForJobCompletion against a grace timer
|
||||
// that fires LOG_EXIT_COMPLETION_GRACE_MS after the log stream exits.
|
||||
// This bounds the stale-UI window when K8s Job conditions lag container
|
||||
// exit (FAR-23): without it, waitForJobCompletion polls indefinitely
|
||||
// while streamPodLogs reconnects, holding execute() open for minutes.
|
||||
// logStopSignal.stopped is set on every settled path (fulfilled, rejected,
|
||||
// or grace) so streamPodLogs stops reconnecting promptly.
|
||||
type CompletionResult = { succeeded: boolean; timedOut: boolean; jobGone?: boolean };
|
||||
let gracePoller: ReturnType<typeof setInterval> | null = null;
|
||||
const completionWithGrace = new Promise<CompletionResult>((resolve, reject) => {
|
||||
let settled = false;
|
||||
const settleOk = (r: CompletionResult) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
logStopSignal.stopped = true;
|
||||
return r;
|
||||
}),
|
||||
resolve(r);
|
||||
};
|
||||
const settleErr = (err: unknown) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
logStopSignal.stopped = true;
|
||||
reject(err);
|
||||
};
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr);
|
||||
gracePoller = setInterval(() => {
|
||||
if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) {
|
||||
void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, jobGone: true });
|
||||
}
|
||||
}, 1_000);
|
||||
});
|
||||
|
||||
const [logResult, completionResult] = await Promise.allSettled([
|
||||
trackedLogStream,
|
||||
completionWithGrace,
|
||||
]);
|
||||
|
||||
// Stop the keepalive immediately once the job has reached a terminal
|
||||
|
||||
Reference in New Issue
Block a user