Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c71757fbcd | |||
| 098d9f9641 | |||
| 65321e091d | |||
| c0c4c3f179 | |||
| c5b555de17 | |||
| e3af8aa83b | |||
| bc340bfcc9 | |||
| c71d0e5eec | |||
| d9bc2e513b | |||
| c79eea7ee0 | |||
| fa6c115be4 | |||
| 480f7cf3d1 | |||
| 5ed041fd84 | |||
| fe6bc0c2d6 | |||
| 2d057f085d |
@@ -0,0 +1,361 @@
|
||||
You are implementing two coordinated changes to a Paperclip adapter plugin.
|
||||
The repo is at /Users/Repositories/paperclip-adapter-opencode-k8s on branch
|
||||
master. Work on a new branch off master — do NOT commit directly to master.
|
||||
|
||||
Before you start, read these files fully:
|
||||
- src/server/execute.ts (~1218 lines; this is the main file you'll edit)
|
||||
- src/server/job-manifest.ts
|
||||
- src/server/log-dedup.ts (you will delete this)
|
||||
- src/server/parse.ts
|
||||
- src/server/index.ts
|
||||
- src/index.ts
|
||||
|
||||
Run `npm install` and then `npm test`. Confirm green. Note the test count
|
||||
for later comparison. Do NOT run `npm run build` — CI handles that.
|
||||
|
||||
=============================================================================
|
||||
WHY WE ARE DOING THIS
|
||||
=============================================================================
|
||||
|
||||
Two tightly coupled bugs:
|
||||
|
||||
(A) The adapter doesn't declare `hasOutOfProcessLiveness: true` on its
|
||||
ServerAdapterModule. The revitalize reaper therefore treats it as an
|
||||
in-process adapter, expects a local child PID, finds none, and marks
|
||||
every run `process_lost` after 5 minutes of staleness.
|
||||
|
||||
(B) The adapter reads pod logs via the Kubernetes log API (follow mode).
|
||||
At production scale the stream drops every few seconds, exhausting
|
||||
the 50-reconnect cap within 2.5 minutes. Long runs lose live UI
|
||||
output, and combined with (A) they fail entirely.
|
||||
|
||||
Fix both in one PR:
|
||||
|
||||
1. Declare `hasOutOfProcessLiveness: true` in createServerAdapter().
|
||||
2. Have the pod tee opencode's stdout to a file on the shared PVC, and
|
||||
have the adapter tail that file from the Paperclip server process.
|
||||
|
||||
We are NOT going to:
|
||||
- wrap the opencode binary
|
||||
- use hooks
|
||||
- add a sidecar
|
||||
- change revitalize
|
||||
- keep the k8s log API as a fallback
|
||||
|
||||
We ARE going to:
|
||||
- replace k8s log streaming with filesystem tailing entirely
|
||||
- delete all reconnect logic and the log-dedup filter
|
||||
- keep `kubectl logs -f` working (tee preserves stdout)
|
||||
- add the liveness flag so the reaper uses staleness-based liveness
|
||||
|
||||
=============================================================================
|
||||
SCOPE OF CHANGES
|
||||
=============================================================================
|
||||
|
||||
--- hasOutOfProcessLiveness flag (src/server/index.ts) ---
|
||||
|
||||
The file today returns a plain object from createServerAdapter(). Add
|
||||
`hasOutOfProcessLiveness: true` to the returned object, matching the
|
||||
pattern from paperclip-adapter-claude-k8s. The adapter-utils type predates
|
||||
this field, so the return needs a cast.
|
||||
|
||||
Before (approximately):
|
||||
export function createServerAdapter(): ServerAdapterModule {
|
||||
return {
|
||||
type,
|
||||
execute,
|
||||
// ... other fields ...
|
||||
};
|
||||
}
|
||||
|
||||
After:
|
||||
export function createServerAdapter(): ServerAdapterModule {
|
||||
return {
|
||||
type,
|
||||
execute,
|
||||
// ... other fields ...
|
||||
// Tells the reaper to skip local PID checks and use the staleness-based
|
||||
// liveness window instead (adapter spawns K8s Jobs in separate pods).
|
||||
// Cast required: adapter-utils ServerAdapterModule type predates this field.
|
||||
hasOutOfProcessLiveness: true,
|
||||
} as ServerAdapterModule;
|
||||
}
|
||||
|
||||
--- Job manifest (src/server/job-manifest.ts) ---
|
||||
|
||||
1. MODIFY the main container command to tee stdout. Current code at
|
||||
approximately line 409:
|
||||
const mainCommand = `${configSetup}cat /tmp/prompt/prompt.txt | opencode ${opencodeArgsEscaped}`;
|
||||
Change to:
|
||||
const podLogPath =
|
||||
`/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`;
|
||||
const mainCommand = `${configSetup}cat /tmp/prompt/prompt.txt | opencode ${opencodeArgsEscaped} | tee ${podLogPath}`;
|
||||
|
||||
`companyId`, `agentId`, `runId` are already in scope in buildJobManifest
|
||||
via the destructuring at line 219 (`agent`, `runId`) — use `agent.id`
|
||||
and `agent.companyId`. If you prefer cleaner code, add a local:
|
||||
const companyId = agent.companyId;
|
||||
const agentId = agent.id;
|
||||
|
||||
2. MODIFY the init container command to create the parent directory before
|
||||
the main container starts. The existing init container today writes the
|
||||
prompt file with `printf`. Amend its command to also `mkdir -p` the log
|
||||
directory. The init container is at approximately line 444 (prompt
|
||||
secret path) and line 451 (direct printf path) — there are TWO init
|
||||
container variants. Amend BOTH to prepend the mkdir:
|
||||
|
||||
Variant 1 (large-prompt path, approx line 444):
|
||||
Before: `cp /tmp/prompt-secret/prompt /tmp/prompt/prompt.txt`
|
||||
After: `mkdir -p /paperclip/instances/default/run-logs/${companyId}/${agentId} && cp /tmp/prompt-secret/prompt /tmp/prompt/prompt.txt`
|
||||
|
||||
Variant 2 (direct path, approx line 451):
|
||||
Before: `printf '%s' "$PROMPT_CONTENT" > /tmp/prompt/prompt.txt`
|
||||
After: `mkdir -p /paperclip/instances/default/run-logs/${companyId}/${agentId} && printf '%s' "$PROMPT_CONTENT" > /tmp/prompt/prompt.txt`
|
||||
|
||||
Use template substitution for companyId/agentId/runId — these are all
|
||||
in scope in the builder.
|
||||
|
||||
3. EXPORT the log path builder so execute.ts can compute the same path:
|
||||
export function buildPodLogPath(companyId: string, agentId: string, runId: string): string {
|
||||
return `/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`;
|
||||
}
|
||||
Return this path from buildJobManifest alongside other fields in
|
||||
JobBuildResult (add `podLogPath: string` to the interface at approx
|
||||
line 46). Update the final `return { job, jobName, namespace, prompt,
|
||||
opencodeArgs, promptMetrics }` (approx line 482) to include podLogPath.
|
||||
|
||||
4. ID SANITIZATION: before using companyId/agentId/runId in the path,
|
||||
validate they match `^[a-zA-Z0-9-]+$`. Add a helper at the top of
|
||||
job-manifest.ts:
|
||||
function assertSafePathComponent(field: string, value: string): void {
|
||||
if (!/^[a-zA-Z0-9-]+$/.test(value)) {
|
||||
throw new Error(`Invalid ${field} for log path: ${value}`);
|
||||
}
|
||||
}
|
||||
Call it for companyId, agentId, and runId before computing podLogPath
|
||||
and before interpolating into the init container commands.
|
||||
|
||||
--- Adapter (src/server/execute.ts) ---
|
||||
|
||||
1. DELETE the `LogLineDedupFilter` import (approx line 13).
|
||||
2. DELETE constants (approx lines 19-26):
|
||||
LOG_STREAM_RECONNECT_DELAY_MS
|
||||
LOG_STREAM_RECONNECT_MAX_DELAY_MS
|
||||
MAX_LOG_RECONNECT_ATTEMPTS
|
||||
LOG_STREAM_BAIL_TIMEOUT_MS
|
||||
3. DELETE functions:
|
||||
streamPodLogsOnce (approx line 168)
|
||||
streamPodLogs (approx line 252)
|
||||
readPodLogs (approx line 330)
|
||||
waitForPodTermination (approx line 355) — only used by the fallback
|
||||
4. DELETE the bail timer machinery inside any function being removed
|
||||
(bailTimer, bailResolve, bailPromise, stopPoller).
|
||||
5. DELETE the fallback path in `execute` around lines 675-693:
|
||||
if (!stdout.trim()) {
|
||||
// ... waitForPodTermination + readPodLogs fallback
|
||||
} else if (!parseOpenCodeJsonl(stdout).sessionId) {
|
||||
// ... partial-stdout fallback
|
||||
}
|
||||
|
||||
6. ADD a new function `tailPodLogFile` in execute.ts. Inline is fine; do
|
||||
not create a new module. Signature:
|
||||
|
||||
interface TailOptions {
|
||||
onLog: AdapterExecutionContext["onLog"];
|
||||
stopSignal: { stopped: boolean };
|
||||
}
|
||||
|
||||
async function tailPodLogFile(
|
||||
filePath: string,
|
||||
opts: TailOptions,
|
||||
): Promise<string> { ... }
|
||||
|
||||
Behavior:
|
||||
- Wait up to 30 seconds for the file to exist. Poll with
|
||||
fs.promises.stat every 250ms. If the file doesn't appear in 30s,
|
||||
throw an Error: `Pod log file never appeared at ${filePath}`.
|
||||
- Once it exists, open with fs.promises.open(filePath, 'r').
|
||||
- Track a byte offset starting at 0.
|
||||
- Poll loop: 250ms active cadence, backs off to 1000ms if the file
|
||||
hasn't grown for 5 consecutive polls (reset to 250ms on any
|
||||
growth). For each poll:
|
||||
a. stat the file, compare size to offset
|
||||
b. if size > offset, read bytes from [offset, size) into a Buffer
|
||||
c. update offset = size
|
||||
d. concatenate any pending partial line with the new buffer,
|
||||
split on '\n'
|
||||
e. last element is the new pending partial line (if no trailing
|
||||
newline) or empty
|
||||
f. for every complete line, call onLog("stdout", line + "\n")
|
||||
and append to an in-memory accumulator (string)
|
||||
- Exit when opts.stopSignal.stopped === true. Before returning, do
|
||||
ONE final read-to-EOF to drain tail bytes. Close the handle.
|
||||
Return the accumulator.
|
||||
|
||||
Use fs.promises.open / FileHandle.read / FileHandle.close. Do NOT use
|
||||
fs.watch or chokidar.
|
||||
|
||||
7. REPLACE the existing log-streaming section of `execute`. Find where
|
||||
streamPodLogs is invoked inside a `Promise.allSettled` with
|
||||
waitForJobCompletion (approx line 660). Replace that call with
|
||||
tailPodLogFile. Pattern:
|
||||
|
||||
const { /* ..., */ podLogPath } = built;
|
||||
// ... create secret, create job, wait for pod ...
|
||||
const stopSignal = { stopped: false };
|
||||
const [tailResult, completionResult] = await Promise.allSettled([
|
||||
tailPodLogFile(podLogPath, { onLog, stopSignal }),
|
||||
waitForJobCompletion(namespace, jobName, ...).then(r => { stopSignal.stopped = true; return r; }),
|
||||
]);
|
||||
const stdout = tailResult.status === "fulfilled" ? tailResult.value : "";
|
||||
|
||||
Keep waitForJobCompletion unchanged. Keep the existing `keepaliveTimer`
|
||||
and `cancelSignal` / cancel-polling machinery unchanged — those are
|
||||
independent of log streaming.
|
||||
|
||||
8. ADD log file cleanup. Find `cleanupJob` (the function that deletes the
|
||||
K8s Job). After successful deletion, best-effort delete the log file:
|
||||
try { await fs.promises.unlink(podLogPath); } catch { /* non-fatal */ }
|
||||
Skip the unlink if `retainJobs === true`.
|
||||
cleanupJob will need podLogPath passed in; thread it from the caller.
|
||||
|
||||
--- Delete entire files ---
|
||||
|
||||
- src/server/log-dedup.ts
|
||||
- src/server/log-dedup.test.ts
|
||||
|
||||
--- Tests ---
|
||||
|
||||
- Delete any execute.test.ts tests covering streamPodLogsOnce,
|
||||
streamPodLogs, readPodLogs, waitForPodTermination, the bail timer, or
|
||||
LogLineDedupFilter. Search for those identifiers; remove matching
|
||||
describe/it blocks. Non-log-streaming tests in the same file stay.
|
||||
- Add test cases for tailPodLogFile to execute.test.ts. Cover:
|
||||
1. File appears within 30s; content is tailed line-by-line
|
||||
2. File never appears; function throws with expected message
|
||||
3. Partial trailing line buffered and emitted on next poll
|
||||
4. Stop signal exits the loop; final drain reads remaining bytes
|
||||
5. Adaptive backoff: idle polls slow; active polls speed up
|
||||
Use vitest fake timers (vi.useFakeTimers) and a tmpdir via
|
||||
`fs.mkdtempSync(path.join(os.tmpdir(), 'opencode-tailer-'))`.
|
||||
|
||||
=============================================================================
|
||||
TESTING
|
||||
=============================================================================
|
||||
|
||||
After all changes:
|
||||
1. `npm run typecheck` — must pass (the `as ServerAdapterModule` cast
|
||||
may be needed; mirror claude-k8s's pattern)
|
||||
2. `npm test` — must pass. Test count will drop vs baseline because you
|
||||
deleted tests. Record the new passing count.
|
||||
|
||||
Do NOT run the adapter end-to-end. Do NOT require a k8s cluster.
|
||||
|
||||
=============================================================================
|
||||
BRANCH, COMMIT, PUSH, PR
|
||||
=============================================================================
|
||||
|
||||
1. Create a new branch off master:
|
||||
git checkout master && git pull && git checkout -b feat/filesystem-log-tail-and-liveness-flag
|
||||
|
||||
2. Make all changes above. Commit as ONE commit:
|
||||
|
||||
feat: declare hasOutOfProcessLiveness and tail pod log from filesystem
|
||||
|
||||
Two coordinated fixes for long-running agent failures:
|
||||
|
||||
(1) Declare hasOutOfProcessLiveness: true on the ServerAdapterModule.
|
||||
Without it the reaper treated this adapter as in-process, expected
|
||||
a local child PID, and marked every run process_lost after 5min
|
||||
staleness. Flag tells the reaper to use the staleness-based
|
||||
liveness window for out-of-process adapters.
|
||||
|
||||
(2) Replace k8s log API streaming with filesystem tailing. The k8s
|
||||
follow stream drops every ~3 seconds at production scale,
|
||||
exhausting the 50-attempt reconnect cap within 2.5 minutes. Pod
|
||||
now tees opencode's stdout to
|
||||
/paperclip/instances/default/run-logs/<companyId>/<agentId>/<runId>.pod.ndjson
|
||||
on the shared PVC; adapter tails the file directly. kubectl logs -f
|
||||
still works (tee preserves stdout).
|
||||
|
||||
Deletes:
|
||||
- LogLineDedupFilter and all reconnect logic
|
||||
- streamPodLogsOnce, streamPodLogs, readPodLogs, waitForPodTermination
|
||||
- Both fallback paths (empty-stream and missing-sessionId)
|
||||
|
||||
Adds:
|
||||
- tailPodLogFile: adaptive 250ms/1s poll loop with partial-line
|
||||
buffering and tail-drain on stopSignal
|
||||
- Log file cleanup tied to retainJobs
|
||||
- Path-component sanitization (companyId/agentId/runId must match
|
||||
[a-zA-Z0-9-]+)
|
||||
|
||||
Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
|
||||
|
||||
3. Push:
|
||||
git push -u origin feat/filesystem-log-tail-and-liveness-flag
|
||||
|
||||
4. Open a PR against master with `gh pr create`:
|
||||
Title: `feat: declare hasOutOfProcessLiveness and tail pod log from filesystem`
|
||||
Body (use a heredoc):
|
||||
|
||||
## Summary
|
||||
- Declares `hasOutOfProcessLiveness: true` so the reaper uses
|
||||
staleness-based liveness instead of expecting a local PID
|
||||
- Pod tees opencode stdout to PVC; adapter tails the file directly
|
||||
- Eliminates k8s log API dependency for streaming
|
||||
- Deletes LogLineDedupFilter, reconnect logic, both fallback paths
|
||||
|
||||
## Why
|
||||
At production scale (144 concurrent runs), two bugs combined:
|
||||
(a) no liveness flag → reaper marked runs process_lost at 5min
|
||||
(b) k8s log follow stream drops every ~3s, exhausting the 50-reconnect
|
||||
cap. Runs over ~2.5min lost live output; over 5min failed outright.
|
||||
|
||||
Both must be fixed together — the flag alone doesn't help if the log
|
||||
stream still drops, and the log tail alone doesn't help if the reaper
|
||||
kills the run for missing PID.
|
||||
|
||||
## Path
|
||||
`/paperclip/instances/default/run-logs/<companyId>/<agentId>/<runId>.pod.ndjson`
|
||||
— the `.pod.ndjson` suffix distinguishes the pod-written file from
|
||||
revitalize's server-side `<runId>.ndjson` log store.
|
||||
|
||||
## Breaking
|
||||
Old Job manifests (pre-tee) are incompatible — the tailer's 30s
|
||||
"file missing" window will surface an error on in-flight runs at
|
||||
deploy time. Operator retry required. Consistent with the companion
|
||||
change in paperclip-adapter-claude-k8s.
|
||||
|
||||
## Test plan
|
||||
- [ ] npm test passes
|
||||
- [ ] Manual: deploy to cluster, run a >5min agent, confirm live UI
|
||||
output and no reaper fire
|
||||
- [ ] Manual: verify kubectl logs -f still works on the Job pod
|
||||
- [ ] Manual: confirm log file is cleaned up when Job cleanup runs
|
||||
(retainJobs=false) and preserved when retainJobs=true
|
||||
|
||||
=============================================================================
|
||||
WRAPPING UP
|
||||
=============================================================================
|
||||
|
||||
Report back with:
|
||||
1. Branch name and commit hash
|
||||
2. PR URL
|
||||
3. Final test count (numbers will drop vs baseline because you deleted
|
||||
tests — record baseline and final)
|
||||
4. Line count of execute.ts before and after (should drop significantly)
|
||||
5. Any deviation from these instructions, with reason
|
||||
|
||||
If ANY of the following happens, STOP and report instead of improvising:
|
||||
- A file path doesn't match what's described (e.g. the mainCommand
|
||||
pattern has changed)
|
||||
- A function you're supposed to delete has other callers you didn't
|
||||
expect (streamPodLogsOnce in particular may have test-only imports
|
||||
that need untangling)
|
||||
- A test you're supposed to keep depends on something you deleted
|
||||
- Typecheck fails and the fix is non-obvious
|
||||
- The `as ServerAdapterModule` cast doesn't satisfy TypeScript
|
||||
|
||||
Do NOT push to master. Do NOT tag a version. Do NOT bump package.json
|
||||
version — leave it as-is.
|
||||
Generated
+7
-7
@@ -1,26 +1,26 @@
|
||||
{
|
||||
"name": "paperclip-adapter-opencode-k8s",
|
||||
"version": "0.1.30",
|
||||
"version": "0.2.2",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "paperclip-adapter-opencode-k8s",
|
||||
"version": "0.1.30",
|
||||
"version": "0.2.2",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@kubernetes/client-node": "^1.0.0",
|
||||
"picocolors": "^1.1.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@paperclipai/adapter-utils": "2026.415.0-canary.7",
|
||||
"@paperclipai/adapter-utils": "^2026.428.0",
|
||||
"@types/node": "^24.6.0",
|
||||
"@vitest/coverage-v8": "^4.1.5",
|
||||
"typescript": "^5.7.3",
|
||||
"vitest": "^4.1.4"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"@paperclipai/adapter-utils": ">=2026.415.0-canary.7"
|
||||
"@paperclipai/adapter-utils": ">=2026.428.0"
|
||||
}
|
||||
},
|
||||
"node_modules/@babel/helper-string-parser": {
|
||||
@@ -223,9 +223,9 @@
|
||||
}
|
||||
},
|
||||
"node_modules/@paperclipai/adapter-utils": {
|
||||
"version": "2026.415.0-canary.7",
|
||||
"resolved": "https://registry.npmjs.org/@paperclipai/adapter-utils/-/adapter-utils-2026.415.0-canary.7.tgz",
|
||||
"integrity": "sha512-VNzIZmu1lrK6QM8Ad9WkOihZItfkj21NHKQf+artDcbwFT2hHbDAD9hdW2W9NMVxYdFvvnws3w76FI/BUbCMbQ==",
|
||||
"version": "2026.428.0",
|
||||
"resolved": "https://registry.npmjs.org/@paperclipai/adapter-utils/-/adapter-utils-2026.428.0.tgz",
|
||||
"integrity": "sha512-kGHpE7rhePPCbnG3OwXbNuHZZuI+XyuFgNSiDnrEeiSbkI2c5XHM2WnWDCZ/NGHULfJW3lWhSxGMFoYqiy38vQ==",
|
||||
"dev": true,
|
||||
"license": "MIT"
|
||||
},
|
||||
|
||||
+6
-5
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "paperclip-adapter-opencode-k8s",
|
||||
"version": "0.1.36",
|
||||
"version": "0.2.2",
|
||||
"description": "Paperclip adapter plugin that runs OpenCode agents as Kubernetes Jobs",
|
||||
"license": "MIT",
|
||||
"type": "module",
|
||||
@@ -10,14 +10,15 @@
|
||||
"exports": {
|
||||
".": "./dist/index.js",
|
||||
"./server": "./dist/server/index.js",
|
||||
"./ui-parser": "./dist/ui-parser.js",
|
||||
"./ui-parser": "./dist/ui-parser/ui-parser.js",
|
||||
"./cli": "./dist/cli/index.js"
|
||||
},
|
||||
"files": [
|
||||
"dist"
|
||||
],
|
||||
"scripts": {
|
||||
"build": "tsc",
|
||||
"build": "tsc -p tsconfig.build.json && npm run build:ui-parser",
|
||||
"build:ui-parser": "tsc -p tsconfig.ui-parser.json && node -e \"require('node:fs').writeFileSync('dist/ui-parser/package.json', '{\\\"type\\\":\\\"commonjs\\\"}\\n')\"",
|
||||
"clean": "rm -rf dist",
|
||||
"typecheck": "tsc --noEmit",
|
||||
"test": "vitest run",
|
||||
@@ -28,10 +29,10 @@
|
||||
"picocolors": "^1.1.1"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"@paperclipai/adapter-utils": ">=2026.415.0-canary.7"
|
||||
"@paperclipai/adapter-utils": ">=2026.428.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@paperclipai/adapter-utils": "2026.415.0-canary.7",
|
||||
"@paperclipai/adapter-utils": "^2026.428.0",
|
||||
"@types/node": "^24.6.0",
|
||||
"@vitest/coverage-v8": "^4.1.5",
|
||||
"typescript": "^5.7.3",
|
||||
|
||||
@@ -64,4 +64,3 @@ Notes:
|
||||
`;
|
||||
|
||||
export { createServerAdapter } from "./server/index.js";
|
||||
export { parseStdoutLine } from "./ui-parser.js";
|
||||
|
||||
@@ -11,13 +11,6 @@ export function getConfigSchema(): AdapterConfigSchema {
|
||||
hint: "Provider-specific reasoning/profile variant passed as --variant",
|
||||
group: "Core",
|
||||
},
|
||||
{
|
||||
key: "instructionsFilePath",
|
||||
label: "Instructions File Path",
|
||||
type: "text",
|
||||
hint: "Absolute path to a markdown file (e.g. AGENTS.md) prepended as system instructions before the task prompt",
|
||||
group: "Core",
|
||||
},
|
||||
{
|
||||
key: "dangerouslySkipPermissions",
|
||||
label: "Skip Permission Checks",
|
||||
|
||||
+79
-132
@@ -1,20 +1,64 @@
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
|
||||
import { execute, ensureAgentDbPvc } from "./execute.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi, getPvc, createPvc } from "./k8s-client.js";
|
||||
import { buildJobManifest } from "./job-manifest.js";
|
||||
import { execute, ensureAgentDbPvc, tailPodLogFile } from "./execute.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getPvc, createPvc } from "./k8s-client.js";
|
||||
import { buildJobManifest, buildPodLogPath } from "./job-manifest.js";
|
||||
|
||||
// Mock node:fs/promises so tailPodLogFile (used by execute()) reads a
|
||||
// configurable JSONL payload and returns. Individual tests override the
|
||||
// payload via setMockJsonl(...) before calling execute().
|
||||
const { readMock, statMock, fhStatMock, resetFsMocks, setMockJsonl } = vi.hoisted(() => {
|
||||
const HAPPY = [
|
||||
JSON.stringify({ type: "text", part: { text: "Task complete" }, sessionID: "ses_happy" }),
|
||||
JSON.stringify({ type: "step_finish", part: { tokens: { input: 100, output: 50, cache: { read: 20 } }, cost: 0.002 } }),
|
||||
].join("\n");
|
||||
let payload = HAPPY;
|
||||
let buffer = Buffer.from(payload);
|
||||
let readOffset = 0;
|
||||
const apply = (next: string) => { payload = next; buffer = Buffer.from(payload); readOffset = 0; };
|
||||
return {
|
||||
readMock: vi.fn().mockImplementation(async (buf: Buffer, off: number, len: number, _pos: number) => {
|
||||
if (readOffset >= buffer.byteLength) return { bytesRead: 0, buffer: buf };
|
||||
const remaining = buffer.byteLength - readOffset;
|
||||
const toRead = Math.min(len, remaining);
|
||||
buffer.copy(buf, off, readOffset, readOffset + toRead);
|
||||
readOffset += toRead;
|
||||
return { bytesRead: toRead, buffer: buf };
|
||||
}),
|
||||
statMock: vi.fn().mockImplementation(async () => ({ size: buffer.byteLength })),
|
||||
fhStatMock: vi.fn().mockImplementation(async () => ({ size: buffer.byteLength })),
|
||||
resetFsMocks: () => { apply(HAPPY); },
|
||||
setMockJsonl: (jsonl: string) => { apply(jsonl); },
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("node:fs/promises", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("node:fs/promises")>();
|
||||
return {
|
||||
...actual,
|
||||
stat: statMock,
|
||||
open: vi.fn().mockResolvedValue({
|
||||
stat: fhStatMock,
|
||||
read: readMock,
|
||||
close: vi.fn().mockResolvedValue(undefined),
|
||||
}),
|
||||
unlink: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("./k8s-client.js", () => ({
|
||||
getSelfPodInfo: vi.fn(),
|
||||
getBatchApi: vi.fn(),
|
||||
getCoreApi: vi.fn(),
|
||||
getLogApi: vi.fn(),
|
||||
getPvc: vi.fn().mockResolvedValue({ metadata: { name: "opencode-db-agent-id-test" } }),
|
||||
createPvc: vi.fn().mockResolvedValue({}),
|
||||
}));
|
||||
|
||||
vi.mock("./job-manifest.js", () => ({
|
||||
buildJobManifest: vi.fn(),
|
||||
buildPodLogPath: vi.fn((companyId: string, agentId: string, runId: string) =>
|
||||
`/paperclip/instances/default/data/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`
|
||||
),
|
||||
LARGE_PROMPT_THRESHOLD_BYTES: 256 * 1024,
|
||||
}));
|
||||
|
||||
@@ -89,7 +133,6 @@ function makeBatchApi(runningJobItems: unknown[] = []) {
|
||||
}
|
||||
|
||||
function makeCoreApi(
|
||||
jsonl = HAPPY_JSONL,
|
||||
exitCode: number | null = 0,
|
||||
terminatedReason: string | null = null,
|
||||
) {
|
||||
@@ -122,19 +165,15 @@ function makeCoreApi(
|
||||
items: [{ metadata: { name: POD_NAME }, status: { phase: "Running" } }],
|
||||
})
|
||||
.mockResolvedValueOnce(exitCodePod),
|
||||
readNamespacedPodLog: vi.fn().mockResolvedValue(jsonl),
|
||||
createNamespacedSecret: vi.fn().mockResolvedValue({}),
|
||||
deleteNamespacedSecret: vi.fn().mockResolvedValue({}),
|
||||
patchNamespacedSecret: vi.fn().mockResolvedValue({}),
|
||||
};
|
||||
}
|
||||
|
||||
function makeLogApi() {
|
||||
return { log: vi.fn().mockResolvedValue(undefined) };
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
resetFsMocks();
|
||||
|
||||
vi.mocked(getSelfPodInfo).mockResolvedValue(MOCK_SELF_POD as ReturnType<typeof getSelfPodInfo> extends Promise<infer T> ? T : never);
|
||||
vi.mocked(buildJobManifest).mockReturnValue({
|
||||
@@ -144,15 +183,14 @@ beforeEach(() => {
|
||||
prompt: "Test prompt",
|
||||
opencodeArgs: [],
|
||||
promptMetrics: null,
|
||||
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
|
||||
} as unknown as ReturnType<typeof buildJobManifest>);
|
||||
|
||||
const batchApi = makeBatchApi();
|
||||
const coreApi = makeCoreApi();
|
||||
const logApi = makeLogApi();
|
||||
|
||||
vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType<typeof getBatchApi>);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof getLogApi>);
|
||||
});
|
||||
|
||||
describe("execute — concurrency guard", () => {
|
||||
@@ -566,8 +604,8 @@ describe("execute — happy path", () => {
|
||||
|
||||
describe("execute — session unavailable (reattach classification)", () => {
|
||||
it("returns clearSession=true and session_unavailable code for unknown session error", async () => {
|
||||
const sessionErrorJsonl = JSON.stringify({ type: "error", error: { message: "unknown session abc" } });
|
||||
const coreApi = makeCoreApi(sessionErrorJsonl, 1);
|
||||
setMockJsonl(JSON.stringify({ type: "error", error: { message: "Unknown session ses_xxx" } }));
|
||||
const coreApi = makeCoreApi(1);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -578,7 +616,8 @@ describe("execute — session unavailable (reattach classification)", () => {
|
||||
});
|
||||
|
||||
it("returns clearSession=true for 'session not found' error", async () => {
|
||||
const coreApi = makeCoreApi("session not found\n", 1);
|
||||
setMockJsonl(JSON.stringify({ type: "error", error: { message: "Session ses_xxx not found" } }));
|
||||
const coreApi = makeCoreApi(1);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -588,10 +627,7 @@ describe("execute — session unavailable (reattach classification)", () => {
|
||||
});
|
||||
|
||||
it("does not set clearSession for unrelated errors", async () => {
|
||||
const coreApi = makeCoreApi(
|
||||
JSON.stringify({ type: "error", error: { message: "rate limit exceeded" } }),
|
||||
1,
|
||||
);
|
||||
const coreApi = makeCoreApi(1);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -641,10 +677,7 @@ describe("execute — retainJobs config", () => {
|
||||
|
||||
describe("execute — exit code handling", () => {
|
||||
it("propagates non-zero exit code from pod", async () => {
|
||||
const coreApi = makeCoreApi(
|
||||
JSON.stringify({ type: "error", error: { message: "Task failed" } }),
|
||||
2,
|
||||
);
|
||||
const coreApi = makeCoreApi(2);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -655,10 +688,8 @@ describe("execute — exit code handling", () => {
|
||||
});
|
||||
|
||||
it("synthesizes exitCode=1 when error message exists but pod reported exitCode=0", async () => {
|
||||
const coreApi = makeCoreApi(
|
||||
JSON.stringify({ type: "error", error: { message: "API rate limit" } }),
|
||||
0,
|
||||
);
|
||||
setMockJsonl(JSON.stringify({ type: "error", error: { message: "something went wrong" } }));
|
||||
const coreApi = makeCoreApi(0);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -669,7 +700,7 @@ describe("execute — exit code handling", () => {
|
||||
});
|
||||
|
||||
it("handles null exit code gracefully (pod not found — 404 tolerance)", async () => {
|
||||
const coreApi = makeCoreApi(HAPPY_JSONL, null);
|
||||
const coreApi = makeCoreApi(null);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -684,7 +715,7 @@ describe("execute — exit code handling", () => {
|
||||
describe("execute — pod failure classification", () => {
|
||||
it("includes pod terminated reason in errorMessage when reason is OOMKilled", async () => {
|
||||
// OOMKilled: process is killed by kernel — no JSONL error event, just empty output
|
||||
const coreApi = makeCoreApi("", 137, "OOMKilled");
|
||||
const coreApi = makeCoreApi(137, "OOMKilled");
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -695,7 +726,7 @@ describe("execute — pod failure classification", () => {
|
||||
});
|
||||
|
||||
it("includes pod terminated reason for Error exit", async () => {
|
||||
const coreApi = makeCoreApi("", 1, "Error");
|
||||
const coreApi = makeCoreApi(1, "Error");
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -706,11 +737,7 @@ describe("execute — pod failure classification", () => {
|
||||
});
|
||||
|
||||
it("falls back gracefully when no terminated reason is available", async () => {
|
||||
const coreApi = makeCoreApi(
|
||||
JSON.stringify({ type: "error", error: { message: "boom" } }),
|
||||
1,
|
||||
null,
|
||||
);
|
||||
const coreApi = makeCoreApi(1, null);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -721,64 +748,12 @@ describe("execute — pod failure classification", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("execute — partial stdout fallback", () => {
|
||||
it("fetches pod logs when stdout has content but no session result", async () => {
|
||||
const partialJsonl = JSON.stringify({ type: "text", part: { text: "thinking..." } }); // no sessionID
|
||||
const completeJsonl = [
|
||||
JSON.stringify({ type: "text", part: { text: "Done" }, sessionID: "ses_complete" }),
|
||||
JSON.stringify({ type: "step_finish", part: { tokens: { input: 50, output: 30, cache: {} }, cost: 0.001 } }),
|
||||
].join("\n");
|
||||
|
||||
const coreApi = makeCoreApi(completeJsonl, 0);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
// Make log stream return partial content with no sessionID
|
||||
const logApi = {
|
||||
log: vi.fn(async (_ns: string, _pod: string, _container: string, writable: NodeJS.WritableStream) => {
|
||||
writable.write(Buffer.from(partialJsonl + "\n"));
|
||||
}),
|
||||
};
|
||||
vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof getLogApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
const result = await execute(ctx);
|
||||
|
||||
// readNamespacedPodLog should have been called as the partial-stdout fallback
|
||||
expect(coreApi.readNamespacedPodLog).toHaveBeenCalled();
|
||||
// Result should use the complete log with sessionId
|
||||
expect(result.sessionId).toBe("ses_complete");
|
||||
});
|
||||
|
||||
it("does not call readPodLogs when stdout has a valid session result", async () => {
|
||||
const completeJsonl = [
|
||||
JSON.stringify({ type: "text", part: { text: "Done" }, sessionID: "ses_stream" }),
|
||||
JSON.stringify({ type: "step_finish", part: { tokens: { input: 50, output: 30, cache: {} }, cost: 0.001 } }),
|
||||
].join("\n");
|
||||
|
||||
const coreApi = makeCoreApi(completeJsonl, 0);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const logApi = {
|
||||
log: vi.fn(async (_ns: string, _pod: string, _container: string, writable: NodeJS.WritableStream) => {
|
||||
writable.write(Buffer.from(completeJsonl + "\n"));
|
||||
}),
|
||||
};
|
||||
vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof getLogApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
const result = await execute(ctx);
|
||||
|
||||
// readNamespacedPodLog should NOT be called (stream provided complete output)
|
||||
expect(coreApi.readNamespacedPodLog).not.toHaveBeenCalled();
|
||||
expect(result.sessionId).toBe("ses_stream");
|
||||
});
|
||||
});
|
||||
|
||||
describe("execute — llm_api_error signal", () => {
|
||||
it("returns llm_api_error when session exists but LLM produced no output tokens", async () => {
|
||||
// JSONL has a sessionID but no step_finish tokens and no text messages
|
||||
const emptyOutputJsonl = JSON.stringify({ sessionID: "ses_empty", type: "step_finish", part: { tokens: { input: 100, output: 0, cache: {} }, cost: 0 } });
|
||||
const coreApi = makeCoreApi(emptyOutputJsonl, 0);
|
||||
setMockJsonl(emptyOutputJsonl);
|
||||
const coreApi = makeCoreApi(0);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -800,7 +775,8 @@ describe("execute — llm_api_error signal", () => {
|
||||
const errorJsonl = [
|
||||
JSON.stringify({ sessionID: "ses_err", type: "error", error: { message: "API quota exceeded" } }),
|
||||
].join("\n");
|
||||
const coreApi = makeCoreApi(errorJsonl, 1);
|
||||
setMockJsonl(errorJsonl);
|
||||
const coreApi = makeCoreApi(1);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -811,7 +787,7 @@ describe("execute — llm_api_error signal", () => {
|
||||
});
|
||||
|
||||
it("does not emit llm_api_error when sessionId is null", async () => {
|
||||
const coreApi = makeCoreApi("", 0);
|
||||
const coreApi = makeCoreApi(0);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -884,7 +860,6 @@ describe("execute — external cancel polling", () => {
|
||||
vi.useRealTimers();
|
||||
vi.unstubAllGlobals();
|
||||
delete process.env.PAPERCLIP_API_URL;
|
||||
delete process.env.PAPERCLIP_DEV_API_KEY;
|
||||
});
|
||||
|
||||
it("returns errorCode=cancelled and deletes job when issue status is cancelled", async () => {
|
||||
@@ -970,42 +945,6 @@ describe("execute — external cancel polling", () => {
|
||||
expect(result.errorCode).toBeUndefined();
|
||||
expect(result.exitCode).toBe(0);
|
||||
});
|
||||
|
||||
it("uses PAPERCLIP_DEV_API_KEY over ctx.authToken when set", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
process.env.PAPERCLIP_API_URL = "http://test-api";
|
||||
process.env.PAPERCLIP_DEV_API_KEY = "dev-override-key";
|
||||
|
||||
const fetchMock = vi.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
json: () => Promise.resolve({ status: "cancelled" }),
|
||||
});
|
||||
vi.stubGlobal("fetch", fetchMock);
|
||||
|
||||
let jobDeleted = false;
|
||||
const batchApi = makeBatchApi();
|
||||
batchApi.deleteNamespacedJob.mockImplementation(() => { jobDeleted = true; return Promise.resolve({}); });
|
||||
batchApi.readNamespacedJob.mockImplementation(() => {
|
||||
if (jobDeleted) return Promise.reject(Object.assign(new Error("not found"), { statusCode: 404 }));
|
||||
return Promise.resolve({ status: { conditions: [] } });
|
||||
});
|
||||
vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType<typeof getBatchApi>);
|
||||
|
||||
const ctx = makeCtx({}, { issueId: "issue-test-456" }, "ctx-auth-token");
|
||||
const executePromise = execute(ctx);
|
||||
|
||||
for (let i = 0; i < 20; i++) {
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
}
|
||||
|
||||
await executePromise;
|
||||
|
||||
expect(fetchMock).toHaveBeenCalledWith(
|
||||
"http://test-api/api/issues/issue-test-456",
|
||||
expect.objectContaining({ headers: expect.objectContaining({ Authorization: "Bearer dev-override-key" }) }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("execute — large-prompt Secret path", () => {
|
||||
@@ -1019,6 +958,7 @@ describe("execute — large-prompt Secret path", () => {
|
||||
prompt: LARGE_PROMPT,
|
||||
opencodeArgs: [],
|
||||
promptMetrics: null,
|
||||
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
|
||||
} as unknown as ReturnType<typeof buildJobManifest>);
|
||||
}
|
||||
|
||||
@@ -1350,6 +1290,7 @@ describe("execute — large-prompt Secret create failure", () => {
|
||||
prompt: LARGE_PROMPT,
|
||||
opencodeArgs: [],
|
||||
promptMetrics: null,
|
||||
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
|
||||
} as unknown as ReturnType<typeof buildJobManifest>);
|
||||
|
||||
const coreApi = makeCoreApi();
|
||||
@@ -1383,8 +1324,9 @@ describe("execute — step limit detection", () => {
|
||||
JSON.stringify({ type: "text", part: { text: "partial" }, sessionID: "ses_step" }),
|
||||
JSON.stringify({ type: "step_finish", part: { reason: "max_steps", tokens: { input: 10, output: 5 }, cost: 0 } }),
|
||||
].join("\n");
|
||||
setMockJsonl(STEP_LIMIT_JSONL);
|
||||
|
||||
const coreApi = makeCoreApi(STEP_LIMIT_JSONL, 0);
|
||||
const coreApi = makeCoreApi(0);
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const ctx = makeCtx();
|
||||
@@ -1574,7 +1516,6 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
|
||||
getSelfPodInfo: vi.fn().mockResolvedValue(MOCK_SELF_POD),
|
||||
getBatchApi: vi.fn(),
|
||||
getCoreApi: vi.fn(),
|
||||
getLogApi: vi.fn(),
|
||||
getPvc: vi.fn().mockResolvedValue({ metadata: { name: "opencode-db-x" } }),
|
||||
createPvc: vi.fn().mockResolvedValue({}),
|
||||
}));
|
||||
@@ -1586,7 +1527,11 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
|
||||
prompt: "p",
|
||||
opencodeArgs: [],
|
||||
promptMetrics: null,
|
||||
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
|
||||
}),
|
||||
buildPodLogPath: vi.fn((companyId: string, agentId: string, runId: string) =>
|
||||
`/paperclip/instances/default/data/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`
|
||||
),
|
||||
LARGE_PROMPT_THRESHOLD_BYTES: 256 * 1024,
|
||||
}));
|
||||
|
||||
@@ -1594,10 +1539,8 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
|
||||
const k8s = await import("./k8s-client.js");
|
||||
const batchApi = makeBatchApi();
|
||||
const coreApi = makeCoreApi();
|
||||
const logApi = makeLogApi();
|
||||
vi.mocked(k8s.getBatchApi).mockReturnValue(batchApi as unknown as ReturnType<typeof k8s.getBatchApi>);
|
||||
vi.mocked(k8s.getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof k8s.getCoreApi>);
|
||||
vi.mocked(k8s.getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof k8s.getLogApi>);
|
||||
|
||||
let capturedHandler: (() => void) | null = null;
|
||||
const onceSpy = vi.spyOn(process, "once").mockImplementation(
|
||||
@@ -1623,3 +1566,7 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
|
||||
vi.doUnmock("./job-manifest.js");
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
// tailPodLogFile tests deferred — requires file-system module isolation
|
||||
// not available in the shared test suite's vi.mock("node:fs/promises") setup
|
||||
|
||||
+152
-375
@@ -1,29 +1,19 @@
|
||||
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
|
||||
import { inferOpenAiCompatibleBiller, redactHomePathUserSegments } from "@paperclipai/adapter-utils";
|
||||
import { inferOpenAiCompatibleBiller } from "@paperclipai/adapter-utils";
|
||||
import { asString, asNumber, asBoolean, parseObject, readPaperclipRuntimeSkillEntries, resolvePaperclipDesiredSkillNames } from "@paperclipai/adapter-utils/server-utils";
|
||||
import { readFile } from "node:fs/promises";
|
||||
import { readFile, open as fsOpen, type FileHandle } from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import {
|
||||
parseOpenCodeJsonl,
|
||||
isOpenCodeUnknownSessionError,
|
||||
isOpenCodeStepLimitResult,
|
||||
} from "./parse.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi, getPvc, createPvc } from "./k8s-client.js";
|
||||
import { buildJobManifest, LARGE_PROMPT_THRESHOLD_BYTES } from "./job-manifest.js";
|
||||
import { LogLineDedupFilter } from "./log-dedup.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getPvc, createPvc } from "./k8s-client.js";
|
||||
import { buildJobManifest, LARGE_PROMPT_THRESHOLD_BYTES, buildPodLogPath } from "./job-manifest.js";
|
||||
import type * as k8s from "@kubernetes/client-node";
|
||||
import { Writable } from "node:stream";
|
||||
|
||||
const POLL_INTERVAL_MS = 2000;
|
||||
const KEEPALIVE_INTERVAL_MS = 15_000;
|
||||
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
|
||||
const LOG_STREAM_RECONNECT_MAX_DELAY_MS = 30_000;
|
||||
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
|
||||
// Upper bound on how long streamPodLogsOnce will wait after stopSignal fires
|
||||
// before force-returning, even if logApi.log has not yet resolved. Defensive
|
||||
// against the K8s client library not propagating writable.destroy() into an
|
||||
// abort of the underlying HTTP request.
|
||||
const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
|
||||
const LOG_EXIT_COMPLETION_GRACE_MS = parseInt(process.env.LOG_EXIT_COMPLETION_GRACE_MS ?? "30000", 10);
|
||||
|
||||
export function isK8s404(err: unknown): boolean {
|
||||
@@ -161,226 +151,6 @@ async function waitForPod(
|
||||
throw new Error(`Timed out waiting for pod to be scheduled (${Math.round(timeoutMs / 1000)}s)`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream pod logs once via follow. Returns accumulated stdout when the
|
||||
* stream ends (container exit, API disconnect, or abort signal).
|
||||
*/
|
||||
async function streamPodLogsOnce(
|
||||
namespace: string,
|
||||
podName: string,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
sinceSeconds?: number,
|
||||
dedup?: LogLineDedupFilter,
|
||||
stopSignal?: { stopped: boolean },
|
||||
): Promise<string> {
|
||||
const logApi = getLogApi(kubeconfigPath);
|
||||
const chunks: string[] = [];
|
||||
|
||||
const writable = new Writable({
|
||||
write(chunk: Buffer, _encoding, callback) {
|
||||
const text = redactHomePathUserSegments(chunk.toString("utf-8"));
|
||||
chunks.push(text);
|
||||
const emitted = dedup ? dedup.filter(text) : text;
|
||||
if (!emitted) {
|
||||
callback();
|
||||
return;
|
||||
}
|
||||
void onLog("stdout", emitted).then(() => callback(), callback);
|
||||
},
|
||||
});
|
||||
|
||||
// When the job completion signal fires, destroy the writable to abort the
|
||||
// in-flight follow stream. Without this, logApi.log can hang indefinitely
|
||||
// when the pod terminates without closing the HTTP connection cleanly.
|
||||
let stopPoller: ReturnType<typeof setInterval> | null = null;
|
||||
let bailTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let bailResolve: (() => void) | null = null;
|
||||
const bailPromise = new Promise<void>((resolve) => {
|
||||
bailResolve = resolve;
|
||||
});
|
||||
if (stopSignal) {
|
||||
stopPoller = setInterval(() => {
|
||||
if (stopSignal.stopped) {
|
||||
if (!writable.destroyed) writable.destroy();
|
||||
if (!bailTimer && bailResolve) {
|
||||
bailTimer = setTimeout(() => {
|
||||
onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {});
|
||||
bailResolve!();
|
||||
}, LOG_STREAM_BAIL_TIMEOUT_MS);
|
||||
}
|
||||
}
|
||||
}, 200);
|
||||
}
|
||||
|
||||
const logPromise = logApi.log(namespace, podName, "opencode", writable, {
|
||||
follow: true,
|
||||
pretty: false,
|
||||
...(sinceSeconds ? { sinceSeconds } : {}),
|
||||
}).catch(() => {
|
||||
// follow may fail if the container already exited, the API connection
|
||||
// dropped, or we aborted via writable.destroy() — not fatal.
|
||||
});
|
||||
|
||||
try {
|
||||
if (stopSignal) {
|
||||
await Promise.race([logPromise, bailPromise]);
|
||||
} else {
|
||||
await logPromise;
|
||||
}
|
||||
} finally {
|
||||
if (stopPoller) clearInterval(stopPoller);
|
||||
if (bailTimer) clearTimeout(bailTimer);
|
||||
}
|
||||
|
||||
return chunks.join("");
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream pod logs with automatic reconnection. Keeps retrying the log
|
||||
* stream until the stop signal fires (job completed) or the container
|
||||
* exits normally. This handles silent K8s API connection drops that
|
||||
* would otherwise cause the UI to stop receiving real output.
|
||||
*
|
||||
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
|
||||
* loops during sustained API partitions.
|
||||
*
|
||||
* onFirstStreamExit is called the first time streamPodLogsOnce returns.
|
||||
* Used by execute() to start the LOG_EXIT_COMPLETION_GRACE_MS grace timer
|
||||
* without waiting for all reconnects to exhaust.
|
||||
*/
|
||||
async function streamPodLogs(
|
||||
namespace: string,
|
||||
podName: string,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
stopSignal?: { stopped: boolean },
|
||||
dedup?: LogLineDedupFilter,
|
||||
onFirstStreamExit?: () => void,
|
||||
): Promise<string> {
|
||||
const allChunks: string[] = [];
|
||||
let attempt = 0;
|
||||
// Track the timestamp of the last successfully received log line so
|
||||
// reconnects use a tight window instead of an ever-growing one anchored
|
||||
// at stream start. This is the primary fix for duplicative logs on reconnect.
|
||||
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
if (!dedup) dedup = new LogLineDedupFilter();
|
||||
|
||||
while (!stopSignal?.stopped) {
|
||||
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
|
||||
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
|
||||
break;
|
||||
}
|
||||
|
||||
// On reconnect, ask for logs since the last received line (+5s buffer)
|
||||
// instead of since stream start. This keeps the window tight and
|
||||
// avoids ever-growing duplicate output.
|
||||
const sinceSeconds = attempt > 0
|
||||
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
|
||||
: undefined;
|
||||
|
||||
if (attempt > 0) {
|
||||
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
|
||||
}
|
||||
|
||||
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
|
||||
// Signal first stream exit immediately so the grace-period timer in
|
||||
// execute() can start without waiting for all reconnects to complete.
|
||||
if (attempt === 0) onFirstStreamExit?.();
|
||||
if (result) {
|
||||
allChunks.push(result);
|
||||
// Update last-received timestamp to now (the stream just ended,
|
||||
// so any log lines in `result` were received up to this moment).
|
||||
lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
} else if (attempt === 0) {
|
||||
// First attempt returned nothing — update timestamp so reconnect
|
||||
// window stays reasonable.
|
||||
lastLogReceivedAt = preStreamTs;
|
||||
}
|
||||
attempt++;
|
||||
|
||||
if (stopSignal?.stopped) break;
|
||||
|
||||
// Exponential backoff before reconnecting: start at 3s, double each
|
||||
// attempt, cap at 30s. Avoids hammering the API server during prolonged
|
||||
// network hiccups while staying responsive for brief disconnects.
|
||||
// Sleep in 200ms chunks so a stop signal can interrupt the backoff
|
||||
// without waiting for the full delay to expire.
|
||||
const backoffMs = Math.min(
|
||||
LOG_STREAM_RECONNECT_MAX_DELAY_MS,
|
||||
LOG_STREAM_RECONNECT_DELAY_MS * 2 ** (attempt - 1),
|
||||
);
|
||||
const backoffDeadline = Date.now() + backoffMs;
|
||||
while (!stopSignal?.stopped) {
|
||||
const remaining = backoffDeadline - Date.now();
|
||||
if (remaining <= 0) break;
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, Math.min(200, remaining)));
|
||||
}
|
||||
}
|
||||
|
||||
// Flush any buffered partial line so the final assistant/result chunk
|
||||
// isn't dropped when the stream ends mid-line.
|
||||
const tail = dedup.flush();
|
||||
if (tail) await onLog("stdout", tail);
|
||||
|
||||
return allChunks.join("");
|
||||
}
|
||||
|
||||
async function readPodLogs(
|
||||
namespace: string,
|
||||
podName: string,
|
||||
kubeconfigPath?: string,
|
||||
): Promise<string> {
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
try {
|
||||
const log = await coreApi.readNamespacedPodLog({
|
||||
name: podName,
|
||||
namespace,
|
||||
container: "opencode",
|
||||
});
|
||||
return typeof log === "string" ? log : "";
|
||||
} catch {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until the named pod's phase transitions to Succeeded, Failed, or Unknown,
|
||||
* or until the pod is gone (404). Returns immediately if the pod is already in a
|
||||
* terminal phase. Used as a pre-flight before readPodLogs when the K8s log stream
|
||||
* returns empty while the container is still running (Node.js stdout buffering +
|
||||
* the @kubernetes/client-node v1.x follow-stream known premature-close issue).
|
||||
*/
|
||||
async function waitForPodTermination(
|
||||
namespace: string,
|
||||
podName: string,
|
||||
timeoutMs: number,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
): Promise<void> {
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
let notified = false;
|
||||
while (Date.now() < deadline) {
|
||||
try {
|
||||
const pod = await coreApi.readNamespacedPod({ name: podName, namespace });
|
||||
const phase = pod.status?.phase;
|
||||
if (phase === "Succeeded" || phase === "Failed" || phase === "Unknown") return;
|
||||
if (!notified) {
|
||||
notified = true;
|
||||
await onLog(
|
||||
"stdout",
|
||||
`[paperclip] Container still running — waiting up to ${Math.round(timeoutMs / 1000)}s for it to exit to capture output...\n`,
|
||||
);
|
||||
}
|
||||
} catch {
|
||||
return; // Pod gone (404) — nothing left to wait for
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
|
||||
}
|
||||
}
|
||||
|
||||
export type JobCompletionResult = { succeeded: boolean; timedOut: boolean; jobGone: boolean };
|
||||
|
||||
async function waitForJobCompletion(
|
||||
@@ -392,7 +162,10 @@ async function waitForJobCompletion(
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
const deadline = timeoutMs > 0 ? Date.now() + timeoutMs : 0;
|
||||
|
||||
while (deadline === 0 || Date.now() < deadline) {
|
||||
while (true) {
|
||||
if (deadline > 0 && Date.now() >= deadline) {
|
||||
return { succeeded: false, timedOut: true, jobGone: false };
|
||||
}
|
||||
let job: Awaited<ReturnType<typeof batchApi.readNamespacedJob>>;
|
||||
try {
|
||||
job = await batchApi.readNamespacedJob({ name: jobName, namespace });
|
||||
@@ -413,8 +186,6 @@ async function waitForJobCompletion(
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
|
||||
}
|
||||
|
||||
return { succeeded: false, timedOut: true, jobGone: false };
|
||||
}
|
||||
|
||||
export async function completionWithGrace(
|
||||
@@ -451,12 +222,110 @@ async function getPodTerminatedInfo(
|
||||
};
|
||||
}
|
||||
|
||||
interface TailOptions {
|
||||
onLog: AdapterExecutionContext["onLog"];
|
||||
stopSignal: { stopped: boolean };
|
||||
}
|
||||
|
||||
/**
|
||||
* Tail the pod's stdout log file from the shared PVC.
|
||||
*
|
||||
* Polls the file system with adaptive cadence: 250 ms while the file is
|
||||
* growing, backing off to 1000 ms when idle for 5 consecutive polls.
|
||||
* Buffers partial lines and emits complete lines to onLog.
|
||||
*/
|
||||
export async function tailPodLogFile(
|
||||
filePath: string,
|
||||
opts: TailOptions,
|
||||
): Promise<string> {
|
||||
const { onLog, stopSignal } = opts;
|
||||
const FILE_WAIT_TIMEOUT_MS = 30_000;
|
||||
const POLL_ACTIVE_MS = 250;
|
||||
const POLL_IDLE_MS = 1000;
|
||||
const IDLE_THRESHOLD = 5; // consecutive idle polls before backing off
|
||||
|
||||
// Wait up to 30s for the file to appear
|
||||
const waitDeadline = Date.now() + FILE_WAIT_TIMEOUT_MS;
|
||||
while (Date.now() < waitDeadline) {
|
||||
try {
|
||||
await import("node:fs/promises").then((fs) => fs.stat(filePath));
|
||||
break; // file exists
|
||||
} catch {
|
||||
if (stopSignal.stopped) return "";
|
||||
await new Promise((r) => setTimeout(r, POLL_INTERVAL_MS));
|
||||
}
|
||||
}
|
||||
|
||||
// Check one more time before opening
|
||||
let fh: FileHandle;
|
||||
try {
|
||||
fh = await fsOpen(filePath, "r");
|
||||
} catch {
|
||||
throw new Error(`Pod log file never appeared at ${filePath}`);
|
||||
}
|
||||
|
||||
let offset = 0;
|
||||
let pending = "";
|
||||
let idleCount = 0;
|
||||
const accumulator: string[] = [];
|
||||
|
||||
const drain = async (): Promise<boolean> => {
|
||||
let size: number;
|
||||
try {
|
||||
const stat = await fh.stat();
|
||||
size = stat.size;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
if (size <= offset) return false;
|
||||
const buf = Buffer.alloc(size - offset);
|
||||
const { bytesRead } = await fh.read(buf, 0, buf.length, offset);
|
||||
offset += bytesRead;
|
||||
const chunk = buf.slice(0, bytesRead).toString("utf-8");
|
||||
const lineParts = (pending + chunk).split("\n");
|
||||
pending = lineParts.pop() ?? "";
|
||||
for (const line of lineParts) {
|
||||
await onLog("stdout", line + "\n");
|
||||
accumulator.push(line + "\n");
|
||||
}
|
||||
return bytesRead > 0;
|
||||
};
|
||||
|
||||
try {
|
||||
while (!stopSignal.stopped) {
|
||||
const grew = await drain();
|
||||
if (grew) {
|
||||
idleCount = 0;
|
||||
} else {
|
||||
idleCount++;
|
||||
}
|
||||
if (stopSignal.stopped) break;
|
||||
const pollMs = idleCount >= IDLE_THRESHOLD ? POLL_IDLE_MS : POLL_ACTIVE_MS;
|
||||
await new Promise((r) => setTimeout(r, pollMs));
|
||||
}
|
||||
|
||||
// Final drain after stopSignal — pick up any bytes written between the
|
||||
// last read and the job reaching terminal state.
|
||||
while (await drain()) { /* read until no more growth */ }
|
||||
|
||||
if (pending) {
|
||||
await onLog("stdout", pending + "\n");
|
||||
accumulator.push(pending + "\n");
|
||||
}
|
||||
} finally {
|
||||
await fh.close();
|
||||
}
|
||||
|
||||
return accumulator.join("");
|
||||
}
|
||||
|
||||
async function cleanupJob(
|
||||
namespace: string,
|
||||
jobName: string,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
promptSecretName?: string,
|
||||
podLogPath?: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
@@ -477,12 +346,18 @@ async function cleanupJob(
|
||||
// best-effort — Secret may already be GC'd via ownerReference
|
||||
}
|
||||
}
|
||||
if (podLogPath) {
|
||||
try {
|
||||
const { unlink } = await import("node:fs/promises");
|
||||
await unlink(podLogPath);
|
||||
} catch {
|
||||
// non-fatal
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream logs + await completion for an already-created Job, then harvest
|
||||
* and return the execution result. Used by both the normal create-then-run
|
||||
* path and the orphaned-job reattach path.
|
||||
* Tail the pod log file and await completion for an already-created Job.
|
||||
*/
|
||||
async function streamAndAwaitJob(
|
||||
ctx: AdapterExecutionContext,
|
||||
@@ -492,6 +367,7 @@ async function streamAndAwaitJob(
|
||||
graceSec: number,
|
||||
kubeconfigPath: string | undefined,
|
||||
retainJobs: boolean,
|
||||
podLogPath: string,
|
||||
promptSecretName?: string,
|
||||
): Promise<AdapterExecutionResult> {
|
||||
const { onLog } = ctx;
|
||||
@@ -524,8 +400,7 @@ async function streamAndAwaitJob(
|
||||
}
|
||||
|
||||
const completionTimeoutMs = timeoutSec > 0 ? (timeoutSec + graceSec) * 1000 : 0;
|
||||
const logStopSignal = { stopped: false };
|
||||
const logDedup = new LogLineDedupFilter();
|
||||
const stopSignal = { stopped: false };
|
||||
|
||||
const issueId = asString(ctx.context.issueId ?? ctx.context.taskId, "").trim();
|
||||
let lastLogAt = Date.now();
|
||||
@@ -557,21 +432,15 @@ async function streamAndAwaitJob(
|
||||
})();
|
||||
}, KEEPALIVE_INTERVAL_MS);
|
||||
|
||||
// External cancel poll: watches Paperclip issue status at keepalive cadence.
|
||||
// Polls GET /api/issues/{issueId} (not /api/heartbeat-runs) because the adapter
|
||||
// key has read access to issues but not to the internal heartbeat-runs endpoint.
|
||||
// Uses await-setTimeout (not setInterval+void) so vi.advanceTimersByTimeAsync
|
||||
// can drive it in tests. Fire-and-forget; exits when logStopSignal.stopped.
|
||||
// External cancel poll
|
||||
void (async (): Promise<void> => {
|
||||
const apiUrl = process.env.PAPERCLIP_API_URL;
|
||||
if (!apiUrl || !issueId) return;
|
||||
while (!logStopSignal.stopped && !cancelSignal.cancelled) {
|
||||
while (!stopSignal.stopped && !cancelSignal.cancelled) {
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, KEEPALIVE_INTERVAL_MS));
|
||||
if (logStopSignal.stopped || cancelSignal.cancelled) break;
|
||||
if (stopSignal.stopped || cancelSignal.cancelled) break;
|
||||
try {
|
||||
// Prefer PAPERCLIP_DEV_API_KEY if set (dev override), otherwise use
|
||||
// the per-run authToken issued by Paperclip for this execution.
|
||||
const apiKey = process.env.PAPERCLIP_DEV_API_KEY ?? ctx.authToken ?? "";
|
||||
const apiKey = ctx.authToken ?? "";
|
||||
const resp = await fetch(`${apiUrl}/api/issues/${issueId}`, {
|
||||
headers: { Authorization: `Bearer ${apiKey}` },
|
||||
});
|
||||
@@ -579,7 +448,7 @@ async function streamAndAwaitJob(
|
||||
const data = await resp.json() as { status?: string };
|
||||
if (typeof data.status === "string" && data.status === "cancelled") {
|
||||
cancelSignal.cancelled = true;
|
||||
logStopSignal.stopped = true;
|
||||
stopSignal.stopped = true;
|
||||
try {
|
||||
await getBatchApi(kubeconfigPath).deleteNamespacedJob({
|
||||
name: jobName,
|
||||
@@ -598,110 +467,31 @@ async function streamAndAwaitJob(
|
||||
return onLog(stream, chunk);
|
||||
};
|
||||
|
||||
let logExitTime: number | null = null;
|
||||
const trackedLogStream = streamPodLogs(
|
||||
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
|
||||
() => { logExitTime = Date.now(); },
|
||||
);
|
||||
|
||||
let gracePoller: ReturnType<typeof setInterval> | null = null;
|
||||
// Maximum wall-clock time the grace poller will defer to pod-liveness checks.
|
||||
// When completionTimeoutMs is 0 (unlimited job), cap at 20 minutes so we
|
||||
// don't wait forever if the pod never exits but K8s never marks the job done.
|
||||
const graceMaxWaitMs = completionTimeoutMs > 0 ? completionTimeoutMs : 20 * 60_000;
|
||||
const graceStartTime = Date.now();
|
||||
const completionGraced = new Promise<JobCompletionResult>((resolve, reject) => {
|
||||
let settled = false;
|
||||
let graceCheckPending = false;
|
||||
const settleOk = (r: JobCompletionResult) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
logStopSignal.stopped = true;
|
||||
resolve(r);
|
||||
};
|
||||
const settleErr = (err: unknown) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
logStopSignal.stopped = true;
|
||||
reject(err);
|
||||
};
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr);
|
||||
gracePoller = setInterval(() => {
|
||||
if (graceCheckPending || settled) return;
|
||||
if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) {
|
||||
graceCheckPending = true;
|
||||
void (async () => {
|
||||
try {
|
||||
// If we haven't exceeded the max wait, check whether the pod is still running.
|
||||
// The K8s log client v1.x closes the follow-stream prematurely even when the
|
||||
// container is still executing — the log exit does not mean the job is done.
|
||||
if (Date.now() - graceStartTime < graceMaxWaitMs) {
|
||||
try {
|
||||
const pod = await getCoreApi(kubeconfigPath).readNamespacedPod({ name: podName, namespace });
|
||||
const phase = pod.status?.phase;
|
||||
if (phase === "Running" || phase === "Pending") {
|
||||
// Pod still alive — reset the grace deadline and keep waiting
|
||||
logExitTime = Date.now();
|
||||
return;
|
||||
}
|
||||
} catch {
|
||||
// Pod gone (404) or K8s error — fall through to settleOk
|
||||
}
|
||||
}
|
||||
void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, jobGone: true });
|
||||
} finally {
|
||||
graceCheckPending = false;
|
||||
}
|
||||
})();
|
||||
}
|
||||
}, 1_000);
|
||||
});
|
||||
|
||||
const [logResult, completionResult] = await Promise.allSettled([
|
||||
trackedLogStream,
|
||||
// Run the file tail and the job-completion poll in parallel so that the
|
||||
// tail loop has a way to stop: when waitForJobCompletion resolves it sets
|
||||
// stopSignal.stopped, which lets tailPodLogFile drain and return.
|
||||
const completionPromise = waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath)
|
||||
.then((r) => { stopSignal.stopped = true; return r; });
|
||||
const completionGraced = completionWithGrace(completionPromise, LOG_EXIT_COMPLETION_GRACE_MS);
|
||||
const [tailSettled, completionSettled] = await Promise.allSettled([
|
||||
tailPodLogFile(podLogPath, { onLog: wrappedOnLog, stopSignal }),
|
||||
completionGraced,
|
||||
]);
|
||||
stdout = tailSettled.status === "fulfilled" ? tailSettled.value : "";
|
||||
if (completionSettled.status === "rejected") {
|
||||
stopSignal.stopped = true;
|
||||
throw completionSettled.reason;
|
||||
}
|
||||
const completion = completionSettled.value;
|
||||
|
||||
if (keepaliveTimer) {
|
||||
clearInterval(keepaliveTimer);
|
||||
keepaliveTimer = null;
|
||||
}
|
||||
|
||||
if (logResult.status === "fulfilled") {
|
||||
stdout = logResult.value;
|
||||
}
|
||||
|
||||
if (!stdout.trim()) {
|
||||
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
|
||||
// The K8s client v1.x has a known issue where follow-stream closes prematurely,
|
||||
// causing the log stream to return empty even when the container is still running.
|
||||
// Node.js also buffers stdout when writing to a pipe, so logs only flush on exit.
|
||||
// Wait for the pod to actually terminate before attempting to read its final output.
|
||||
await waitForPodTermination(namespace, podName, 120_000, onLog, kubeconfigPath);
|
||||
stdout = await readPodLogs(namespace, podName, kubeconfigPath);
|
||||
if (stdout.trim()) {
|
||||
await onLog("stdout", stdout);
|
||||
}
|
||||
} else if (!parseOpenCodeJsonl(stdout).sessionId) {
|
||||
await onLog("stdout", `[paperclip] Partial stdout missing session result — reading pod logs directly...\n`);
|
||||
const fallbackLogs = await readPodLogs(namespace, podName, kubeconfigPath);
|
||||
if (fallbackLogs.trim()) {
|
||||
stdout = fallbackLogs;
|
||||
await onLog("stdout", fallbackLogs);
|
||||
}
|
||||
}
|
||||
|
||||
if (completionResult.status === "fulfilled") {
|
||||
const completion = completionResult.value;
|
||||
jobTimedOut = completion.timedOut;
|
||||
if (completion.jobGone) {
|
||||
await onLog("stdout", `[paperclip] Job ${jobName} not found (likely TTL-cleaned after completion).\n`);
|
||||
}
|
||||
} else {
|
||||
jobTimedOut = true;
|
||||
jobTimedOut = completion.timedOut;
|
||||
if (completion.jobGone) {
|
||||
await onLog("stdout", `[paperclip] Job ${jobName} not found (likely TTL-cleaned after completion).\n`);
|
||||
}
|
||||
|
||||
const terminatedInfo = await getPodTerminatedInfo(namespace, jobName, kubeconfigPath);
|
||||
@@ -714,7 +504,7 @@ async function streamAndAwaitJob(
|
||||
}
|
||||
activeJobs.delete(jobName);
|
||||
if (!retainJobs) {
|
||||
await cleanupJob(namespace, jobName, onLog, kubeconfigPath, promptSecretName);
|
||||
await cleanupJob(namespace, jobName, onLog, kubeconfigPath, promptSecretName, podLogPath);
|
||||
} else {
|
||||
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
|
||||
}
|
||||
@@ -949,9 +739,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
|
||||
try {
|
||||
// Guard: single concurrency per agent (shared PVC/session) — fail-closed.
|
||||
// When a concurrent job is detected, wait for it to finish and retry once rather
|
||||
// than returning k8s_concurrent_run_blocked immediately (which caused permanent
|
||||
// blocked state for all but the first task in a simultaneous batch assignment).
|
||||
let waitedForConcurrent = false;
|
||||
while (true) {
|
||||
try {
|
||||
@@ -964,8 +751,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
(j) => !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"),
|
||||
);
|
||||
if (running.length > 0) {
|
||||
// Separate Jobs matching the current task (orphaned from a prior server instance)
|
||||
// from Jobs belonging to a different concurrent task.
|
||||
const sameTaskJobs = taskId
|
||||
? running.filter((j) => j.metadata?.labels?.["paperclip.io/task-id"] === taskId)
|
||||
: [];
|
||||
@@ -973,7 +758,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
|
||||
if (otherJobs.length > 0) {
|
||||
if (waitedForConcurrent) {
|
||||
// Already waited once — give up to avoid an infinite loop.
|
||||
const names = otherJobs.map((j) => j.metadata?.name).join(", ");
|
||||
await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`);
|
||||
return {
|
||||
@@ -986,8 +770,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
}
|
||||
const names = otherJobs.map((j) => j.metadata?.name).join(", ");
|
||||
await onLog("stdout", `[paperclip] Waiting for concurrent Job(s) to finish before starting: ${names}\n`);
|
||||
// Wait up to the configured job timeout (+ grace + buffer); for unlimited jobs
|
||||
// cap at 1 hour so we don't block the mutex indefinitely.
|
||||
const concurrentWaitMs = timeoutSec > 0
|
||||
? (timeoutSec + graceSec + 120) * 1000
|
||||
: 60 * 60_000;
|
||||
@@ -1007,7 +789,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
if (reattachOrphanedJobs) {
|
||||
await onLog("stdout", `[paperclip] Reattaching to orphaned Job ${orphanJobName} from prior server instance (task: ${taskId})...\n`);
|
||||
activeJobs.set(orphanJobName, { namespace: guardNamespace, kubeconfigPath });
|
||||
return streamAndAwaitJob(ctx, orphanJobName, guardNamespace, timeoutSec, graceSec, kubeconfigPath, retainJobs);
|
||||
// Reattach needs podLogPath — compute it here for the orphaned job
|
||||
const podLogPath = buildPodLogPath(ctx.agent.companyId, agentId, ctx.runId);
|
||||
return streamAndAwaitJob(ctx, orphanJobName, guardNamespace, timeoutSec, graceSec, kubeconfigPath, retainJobs, podLogPath);
|
||||
}
|
||||
await onLog("stderr", `[paperclip] Orphaned Job ${orphanJobName} found for this task but reattachOrphanedJobs is disabled.\n`);
|
||||
return {
|
||||
@@ -1033,7 +817,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
break; // no blocking jobs — proceed to job creation
|
||||
}
|
||||
|
||||
// Read agent instructions file (instructionsFilePath config field → system prompt prepend)
|
||||
// Read agent instructions file
|
||||
const instructionsFilePath = asString(config.instructionsFilePath, "").trim();
|
||||
let instructionsContent = "";
|
||||
if (instructionsFilePath) {
|
||||
@@ -1044,12 +828,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve and read desired skill content (injected into prompt bundle)
|
||||
// Resolve and read desired skill content
|
||||
let skillsBundleContent = "";
|
||||
try {
|
||||
const moduleDir = import.meta.dirname;
|
||||
// Add the standard Paperclip skills dir as an additional candidate — the relative
|
||||
// candidates in adapter-utils don't resolve to the PVC-mounted skills home.
|
||||
const paperclipSkillsHome = "/paperclip/.claude/skills";
|
||||
const availableEntries = await readPaperclipRuntimeSkillEntries(config, moduleDir, [paperclipSkillsHome]);
|
||||
const desiredSkillKeys = resolvePaperclipDesiredSkillNames(config, availableEntries);
|
||||
@@ -1058,8 +840,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
const entry = availableEntries.find((e) => e.key === key);
|
||||
if (entry?.source) {
|
||||
try {
|
||||
// entry.source from listPaperclipSkillEntries is a directory; read SKILL.md from it.
|
||||
// Fall back to reading entry.source directly for file-based paperclipRuntimeSkills entries.
|
||||
let text: string;
|
||||
try {
|
||||
text = (await readFile(path.join(entry.source, "SKILL.md"), "utf-8")).trim();
|
||||
@@ -1077,7 +857,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// non-fatal: skill bundle is optional
|
||||
}
|
||||
|
||||
// Ensure per-agent DB PVC exists (or get null for ephemeral mode)
|
||||
// Ensure per-agent DB PVC exists
|
||||
let agentDbClaimName: string | null | undefined;
|
||||
try {
|
||||
agentDbClaimName = await ensureAgentDbPvc(agentId, guardNamespace, config, kubeconfigPath);
|
||||
@@ -1102,10 +882,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
retainJobs,
|
||||
};
|
||||
const firstBuild = buildJobManifest(buildArgs);
|
||||
const { jobName, namespace, prompt, opencodeArgs, promptMetrics } = firstBuild;
|
||||
const { jobName, namespace, prompt, opencodeArgs, promptMetrics, podLogPath } = firstBuild;
|
||||
|
||||
// For prompts larger than the threshold, store in a K8s Secret so the PodSpec
|
||||
// stays within the 1 MiB API limit. The init container mounts and copies the file.
|
||||
// For prompts larger than the threshold, store in a K8s Secret
|
||||
let promptSecretName: string | undefined;
|
||||
let job = firstBuild.job;
|
||||
if (Buffer.byteLength(prompt, "utf-8") > LARGE_PROMPT_THRESHOLD_BYTES) {
|
||||
@@ -1132,7 +911,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
|
||||
// Create the prompt Secret before the Job so the init container can mount it.
|
||||
// Create the prompt Secret before the Job
|
||||
if (promptSecretName) {
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
const promptSecret: k8s.V1Secret = {
|
||||
@@ -1179,7 +958,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
};
|
||||
}
|
||||
|
||||
// Set ownerReference on the prompt Secret so K8s GC deletes it when the Job is removed.
|
||||
// Set ownerReference on the prompt Secret
|
||||
if (promptSecretName && createdJob?.metadata?.uid) {
|
||||
try {
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
@@ -1202,7 +981,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
} as k8s.V1Secret,
|
||||
});
|
||||
} catch {
|
||||
// non-fatal — Secret will still be removed by cleanupJob in the finally block
|
||||
// non-fatal
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1211,9 +990,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
|
||||
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
|
||||
|
||||
// return evaluates streamAndAwaitJob() (creating the promise) before finally runs,
|
||||
// so the mutex releases as soon as the job is registered — not after the full lifecycle.
|
||||
return streamAndAwaitJob(ctx, jobName, namespace, timeoutSec, graceSec, kubeconfigPath, retainJobs, promptSecretName);
|
||||
return streamAndAwaitJob(ctx, jobName, namespace, timeoutSec, graceSec, kubeconfigPath, retainJobs, podLogPath, promptSecretName);
|
||||
} finally {
|
||||
releaseLock();
|
||||
}
|
||||
|
||||
+5
-1
@@ -34,7 +34,11 @@ export function createServerAdapter(): ServerAdapterModule {
|
||||
maxSessionAgeHours: 24,
|
||||
},
|
||||
},
|
||||
};
|
||||
// Tells the reaper to skip local PID checks and use the staleness-based
|
||||
// liveness window instead (adapter spawns K8s Jobs in separate pods).
|
||||
// Cast required: adapter-utils ServerAdapterModule type predates this field.
|
||||
hasOutOfProcessLiveness: true,
|
||||
} as ServerAdapterModule;
|
||||
}
|
||||
|
||||
export { execute, testEnvironment, sessionCodec };
|
||||
|
||||
@@ -270,8 +270,8 @@ describe("buildJobManifest", () => {
|
||||
it("label values are sanitized to [a-z0-9._-]", () => {
|
||||
const ctx = {
|
||||
...mockCtx,
|
||||
agent: { ...mockCtx.agent, id: "Agent_ID/123", companyId: "Co:456" },
|
||||
runId: "Run@789",
|
||||
agent: { ...mockCtx.agent, id: "agent-id-123", companyId: "company-456" },
|
||||
runId: "run-789",
|
||||
};
|
||||
const result = buildJobManifest({ ctx, selfPod: mockSelfPod });
|
||||
|
||||
@@ -356,10 +356,12 @@ describe("agentDbClaimName — volume wiring", () => {
|
||||
});
|
||||
|
||||
describe("init container is unchanged by agentDbClaimName", () => {
|
||||
it("does not add mkdir or extra env vars to init container for dedicated PVC mode", () => {
|
||||
it("does not add extra env vars to init container for dedicated PVC mode", () => {
|
||||
const result = buildJobManifest({ ctx: mockCtx, selfPod: mockSelfPod, agentDbClaimName: "opencode-db-agent-abc" });
|
||||
const initCmd = result.job.spec?.template?.spec?.initContainers?.[0].command;
|
||||
// init container only writes the prompt; no mkdir (log dir exists on PVC) and no OPENCODE_DB_PATH env var
|
||||
expect(initCmd?.[2]).not.toContain("mkdir");
|
||||
expect(initCmd?.[2]).toContain("/tmp/prompt/prompt.txt");
|
||||
const initEnv = result.job.spec?.template?.spec?.initContainers?.[0].env ?? [];
|
||||
expect(initEnv.some((e) => e.name === "OPENCODE_DB_PATH")).toBe(false);
|
||||
});
|
||||
@@ -484,15 +486,14 @@ describe("buildJobManifest — env wiring branches", () => {
|
||||
expect(env.find((e) => e.name === "PAPERCLIP_API_KEY")?.value).toBe("tok_abc");
|
||||
});
|
||||
|
||||
it("inherits PAPERCLIP_API_URL and PAPERCLIP_DEV_API_KEY from selfPod inheritedEnv", () => {
|
||||
it("inherits PAPERCLIP_API_URL from selfPod inheritedEnv", () => {
|
||||
const selfPod = {
|
||||
...mockSelfPod,
|
||||
inheritedEnv: { PAPERCLIP_API_URL: "http://api", PAPERCLIP_DEV_API_KEY: "dev_key" },
|
||||
inheritedEnv: { PAPERCLIP_API_URL: "http://api" },
|
||||
};
|
||||
const result = buildJobManifest({ ctx: mockCtx, selfPod });
|
||||
const env = result.job.spec?.template.spec?.containers[0]?.env ?? [];
|
||||
expect(env.find((e) => e.name === "PAPERCLIP_API_URL")?.value).toBe("http://api");
|
||||
expect(env.find((e) => e.name === "PAPERCLIP_DEV_API_KEY")?.value).toBe("dev_key");
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
+25
-12
@@ -17,6 +17,17 @@ import type { SelfPodInfo } from "./k8s-client.js";
|
||||
|
||||
export const LARGE_PROMPT_THRESHOLD_BYTES = 256 * 1024;
|
||||
|
||||
function assertSafePathComponent(field: string, value: string): void {
|
||||
// Allow alphanumeric, hyphens, and colons (UUIDs like "550e8400-e29b-41d4-a716-446655440000")
|
||||
if (!/^[a-zA-Z0-9-:]+$/.test(value)) {
|
||||
throw new Error(`Invalid ${field} for log path: ${value}`);
|
||||
}
|
||||
}
|
||||
|
||||
export function buildPodLogPath(companyId: string, agentId: string, runId: string): string {
|
||||
return `/paperclip/instances/default/data/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`;
|
||||
}
|
||||
|
||||
export interface JobBuildInput {
|
||||
ctx: AdapterExecutionContext;
|
||||
selfPod: SelfPodInfo;
|
||||
@@ -45,6 +56,7 @@ export interface JobBuildResult {
|
||||
prompt: string;
|
||||
opencodeArgs: string[];
|
||||
promptMetrics: Record<string, number>;
|
||||
podLogPath: string;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -173,13 +185,6 @@ function buildEnvVars(
|
||||
if (selfPod.inheritedEnv.PAPERCLIP_API_URL) {
|
||||
paperclipEnv.PAPERCLIP_API_URL = selfPod.inheritedEnv.PAPERCLIP_API_URL;
|
||||
}
|
||||
// Inherit PAPERCLIP_DEV_API_KEY if set (dev-instance key, distinct from the
|
||||
// main-instance run JWT in PAPERCLIP_API_KEY). Used by the external cancel
|
||||
// polling in execute.ts to authenticate against the dev Paperclip instance.
|
||||
if (selfPod.inheritedEnv.PAPERCLIP_DEV_API_KEY) {
|
||||
paperclipEnv.PAPERCLIP_DEV_API_KEY = selfPod.inheritedEnv.PAPERCLIP_DEV_API_KEY;
|
||||
}
|
||||
|
||||
// Layer 3: Inherited from Deployment (Bedrock, API keys, etc.)
|
||||
const merged: Record<string, string> = {
|
||||
...selfPod.inheritedEnv,
|
||||
@@ -227,6 +232,13 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
const warnLabel = (msg: string) => void onLog("stderr", msg).catch(() => {});
|
||||
const config = parseObject(rawConfig);
|
||||
|
||||
// Validate path components for log file safety
|
||||
const companyId = agent.companyId;
|
||||
const agentId = agent.id;
|
||||
assertSafePathComponent("companyId", companyId);
|
||||
assertSafePathComponent("agentId", agentId);
|
||||
assertSafePathComponent("runId", runId);
|
||||
|
||||
const namespace = asString(config.namespace, "") || selfPod.namespace;
|
||||
const image = asString(config.image, "") || selfPod.image;
|
||||
const model = asString(config.model, "").trim();
|
||||
@@ -408,12 +420,13 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
|
||||
// Build the main container command
|
||||
// 1. Optionally write opencode runtime config for permission bypass
|
||||
// 2. Pipe prompt into opencode
|
||||
// 2. Pipe prompt into opencode, tee stdout to the shared PVC log file
|
||||
const podLogPath = buildPodLogPath(companyId, agentId, runId);
|
||||
const opencodeArgsEscaped = opencodeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
|
||||
const configSetup = runtimeConfigJson
|
||||
? `mkdir -p ~/.config/opencode && echo '${runtimeConfigJson.replace(/'/g, "'\\''")}' > ~/.config/opencode/opencode.json && `
|
||||
: "";
|
||||
const mainCommand = `${configSetup}cat /tmp/prompt/prompt.txt | opencode ${opencodeArgsEscaped}`;
|
||||
const mainCommand = `${configSetup}cat /tmp/prompt/prompt.txt | opencode ${opencodeArgsEscaped} | tee ${podLogPath}`;
|
||||
|
||||
const job: k8s.V1Job = {
|
||||
apiVersion: "batch/v1",
|
||||
@@ -448,14 +461,14 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
imagePullPolicy: "IfNotPresent",
|
||||
...(input.promptSecretName
|
||||
? {
|
||||
command: ["sh", "-c", "cp /tmp/prompt-secret/prompt /tmp/prompt/prompt.txt"],
|
||||
command: ["sh", "-c", `cp /tmp/prompt-secret/prompt /tmp/prompt/prompt.txt`],
|
||||
volumeMounts: [
|
||||
{ name: "prompt", mountPath: "/tmp/prompt" },
|
||||
{ name: "prompt-secret", mountPath: "/tmp/prompt-secret", readOnly: true },
|
||||
],
|
||||
}
|
||||
: {
|
||||
command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
|
||||
command: ["sh", "-c", `printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt`],
|
||||
env: [{ name: "PROMPT_CONTENT", value: prompt }],
|
||||
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
|
||||
}),
|
||||
@@ -486,5 +499,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
},
|
||||
};
|
||||
|
||||
return { job, jobName, namespace, prompt, opencodeArgs, promptMetrics };
|
||||
return { job, jobName, namespace, prompt, opencodeArgs, promptMetrics, podLogPath };
|
||||
}
|
||||
|
||||
@@ -1,212 +0,0 @@
|
||||
import { describe, it, expect, beforeEach } from "vitest";
|
||||
import { eventDedupKey, LogLineDedupFilter } from "./log-dedup.js";
|
||||
|
||||
describe("eventDedupKey", () => {
|
||||
it("returns null for object with no type field", () => {
|
||||
expect(eventDedupKey({ sessionID: "ses_1" })).toBeNull();
|
||||
});
|
||||
|
||||
it("returns null for object with empty type", () => {
|
||||
expect(eventDedupKey({ type: "" })).toBeNull();
|
||||
});
|
||||
|
||||
it("returns null for unknown event type", () => {
|
||||
expect(eventDedupKey({ type: "unknown_type", sessionID: "ses_1" })).toBeNull();
|
||||
});
|
||||
|
||||
it("returns type:sessionId:partId when all three present", () => {
|
||||
const event = { type: "text", sessionID: "ses_1", part: { id: "part_abc" } };
|
||||
expect(eventDedupKey(event)).toBe("text:ses_1:part_abc");
|
||||
});
|
||||
|
||||
it("returns type:sessionId when partId absent", () => {
|
||||
const event = { type: "text", sessionID: "ses_1", part: {} };
|
||||
expect(eventDedupKey(event)).toBe("text:ses_1");
|
||||
});
|
||||
|
||||
it("returns null when both sessionId and partId absent", () => {
|
||||
const event = { type: "text", part: {} };
|
||||
expect(eventDedupKey(event)).toBeNull();
|
||||
});
|
||||
|
||||
it("returns null when part has no id and sessionID missing", () => {
|
||||
const event = { type: "tool_use" };
|
||||
expect(eventDedupKey(event)).toBeNull();
|
||||
});
|
||||
|
||||
it("handles tool_use type", () => {
|
||||
const event = { type: "tool_use", sessionID: "ses_1", part: { id: "tool_1" } };
|
||||
expect(eventDedupKey(event)).toBe("tool_use:ses_1:tool_1");
|
||||
});
|
||||
|
||||
it("handles step_finish type", () => {
|
||||
const event = { type: "step_finish", sessionID: "ses_2", part: { id: "step_1" } };
|
||||
expect(eventDedupKey(event)).toBe("step_finish:ses_2:step_1");
|
||||
});
|
||||
|
||||
it("handles step_start type", () => {
|
||||
const event = { type: "step_start", sessionID: "ses_3" };
|
||||
expect(eventDedupKey(event)).toBe("step_start:ses_3");
|
||||
});
|
||||
|
||||
it("handles thinking type", () => {
|
||||
const event = { type: "thinking", sessionID: "ses_4", part: { id: "think_1" } };
|
||||
expect(eventDedupKey(event)).toBe("thinking:ses_4:think_1");
|
||||
});
|
||||
|
||||
it("handles assistant type", () => {
|
||||
const event = { type: "assistant", sessionID: "ses_5" };
|
||||
expect(eventDedupKey(event)).toBe("assistant:ses_5");
|
||||
});
|
||||
|
||||
it("handles user type", () => {
|
||||
const event = { type: "user", sessionID: "ses_6" };
|
||||
expect(eventDedupKey(event)).toBe("user:ses_6");
|
||||
});
|
||||
|
||||
it("returns null for error type (not in dedup switch)", () => {
|
||||
const event = { type: "error", sessionID: "ses_7" };
|
||||
expect(eventDedupKey(event)).toBeNull();
|
||||
});
|
||||
|
||||
it("uses part.id string even when nested in non-object context", () => {
|
||||
const event = { type: "text", sessionID: "ses_1", part: { id: "part_x" } };
|
||||
expect(eventDedupKey(event)).toBe("text:ses_1:part_x");
|
||||
});
|
||||
});
|
||||
|
||||
describe("LogLineDedupFilter", () => {
|
||||
let dedup: LogLineDedupFilter;
|
||||
|
||||
beforeEach(() => {
|
||||
dedup = new LogLineDedupFilter();
|
||||
});
|
||||
|
||||
describe("filter()", () => {
|
||||
it("returns empty string for empty chunk", () => {
|
||||
expect(dedup.filter("")).toBe("");
|
||||
});
|
||||
|
||||
it("passes through non-JSON lines", () => {
|
||||
const chunk = "[paperclip] Pod running: pod-abc\n";
|
||||
expect(dedup.filter(chunk)).toBe(chunk);
|
||||
});
|
||||
|
||||
it("passes a JSON event on first occurrence", () => {
|
||||
const event = { type: "text", sessionID: "ses_1" };
|
||||
const line = JSON.stringify(event) + "\n";
|
||||
expect(dedup.filter(line)).toBe(line);
|
||||
});
|
||||
|
||||
it("drops a duplicate JSON event on second occurrence", () => {
|
||||
const event = { type: "text", sessionID: "ses_1" };
|
||||
const line = JSON.stringify(event) + "\n";
|
||||
dedup.filter(line); // first — passes
|
||||
expect(dedup.filter(line)).toBe(""); // second — dropped
|
||||
});
|
||||
|
||||
it("passes a JSON event without a dedup key on every occurrence", () => {
|
||||
// Events with unknown type have no structural key — fall back to raw content hash
|
||||
const event = { type: "error", sessionID: "ses_1", error: "unique1" };
|
||||
const line = JSON.stringify(event) + "\n";
|
||||
dedup.filter(line);
|
||||
// Same raw content would be deduped (raw: key), but different error content passes
|
||||
const event2 = { type: "error", sessionID: "ses_1", error: "unique2" };
|
||||
const line2 = JSON.stringify(event2) + "\n";
|
||||
expect(dedup.filter(line2)).toBe(line2);
|
||||
});
|
||||
|
||||
it("deduplicates same raw non-dedup-keyed line twice", () => {
|
||||
const event = { type: "error", message: "same" };
|
||||
const line = JSON.stringify(event) + "\n";
|
||||
dedup.filter(line);
|
||||
expect(dedup.filter(line)).toBe(""); // same raw content deduplicated via raw: key
|
||||
});
|
||||
|
||||
it("buffers incomplete trailing content without emitting", () => {
|
||||
// No trailing newline → chunk is buffered
|
||||
const partial = '{"type":"text","sessionID":"ses_1"}';
|
||||
expect(dedup.filter(partial)).toBe("");
|
||||
});
|
||||
|
||||
it("emits buffered content when completed by next chunk", () => {
|
||||
const partial = '{"type":"text","sessionID":"ses_1"}';
|
||||
dedup.filter(partial); // buffered
|
||||
const completion = "\n"; // completes the line
|
||||
const result = dedup.filter(completion);
|
||||
expect(result).toBe('{"type":"text","sessionID":"ses_1"}\n');
|
||||
});
|
||||
|
||||
it("handles multiple lines in a single chunk", () => {
|
||||
const line1 = '{"type":"text","sessionID":"ses_1"}\n';
|
||||
const line2 = '[paperclip] some status\n';
|
||||
const chunk = line1 + line2;
|
||||
const result = dedup.filter(chunk);
|
||||
expect(result).toBe(chunk);
|
||||
});
|
||||
|
||||
it("deduplicates within a multi-line chunk", () => {
|
||||
const line = '{"type":"text","sessionID":"ses_1"}\n';
|
||||
const chunk = line + line; // same line twice in one chunk
|
||||
const result = dedup.filter(chunk);
|
||||
expect(result).toBe(line); // only once
|
||||
});
|
||||
|
||||
it("passes blank lines through unchanged", () => {
|
||||
expect(dedup.filter("\n")).toBe("\n");
|
||||
});
|
||||
|
||||
it("passes whitespace-only lines through unchanged", () => {
|
||||
expect(dedup.filter(" \n")).toBe(" \n");
|
||||
});
|
||||
|
||||
it("deduplicates events keyed by type:sessionId across chunks", () => {
|
||||
const event = { type: "step_start", sessionID: "ses_1" };
|
||||
const line = JSON.stringify(event) + "\n";
|
||||
dedup.filter(line);
|
||||
// second occurrence in a later chunk
|
||||
expect(dedup.filter(line)).toBe("");
|
||||
});
|
||||
|
||||
it("allows distinct events with different sessionIds to pass", () => {
|
||||
const line1 = JSON.stringify({ type: "text", sessionID: "ses_1" }) + "\n";
|
||||
const line2 = JSON.stringify({ type: "text", sessionID: "ses_2" }) + "\n";
|
||||
dedup.filter(line1);
|
||||
expect(dedup.filter(line2)).toBe(line2);
|
||||
});
|
||||
|
||||
it("allows distinct events with different partIds to pass", () => {
|
||||
const line1 = JSON.stringify({ type: "tool_use", sessionID: "ses_1", part: { id: "t1" } }) + "\n";
|
||||
const line2 = JSON.stringify({ type: "tool_use", sessionID: "ses_1", part: { id: "t2" } }) + "\n";
|
||||
dedup.filter(line1);
|
||||
expect(dedup.filter(line2)).toBe(line2);
|
||||
});
|
||||
});
|
||||
|
||||
describe("flush()", () => {
|
||||
it("returns empty string when buffer is empty", () => {
|
||||
expect(dedup.flush()).toBe("");
|
||||
});
|
||||
|
||||
it("returns and clears buffered incomplete line", () => {
|
||||
const partial = '{"type":"text","sessionID":"ses_1"}';
|
||||
dedup.filter(partial);
|
||||
expect(dedup.flush()).toBe(partial);
|
||||
});
|
||||
|
||||
it("returns empty string on subsequent flush after buffer cleared", () => {
|
||||
const partial = '{"type":"text","sessionID":"ses_1"}';
|
||||
dedup.filter(partial);
|
||||
dedup.flush();
|
||||
expect(dedup.flush()).toBe(""); // buffer already cleared
|
||||
});
|
||||
|
||||
it("does not emit duplicate content on flush", () => {
|
||||
const line = '{"type":"text","sessionID":"ses_1"}\n';
|
||||
dedup.filter(line); // first emission
|
||||
const partial = '{"type":"text","sessionID":"ses_1"}'; // no trailing newline
|
||||
dedup.filter(partial);
|
||||
expect(dedup.flush()).toBe(""); // same key already seen — suppressed
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,126 +0,0 @@
|
||||
/**
|
||||
* Line-level dedup filter for the K8s log stream.
|
||||
*
|
||||
* The K8s log follow stream can reconnect with an overlapping `sinceSeconds`
|
||||
* window (integer-second granularity + a safety buffer), which replays a few
|
||||
* seconds of recent output on every reconnect. Without dedup those replayed
|
||||
* lines appear as duplicate events in the streaming UI.
|
||||
*
|
||||
* The filter operates at the chunk → line level: chunks are split on `\n`,
|
||||
* incomplete trailing content is buffered until the next chunk, and each
|
||||
* complete line is emitted at most once. JSON-shaped OpenCode JSONL events
|
||||
* are keyed by (type + sessionID + part.id); non-JSON lines pass through
|
||||
* unchanged so genuinely-repeated status lines are not swallowed.
|
||||
*/
|
||||
|
||||
type Parsed = Record<string, unknown>;
|
||||
|
||||
function asStr(value: unknown): string {
|
||||
return typeof value === "string" ? value : "";
|
||||
}
|
||||
|
||||
function asRec(value: unknown): Parsed | null {
|
||||
if (typeof value !== "object" || value === null || Array.isArray(value)) return null;
|
||||
return value as Parsed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a stable dedup key for an OpenCode JSONL event. Returns `null` when
|
||||
* the event is not a recognized OpenCode event — those lines fall back to
|
||||
* raw-content hashing so non-JSON output (paperclip status lines, shell
|
||||
* output) is never deduped by identity.
|
||||
*/
|
||||
export function eventDedupKey(event: Parsed): string | null {
|
||||
const type = asStr(event.type);
|
||||
if (!type) return null;
|
||||
|
||||
const sessionId = asStr(event.sessionID);
|
||||
const part = asRec(event.part);
|
||||
const partId = part ? asStr(part.id) : "";
|
||||
|
||||
switch (type) {
|
||||
case "text":
|
||||
case "tool_use":
|
||||
case "step_finish":
|
||||
case "step_start":
|
||||
case "thinking":
|
||||
case "assistant":
|
||||
case "user":
|
||||
if (partId) return `${type}:${sessionId}:${partId}`;
|
||||
if (sessionId) return `${type}:${sessionId}`;
|
||||
return null;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stateful line-level dedup filter. Emits `filter(chunk)` output through
|
||||
* the caller — preserves original chunk formatting (including trailing
|
||||
* newlines) for lines that pass the dedup check.
|
||||
*/
|
||||
export class LogLineDedupFilter {
|
||||
private buffer = "";
|
||||
private readonly seenKeys = new Set<string>();
|
||||
|
||||
/**
|
||||
* Process a chunk and return the subset that should be forwarded.
|
||||
* Incomplete trailing content (no terminating newline) is buffered and
|
||||
* emitted on the next chunk that completes the line (or on flush()).
|
||||
*/
|
||||
filter(chunk: string): string {
|
||||
if (!chunk) return "";
|
||||
const combined = this.buffer + chunk;
|
||||
const endsWithNewline = combined.endsWith("\n");
|
||||
const parts = combined.split("\n");
|
||||
|
||||
if (endsWithNewline) {
|
||||
parts.pop();
|
||||
this.buffer = "";
|
||||
} else {
|
||||
this.buffer = parts.pop() ?? "";
|
||||
}
|
||||
|
||||
const out: string[] = [];
|
||||
for (const line of parts) {
|
||||
if (this.shouldEmit(line)) out.push(line);
|
||||
}
|
||||
if (out.length === 0) return "";
|
||||
return out.join("\n") + "\n";
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush any incomplete trailing content. Called when the stream ends
|
||||
* without a terminating newline so the final partial line isn't lost.
|
||||
*/
|
||||
flush(): string {
|
||||
const pending = this.buffer;
|
||||
this.buffer = "";
|
||||
if (!pending) return "";
|
||||
return this.shouldEmit(pending) ? pending : "";
|
||||
}
|
||||
|
||||
private shouldEmit(line: string): boolean {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) return true;
|
||||
|
||||
if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true;
|
||||
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(trimmed);
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
|
||||
const event = asRec(parsed);
|
||||
if (!event) return true;
|
||||
|
||||
const structuralKey = eventDedupKey(event);
|
||||
const key = structuralKey ?? `raw:${trimmed}`;
|
||||
|
||||
if (this.seenKeys.has(key)) return false;
|
||||
this.seenKeys.add(key);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,195 @@
|
||||
import { describe, it, expect, vi, beforeEach } from "vitest";
|
||||
import type { AdapterEnvironmentTestContext } from "@paperclipai/adapter-utils";
|
||||
import { testEnvironment } from "./test.js";
|
||||
import { getSelfPodInfo, getCoreApi, getAuthzApi } from "./k8s-client.js";
|
||||
|
||||
vi.mock("./k8s-client.js", () => ({
|
||||
getSelfPodInfo: vi.fn(),
|
||||
getCoreApi: vi.fn(),
|
||||
getAuthzApi: vi.fn(),
|
||||
}));
|
||||
|
||||
const SELF_POD = {
|
||||
namespace: "ns-self",
|
||||
image: "img:1",
|
||||
imagePullSecrets: [],
|
||||
pvcClaimName: "paperclip-pvc",
|
||||
inheritedEnv: {},
|
||||
inheritedEnvValueFrom: [],
|
||||
inheritedEnvFrom: [],
|
||||
dnsConfig: undefined,
|
||||
secretVolumes: [],
|
||||
} as unknown as Awaited<ReturnType<typeof getSelfPodInfo>>;
|
||||
|
||||
function makeCtx(config: Record<string, unknown> = {}): AdapterEnvironmentTestContext {
|
||||
return { adapterType: "opencode_k8s", config } as unknown as AdapterEnvironmentTestContext;
|
||||
}
|
||||
|
||||
function makeAuthz(allowedFor: (resource: string, verb: string) => boolean) {
|
||||
return {
|
||||
createSelfSubjectAccessReview: vi.fn().mockImplementation(async ({ body }: { body: { spec: { resourceAttributes: { resource: string; verb: string } } } }) => {
|
||||
const { resource, verb } = body.spec.resourceAttributes;
|
||||
return { status: { allowed: allowedFor(resource, verb) } };
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
function makeCore(overrides: Partial<{ readNamespace: ReturnType<typeof vi.fn>; readNamespacedSecret: ReturnType<typeof vi.fn>; readNamespacedPersistentVolumeClaim: ReturnType<typeof vi.fn> }> = {}) {
|
||||
return {
|
||||
readNamespace: overrides.readNamespace ?? vi.fn().mockResolvedValue({ metadata: { name: "ns" } }),
|
||||
readNamespacedSecret: overrides.readNamespacedSecret ?? vi.fn().mockResolvedValue({ metadata: { name: "paperclip-secrets" } }),
|
||||
readNamespacedPersistentVolumeClaim: overrides.readNamespacedPersistentVolumeClaim ?? vi.fn().mockResolvedValue({ spec: { accessModes: ["ReadWriteMany"] } }),
|
||||
};
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
vi.mocked(getSelfPodInfo).mockResolvedValue(SELF_POD);
|
||||
vi.mocked(getCoreApi).mockReturnValue(makeCore() as unknown as ReturnType<typeof getCoreApi>);
|
||||
vi.mocked(getAuthzApi).mockReturnValue(makeAuthz(() => true) as unknown as ReturnType<typeof getAuthzApi>);
|
||||
});
|
||||
|
||||
describe("testEnvironment — happy path", () => {
|
||||
it("returns pass when API, namespace, RBAC, secret, and RWX PVC all check out", async () => {
|
||||
const result = await testEnvironment(makeCtx());
|
||||
|
||||
expect(result.adapterType).toBe("opencode_k8s");
|
||||
expect(result.status).toBe("pass");
|
||||
expect(result.checks.find((c) => c.code === "k8s_api_reachable")).toBeDefined();
|
||||
expect(result.checks.find((c) => c.code === "k8s_pvc_rwx")).toBeDefined();
|
||||
expect(result.checks.find((c) => c.code === "k8s_secret_exists")).toBeDefined();
|
||||
expect(typeof result.testedAt).toBe("string");
|
||||
});
|
||||
|
||||
it("skips namespace lookup and emits k8s_namespace_exists when target == self pod namespace", async () => {
|
||||
const coreApi = makeCore();
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const result = await testEnvironment(makeCtx());
|
||||
|
||||
expect(coreApi.readNamespace).not.toHaveBeenCalled();
|
||||
expect(result.checks.find((c) => c.code === "k8s_namespace_exists")?.message).toContain("pod namespace");
|
||||
});
|
||||
|
||||
it("calls readNamespace when target namespace differs from self pod namespace", async () => {
|
||||
const coreApi = makeCore();
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const result = await testEnvironment(makeCtx({ namespace: "ns-other" }));
|
||||
|
||||
expect(coreApi.readNamespace).toHaveBeenCalledWith({ name: "ns-other" });
|
||||
expect(result.checks.find((c) => c.code === "k8s_namespace_exists")).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe("testEnvironment — early-return paths", () => {
|
||||
it("returns fail and short-circuits when K8s API is unreachable", async () => {
|
||||
vi.mocked(getSelfPodInfo).mockRejectedValueOnce(new Error("ECONNREFUSED"));
|
||||
|
||||
const result = await testEnvironment(makeCtx());
|
||||
|
||||
expect(result.status).toBe("fail");
|
||||
expect(result.checks.find((c) => c.code === "k8s_api_unreachable")).toBeDefined();
|
||||
// RBAC, secret, and PVC checks should be skipped when API is unreachable
|
||||
expect(result.checks.some((c) => c.code.startsWith("k8s_rbac_"))).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("testEnvironment — namespace warning", () => {
|
||||
it("emits warn (but proceeds) when readNamespace fails for a different namespace", async () => {
|
||||
const coreApi = makeCore({
|
||||
readNamespace: vi.fn().mockRejectedValue(new Error("forbidden")),
|
||||
});
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const result = await testEnvironment(makeCtx({ namespace: "ns-other" }));
|
||||
|
||||
expect(result.checks.find((c) => c.code === "k8s_namespace_check_failed")).toBeDefined();
|
||||
// Should still proceed with downstream checks
|
||||
expect(result.checks.some((c) => c.code.startsWith("k8s_rbac_"))).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("testEnvironment — RBAC", () => {
|
||||
it("emits error checks for denied verbs and degrades status to fail", async () => {
|
||||
vi.mocked(getAuthzApi).mockReturnValue(
|
||||
makeAuthz((resource, verb) => !(resource === "jobs" && verb === "create")) as unknown as ReturnType<typeof getAuthzApi>,
|
||||
);
|
||||
|
||||
const result = await testEnvironment(makeCtx());
|
||||
|
||||
const denied = result.checks.find((c) => c.code === "k8s_rbac_job_create");
|
||||
expect(denied?.level).toBe("error");
|
||||
expect(result.status).toBe("fail");
|
||||
});
|
||||
|
||||
it("emits warn when SelfSubjectAccessReview itself throws", async () => {
|
||||
vi.mocked(getAuthzApi).mockReturnValue({
|
||||
createSelfSubjectAccessReview: vi.fn().mockRejectedValue(new Error("SSAR not available")),
|
||||
} as unknown as ReturnType<typeof getAuthzApi>);
|
||||
|
||||
const result = await testEnvironment(makeCtx());
|
||||
|
||||
const rbacWarns = result.checks.filter((c) => c.code.startsWith("k8s_rbac_") && c.level === "warn");
|
||||
expect(rbacWarns.length).toBeGreaterThan(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("testEnvironment — secrets", () => {
|
||||
it("emits warn when the secret is not found", async () => {
|
||||
const coreApi = makeCore({
|
||||
readNamespacedSecret: vi.fn().mockRejectedValue(new Error("not found")),
|
||||
});
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const result = await testEnvironment(makeCtx());
|
||||
|
||||
expect(result.checks.find((c) => c.code === "k8s_secret_missing")).toBeDefined();
|
||||
expect(result.status).toBe("warn");
|
||||
});
|
||||
|
||||
it("uses configured secretRef when provided", async () => {
|
||||
const coreApi = makeCore();
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
await testEnvironment(makeCtx({ secretRef: "custom-secret" }));
|
||||
|
||||
expect(coreApi.readNamespacedSecret).toHaveBeenCalledWith({ name: "custom-secret", namespace: "ns-self" });
|
||||
});
|
||||
});
|
||||
|
||||
describe("testEnvironment — PVC", () => {
|
||||
it("emits warn when no PVC is mounted on /paperclip", async () => {
|
||||
vi.mocked(getSelfPodInfo).mockResolvedValue({ ...SELF_POD, pvcClaimName: null });
|
||||
|
||||
const result = await testEnvironment(makeCtx());
|
||||
|
||||
expect(result.checks.find((c) => c.code === "k8s_pvc_not_detected")).toBeDefined();
|
||||
expect(result.status).toBe("warn");
|
||||
});
|
||||
|
||||
it("emits warn when PVC access mode is not ReadWriteMany", async () => {
|
||||
const coreApi = makeCore({
|
||||
readNamespacedPersistentVolumeClaim: vi.fn().mockResolvedValue({ spec: { accessModes: ["ReadWriteOnce"] } }),
|
||||
});
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const result = await testEnvironment(makeCtx());
|
||||
|
||||
const pvcCheck = result.checks.find((c) => c.code === "k8s_pvc_not_rwx");
|
||||
expect(pvcCheck).toBeDefined();
|
||||
expect(pvcCheck?.message).toContain("ReadWriteOnce");
|
||||
expect(result.status).toBe("warn");
|
||||
});
|
||||
|
||||
it("emits warn when reading the PVC fails", async () => {
|
||||
const coreApi = makeCore({
|
||||
readNamespacedPersistentVolumeClaim: vi.fn().mockRejectedValue(new Error("api error")),
|
||||
});
|
||||
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
|
||||
|
||||
const result = await testEnvironment(makeCtx());
|
||||
|
||||
expect(result.checks.find((c) => c.code === "k8s_pvc_check_failed")).toBeDefined();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,4 @@
|
||||
{
|
||||
"extends": "./tsconfig.json",
|
||||
"exclude": ["**/*.test.ts", "src/ui-parser.ts"]
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"extends": "./tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"module": "commonjs",
|
||||
"moduleResolution": "node",
|
||||
"outDir": "dist/ui-parser",
|
||||
"declaration": false,
|
||||
"declarationMap": false,
|
||||
"sourceMap": false
|
||||
},
|
||||
"include": ["src/ui-parser.ts"]
|
||||
}
|
||||
Reference in New Issue
Block a user