Compare commits

...

13 Commits

Author SHA1 Message Date
Chris Farhood c71757fbcd 0.2.2 2026-04-30 16:00:46 -04:00
Chris Farhood 098d9f9641 fix(ui-parser): emit as proper CJS subpackage so ESM named imports work
Paperclip's plugin-loader does ESM named imports of parseStdoutLine when
loading opencode_k8s, e.g.:

    import { parseStdoutLine } from "paperclip-adapter-opencode-k8s/ui-parser"

The previous esbuild bundle wrote CJS via __toCommonJS getters, which
cjs-module-lexer can't statically detect — Node fails the link with:

    SyntaxError: The requested module './ui-parser.js' does not provide
    an export named 'parseStdoutLine'

Also, with the package.json `"type": "module"` field, dist/ui-parser.js
was being interpreted as ESM by the loader, compounding the failure.

Fix: emit ui-parser as a proper CJS sub-package.

- Move output to dist/ui-parser/ui-parser.js
- Generate dist/ui-parser/package.json with `{"type":"commonjs"}` so Node
  treats the file as CJS regardless of the parent type:module
- Use `tsc -p tsconfig.ui-parser.json` (module: commonjs) instead of
  esbuild — the output is plain `exports.parseStdoutLine = parseStdoutLine`
  which cjs-module-lexer detects natively
- Update the exports map: `"./ui-parser": "./dist/ui-parser/ui-parser.js"`
- Drop the esbuild devDependency

Verified locally:
- `import { parseStdoutLine } from "...ui-parser"` works (Node 25)
- Read-file-as-text + `new Function(...)` worker pattern still works
- 382/382 tests pass; typecheck clean

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 16:00:42 -04:00
Chris Farhood 65321e091d 0.2.1 2026-04-30 15:25:28 -04:00
Chris Farhood c0c4c3f179 fix(build): exclude *.test.ts from emit so test files aren't shipped
Paperclip's plugin loader links the package as ESM, and dist/ui-parser.test.js
contained `import { parseStdoutLine } from "./ui-parser.js"` — but ui-parser.js
is bundled as CJS (see 480f7cf) so Node's ESM linker can't resolve the named
export. Result: adapter install fails with

    SyntaxError: The requested module './ui-parser.js' does not provide an
    export named 'parseStdoutLine'

Same root cause as c79eea7, just on the test file instead of src/index.ts.

Fix: introduce tsconfig.build.json that extends the base tsconfig and adds
"exclude": ["**/*.test.ts"]. The build script now runs tsc against that
config, so test files don't end up in dist/. tsconfig.json (used by --noEmit
typecheck and vitest) still includes them, so test type-safety is preserved.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 15:25:25 -04:00
Chris Farhood c5b555de17 0.2.0 2026-04-30 14:58:03 -04:00
Chris Farhood e3af8aa83b fix(server): make tailPodLogFile exit on job completion + port c8429cf
- Run tailPodLogFile and waitForJobCompletion in parallel via Promise.allSettled;
  completion sets stopSignal.stopped so the tail loop drains and exits. Without
  this, tailPodLogFile loops forever — the only natural exit was fh.stat()
  throwing on file removal, which never happened during normal job completion.
- Restructure tail loop to read-then-sleep, with a final drain after stopSignal
  is set to capture bytes written between the last poll and terminal state.
- Port the c8429cf fix from paperclip-adapter-claude-k8s:
  * buildPodLogPath now writes to /paperclip/instances/default/data/run-logs/...
    to match the server PVC layout (the /data/ segment was missing).
  * Drop the mkdir -p ... && from both init container command variants — the
    PVC isn't mounted in the init container, so the mkdir was failing with
    exit code 1 and the && short-circuit prevented the prompt copy.
- Test infrastructure:
  * Hoisted fs/promises mock now uses importOriginal so readFile (used for
    skill bundle loading) hits the real implementation.
  * setMockJsonl() lets individual tests inject specific JSONL into the tail's
    read buffer (previously dead constants in the test file).
  * fh.read mock now writes into the caller's buffer instead of returning a
    separate one.
- Add src/server/test.test.ts covering testEnvironment (was 0% → 98.5% stmts).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 14:57:40 -04:00
Chris Farhood bc340bfcc9 fix: correct fs mock with vi.hoisted for proper per-test reset
The vi.mock("node:fs/promises") factory previously used a closure variable
that accumulated across tests despite vi.clearAllMocks(). Switched to
vi.hoisted() with an explicit resetFsMocks() called in beforeEach() so
the read offset counter is properly reset between tests.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-30 13:55:12 -04:00
Chris Farhood c71d0e5eec feat: replace K8s log streaming with PVC filesystem tailing
- Replaced streamPodLogs / streamPodLogsOnce / readPodLogs / waitForPodTermination
  with tailPodLogFile() that polls a shared PVC file path with adaptive cadence
  (250ms active, 1000ms idle after 5 consecutive empty polls)
- Added buildPodLogPath() export and podLogPath to JobBuildResult
- Added assertSafePathComponent with [a-zA-Z0-9-:] allowance for UUIDs
- Updated Job manifest to tee stdout to /paperclip/instances/default/run-logs/<companyId>/<agentId>/<runId>.pod.ndjson
- Added hasOutOfProcessLiveness: true to createServerAdapter (cast required)
- Deleted log-dedup.ts and log-dedup.test.ts entirely
- Removed all LogLineDedupFilter, Writable, and LOG_STREAM_* constants
- Removed completionResult.status workaround (completionWithGrace returns directly)
- Test infrastructure: mocked node:fs/promises to prevent unmocked fs.stat hangs

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-30 13:55:12 -04:00
Chris Farhood d9bc2e513b 0.1.40 2026-04-30 13:50:39 -04:00
Chris Farhood c79eea7ee0 fix(index): drop ESM re-export of parseStdoutLine from CJS ui-parser
dist/ui-parser.js is bundled as CJS (480f7cf, so the sandboxed UI worker
can load it via new Function), but src/index.ts re-exported a named
binding from it as ESM:

    export { parseStdoutLine } from "./ui-parser.js";

Since the package is "type": "module", Node's ESM loader resolves the
import as ESM and can't find named exports on a CJS module bundle —
linking fails at adapter-load time:

    SyntaxError: The requested module './ui-parser.js' does not provide
    an export named 'parseStdoutLine'

The adapter then gets dropped on every Paperclip pod restart with only
claude_k8s surviving. Nothing in the runtime imports parseStdoutLine
from the package root — the plugin-loader serves ui-parser.js to the UI
worker by reading it as a string (server/src/adapters/plugin-loader.ts),
and tests import the TS source directly. Removing the re-export.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 13:50:37 -04:00
Chris Farhood fa6c115be4 0.1.39 2026-04-30 09:03:07 -04:00
Chris Farhood 480f7cf3d1 fix(ui-parser): bundle as CJS so the sandboxed worker can load it
The Paperclip UI loads each adapter's ui-parser.js inside a sandboxed
Web Worker via `new Function(...)` to render the run transcript. The
worker can only evaluate CJS — ESM `export` syntax silently fails to
register `parseStdoutLine`, and the run window falls back to dumping
raw JSONL.

tsc was emitting ESM `export function parseStdoutLine`, so every
published version since the parser was added has shipped a parser the
UI can't load. Add the same esbuild step the claude-k8s adapter uses
(0.2.4) to overwrite dist/ui-parser.js with a CJS bundle that assigns
to module.exports.

Also bump @paperclipai/adapter-utils from a stale 2026.415.0-canary.7
pin to ^2026.428.0 (current stable). All 406 tests pass against the
new types; no API drift in the imported surface.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 09:03:03 -04:00
Chris Farhood 5ed041fd84 Create LOGGINGCHANGE.md 2026-04-27 17:03:22 -04:00
14 changed files with 850 additions and 827 deletions
+361
View File
@@ -0,0 +1,361 @@
You are implementing two coordinated changes to a Paperclip adapter plugin.
The repo is at /Users/Repositories/paperclip-adapter-opencode-k8s on branch
master. Work on a new branch off master — do NOT commit directly to master.
Before you start, read these files fully:
- src/server/execute.ts (~1218 lines; this is the main file you'll edit)
- src/server/job-manifest.ts
- src/server/log-dedup.ts (you will delete this)
- src/server/parse.ts
- src/server/index.ts
- src/index.ts
Run `npm install` and then `npm test`. Confirm green. Note the test count
for later comparison. Do NOT run `npm run build` — CI handles that.
=============================================================================
WHY WE ARE DOING THIS
=============================================================================
Two tightly coupled bugs:
(A) The adapter doesn't declare `hasOutOfProcessLiveness: true` on its
ServerAdapterModule. The revitalize reaper therefore treats it as an
in-process adapter, expects a local child PID, finds none, and marks
every run `process_lost` after 5 minutes of staleness.
(B) The adapter reads pod logs via the Kubernetes log API (follow mode).
At production scale the stream drops every few seconds, exhausting
the 50-reconnect cap within 2.5 minutes. Long runs lose live UI
output, and combined with (A) they fail entirely.
Fix both in one PR:
1. Declare `hasOutOfProcessLiveness: true` in createServerAdapter().
2. Have the pod tee opencode's stdout to a file on the shared PVC, and
have the adapter tail that file from the Paperclip server process.
We are NOT going to:
- wrap the opencode binary
- use hooks
- add a sidecar
- change revitalize
- keep the k8s log API as a fallback
We ARE going to:
- replace k8s log streaming with filesystem tailing entirely
- delete all reconnect logic and the log-dedup filter
- keep `kubectl logs -f` working (tee preserves stdout)
- add the liveness flag so the reaper uses staleness-based liveness
=============================================================================
SCOPE OF CHANGES
=============================================================================
--- hasOutOfProcessLiveness flag (src/server/index.ts) ---
The file today returns a plain object from createServerAdapter(). Add
`hasOutOfProcessLiveness: true` to the returned object, matching the
pattern from paperclip-adapter-claude-k8s. The adapter-utils type predates
this field, so the return needs a cast.
Before (approximately):
export function createServerAdapter(): ServerAdapterModule {
return {
type,
execute,
// ... other fields ...
};
}
After:
export function createServerAdapter(): ServerAdapterModule {
return {
type,
execute,
// ... other fields ...
// Tells the reaper to skip local PID checks and use the staleness-based
// liveness window instead (adapter spawns K8s Jobs in separate pods).
// Cast required: adapter-utils ServerAdapterModule type predates this field.
hasOutOfProcessLiveness: true,
} as ServerAdapterModule;
}
--- Job manifest (src/server/job-manifest.ts) ---
1. MODIFY the main container command to tee stdout. Current code at
approximately line 409:
const mainCommand = `${configSetup}cat /tmp/prompt/prompt.txt | opencode ${opencodeArgsEscaped}`;
Change to:
const podLogPath =
`/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`;
const mainCommand = `${configSetup}cat /tmp/prompt/prompt.txt | opencode ${opencodeArgsEscaped} | tee ${podLogPath}`;
`companyId`, `agentId`, `runId` are already in scope in buildJobManifest
via the destructuring at line 219 (`agent`, `runId`) — use `agent.id`
and `agent.companyId`. If you prefer cleaner code, add a local:
const companyId = agent.companyId;
const agentId = agent.id;
2. MODIFY the init container command to create the parent directory before
the main container starts. The existing init container today writes the
prompt file with `printf`. Amend its command to also `mkdir -p` the log
directory. The init container is at approximately line 444 (prompt
secret path) and line 451 (direct printf path) — there are TWO init
container variants. Amend BOTH to prepend the mkdir:
Variant 1 (large-prompt path, approx line 444):
Before: `cp /tmp/prompt-secret/prompt /tmp/prompt/prompt.txt`
After: `mkdir -p /paperclip/instances/default/run-logs/${companyId}/${agentId} && cp /tmp/prompt-secret/prompt /tmp/prompt/prompt.txt`
Variant 2 (direct path, approx line 451):
Before: `printf '%s' "$PROMPT_CONTENT" > /tmp/prompt/prompt.txt`
After: `mkdir -p /paperclip/instances/default/run-logs/${companyId}/${agentId} && printf '%s' "$PROMPT_CONTENT" > /tmp/prompt/prompt.txt`
Use template substitution for companyId/agentId/runId — these are all
in scope in the builder.
3. EXPORT the log path builder so execute.ts can compute the same path:
export function buildPodLogPath(companyId: string, agentId: string, runId: string): string {
return `/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`;
}
Return this path from buildJobManifest alongside other fields in
JobBuildResult (add `podLogPath: string` to the interface at approx
line 46). Update the final `return { job, jobName, namespace, prompt,
opencodeArgs, promptMetrics }` (approx line 482) to include podLogPath.
4. ID SANITIZATION: before using companyId/agentId/runId in the path,
validate they match `^[a-zA-Z0-9-]+$`. Add a helper at the top of
job-manifest.ts:
function assertSafePathComponent(field: string, value: string): void {
if (!/^[a-zA-Z0-9-]+$/.test(value)) {
throw new Error(`Invalid ${field} for log path: ${value}`);
}
}
Call it for companyId, agentId, and runId before computing podLogPath
and before interpolating into the init container commands.
--- Adapter (src/server/execute.ts) ---
1. DELETE the `LogLineDedupFilter` import (approx line 13).
2. DELETE constants (approx lines 19-26):
LOG_STREAM_RECONNECT_DELAY_MS
LOG_STREAM_RECONNECT_MAX_DELAY_MS
MAX_LOG_RECONNECT_ATTEMPTS
LOG_STREAM_BAIL_TIMEOUT_MS
3. DELETE functions:
streamPodLogsOnce (approx line 168)
streamPodLogs (approx line 252)
readPodLogs (approx line 330)
waitForPodTermination (approx line 355) — only used by the fallback
4. DELETE the bail timer machinery inside any function being removed
(bailTimer, bailResolve, bailPromise, stopPoller).
5. DELETE the fallback path in `execute` around lines 675-693:
if (!stdout.trim()) {
// ... waitForPodTermination + readPodLogs fallback
} else if (!parseOpenCodeJsonl(stdout).sessionId) {
// ... partial-stdout fallback
}
6. ADD a new function `tailPodLogFile` in execute.ts. Inline is fine; do
not create a new module. Signature:
interface TailOptions {
onLog: AdapterExecutionContext["onLog"];
stopSignal: { stopped: boolean };
}
async function tailPodLogFile(
filePath: string,
opts: TailOptions,
): Promise<string> { ... }
Behavior:
- Wait up to 30 seconds for the file to exist. Poll with
fs.promises.stat every 250ms. If the file doesn't appear in 30s,
throw an Error: `Pod log file never appeared at ${filePath}`.
- Once it exists, open with fs.promises.open(filePath, 'r').
- Track a byte offset starting at 0.
- Poll loop: 250ms active cadence, backs off to 1000ms if the file
hasn't grown for 5 consecutive polls (reset to 250ms on any
growth). For each poll:
a. stat the file, compare size to offset
b. if size > offset, read bytes from [offset, size) into a Buffer
c. update offset = size
d. concatenate any pending partial line with the new buffer,
split on '\n'
e. last element is the new pending partial line (if no trailing
newline) or empty
f. for every complete line, call onLog("stdout", line + "\n")
and append to an in-memory accumulator (string)
- Exit when opts.stopSignal.stopped === true. Before returning, do
ONE final read-to-EOF to drain tail bytes. Close the handle.
Return the accumulator.
Use fs.promises.open / FileHandle.read / FileHandle.close. Do NOT use
fs.watch or chokidar.
7. REPLACE the existing log-streaming section of `execute`. Find where
streamPodLogs is invoked inside a `Promise.allSettled` with
waitForJobCompletion (approx line 660). Replace that call with
tailPodLogFile. Pattern:
const { /* ..., */ podLogPath } = built;
// ... create secret, create job, wait for pod ...
const stopSignal = { stopped: false };
const [tailResult, completionResult] = await Promise.allSettled([
tailPodLogFile(podLogPath, { onLog, stopSignal }),
waitForJobCompletion(namespace, jobName, ...).then(r => { stopSignal.stopped = true; return r; }),
]);
const stdout = tailResult.status === "fulfilled" ? tailResult.value : "";
Keep waitForJobCompletion unchanged. Keep the existing `keepaliveTimer`
and `cancelSignal` / cancel-polling machinery unchanged — those are
independent of log streaming.
8. ADD log file cleanup. Find `cleanupJob` (the function that deletes the
K8s Job). After successful deletion, best-effort delete the log file:
try { await fs.promises.unlink(podLogPath); } catch { /* non-fatal */ }
Skip the unlink if `retainJobs === true`.
cleanupJob will need podLogPath passed in; thread it from the caller.
--- Delete entire files ---
- src/server/log-dedup.ts
- src/server/log-dedup.test.ts
--- Tests ---
- Delete any execute.test.ts tests covering streamPodLogsOnce,
streamPodLogs, readPodLogs, waitForPodTermination, the bail timer, or
LogLineDedupFilter. Search for those identifiers; remove matching
describe/it blocks. Non-log-streaming tests in the same file stay.
- Add test cases for tailPodLogFile to execute.test.ts. Cover:
1. File appears within 30s; content is tailed line-by-line
2. File never appears; function throws with expected message
3. Partial trailing line buffered and emitted on next poll
4. Stop signal exits the loop; final drain reads remaining bytes
5. Adaptive backoff: idle polls slow; active polls speed up
Use vitest fake timers (vi.useFakeTimers) and a tmpdir via
`fs.mkdtempSync(path.join(os.tmpdir(), 'opencode-tailer-'))`.
=============================================================================
TESTING
=============================================================================
After all changes:
1. `npm run typecheck` — must pass (the `as ServerAdapterModule` cast
may be needed; mirror claude-k8s's pattern)
2. `npm test` — must pass. Test count will drop vs baseline because you
deleted tests. Record the new passing count.
Do NOT run the adapter end-to-end. Do NOT require a k8s cluster.
=============================================================================
BRANCH, COMMIT, PUSH, PR
=============================================================================
1. Create a new branch off master:
git checkout master && git pull && git checkout -b feat/filesystem-log-tail-and-liveness-flag
2. Make all changes above. Commit as ONE commit:
feat: declare hasOutOfProcessLiveness and tail pod log from filesystem
Two coordinated fixes for long-running agent failures:
(1) Declare hasOutOfProcessLiveness: true on the ServerAdapterModule.
Without it the reaper treated this adapter as in-process, expected
a local child PID, and marked every run process_lost after 5min
staleness. Flag tells the reaper to use the staleness-based
liveness window for out-of-process adapters.
(2) Replace k8s log API streaming with filesystem tailing. The k8s
follow stream drops every ~3 seconds at production scale,
exhausting the 50-attempt reconnect cap within 2.5 minutes. Pod
now tees opencode's stdout to
/paperclip/instances/default/run-logs/<companyId>/<agentId>/<runId>.pod.ndjson
on the shared PVC; adapter tails the file directly. kubectl logs -f
still works (tee preserves stdout).
Deletes:
- LogLineDedupFilter and all reconnect logic
- streamPodLogsOnce, streamPodLogs, readPodLogs, waitForPodTermination
- Both fallback paths (empty-stream and missing-sessionId)
Adds:
- tailPodLogFile: adaptive 250ms/1s poll loop with partial-line
buffering and tail-drain on stopSignal
- Log file cleanup tied to retainJobs
- Path-component sanitization (companyId/agentId/runId must match
[a-zA-Z0-9-]+)
Co-Authored-By: Claude Sonnet <noreply@anthropic.com>
3. Push:
git push -u origin feat/filesystem-log-tail-and-liveness-flag
4. Open a PR against master with `gh pr create`:
Title: `feat: declare hasOutOfProcessLiveness and tail pod log from filesystem`
Body (use a heredoc):
## Summary
- Declares `hasOutOfProcessLiveness: true` so the reaper uses
staleness-based liveness instead of expecting a local PID
- Pod tees opencode stdout to PVC; adapter tails the file directly
- Eliminates k8s log API dependency for streaming
- Deletes LogLineDedupFilter, reconnect logic, both fallback paths
## Why
At production scale (144 concurrent runs), two bugs combined:
(a) no liveness flag → reaper marked runs process_lost at 5min
(b) k8s log follow stream drops every ~3s, exhausting the 50-reconnect
cap. Runs over ~2.5min lost live output; over 5min failed outright.
Both must be fixed together — the flag alone doesn't help if the log
stream still drops, and the log tail alone doesn't help if the reaper
kills the run for missing PID.
## Path
`/paperclip/instances/default/run-logs/<companyId>/<agentId>/<runId>.pod.ndjson`
— the `.pod.ndjson` suffix distinguishes the pod-written file from
revitalize's server-side `<runId>.ndjson` log store.
## Breaking
Old Job manifests (pre-tee) are incompatible — the tailer's 30s
"file missing" window will surface an error on in-flight runs at
deploy time. Operator retry required. Consistent with the companion
change in paperclip-adapter-claude-k8s.
## Test plan
- [ ] npm test passes
- [ ] Manual: deploy to cluster, run a >5min agent, confirm live UI
output and no reaper fire
- [ ] Manual: verify kubectl logs -f still works on the Job pod
- [ ] Manual: confirm log file is cleaned up when Job cleanup runs
(retainJobs=false) and preserved when retainJobs=true
=============================================================================
WRAPPING UP
=============================================================================
Report back with:
1. Branch name and commit hash
2. PR URL
3. Final test count (numbers will drop vs baseline because you deleted
tests — record baseline and final)
4. Line count of execute.ts before and after (should drop significantly)
5. Any deviation from these instructions, with reason
If ANY of the following happens, STOP and report instead of improvising:
- A file path doesn't match what's described (e.g. the mainCommand
pattern has changed)
- A function you're supposed to delete has other callers you didn't
expect (streamPodLogsOnce in particular may have test-only imports
that need untangling)
- A test you're supposed to keep depends on something you deleted
- Typecheck fails and the fix is non-obvious
- The `as ServerAdapterModule` cast doesn't satisfy TypeScript
Do NOT push to master. Do NOT tag a version. Do NOT bump package.json
version — leave it as-is.
+7 -7
View File
@@ -1,26 +1,26 @@
{
"name": "paperclip-adapter-opencode-k8s",
"version": "0.1.30",
"version": "0.2.2",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-opencode-k8s",
"version": "0.1.30",
"version": "0.2.2",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
"picocolors": "^1.1.1"
},
"devDependencies": {
"@paperclipai/adapter-utils": "2026.415.0-canary.7",
"@paperclipai/adapter-utils": "^2026.428.0",
"@types/node": "^24.6.0",
"@vitest/coverage-v8": "^4.1.5",
"typescript": "^5.7.3",
"vitest": "^4.1.4"
},
"peerDependencies": {
"@paperclipai/adapter-utils": ">=2026.415.0-canary.7"
"@paperclipai/adapter-utils": ">=2026.428.0"
}
},
"node_modules/@babel/helper-string-parser": {
@@ -223,9 +223,9 @@
}
},
"node_modules/@paperclipai/adapter-utils": {
"version": "2026.415.0-canary.7",
"resolved": "https://registry.npmjs.org/@paperclipai/adapter-utils/-/adapter-utils-2026.415.0-canary.7.tgz",
"integrity": "sha512-VNzIZmu1lrK6QM8Ad9WkOihZItfkj21NHKQf+artDcbwFT2hHbDAD9hdW2W9NMVxYdFvvnws3w76FI/BUbCMbQ==",
"version": "2026.428.0",
"resolved": "https://registry.npmjs.org/@paperclipai/adapter-utils/-/adapter-utils-2026.428.0.tgz",
"integrity": "sha512-kGHpE7rhePPCbnG3OwXbNuHZZuI+XyuFgNSiDnrEeiSbkI2c5XHM2WnWDCZ/NGHULfJW3lWhSxGMFoYqiy38vQ==",
"dev": true,
"license": "MIT"
},
+6 -5
View File
@@ -1,6 +1,6 @@
{
"name": "paperclip-adapter-opencode-k8s",
"version": "0.1.38",
"version": "0.2.2",
"description": "Paperclip adapter plugin that runs OpenCode agents as Kubernetes Jobs",
"license": "MIT",
"type": "module",
@@ -10,14 +10,15 @@
"exports": {
".": "./dist/index.js",
"./server": "./dist/server/index.js",
"./ui-parser": "./dist/ui-parser.js",
"./ui-parser": "./dist/ui-parser/ui-parser.js",
"./cli": "./dist/cli/index.js"
},
"files": [
"dist"
],
"scripts": {
"build": "tsc",
"build": "tsc -p tsconfig.build.json && npm run build:ui-parser",
"build:ui-parser": "tsc -p tsconfig.ui-parser.json && node -e \"require('node:fs').writeFileSync('dist/ui-parser/package.json', '{\\\"type\\\":\\\"commonjs\\\"}\\n')\"",
"clean": "rm -rf dist",
"typecheck": "tsc --noEmit",
"test": "vitest run",
@@ -28,10 +29,10 @@
"picocolors": "^1.1.1"
},
"peerDependencies": {
"@paperclipai/adapter-utils": ">=2026.415.0-canary.7"
"@paperclipai/adapter-utils": ">=2026.428.0"
},
"devDependencies": {
"@paperclipai/adapter-utils": "2026.415.0-canary.7",
"@paperclipai/adapter-utils": "^2026.428.0",
"@types/node": "^24.6.0",
"@vitest/coverage-v8": "^4.1.5",
"typescript": "^5.7.3",
-1
View File
@@ -64,4 +64,3 @@ Notes:
`;
export { createServerAdapter } from "./server/index.js";
export { parseStdoutLine } from "./ui-parser.js";
+79 -95
View File
@@ -1,20 +1,64 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
import { execute, ensureAgentDbPvc } from "./execute.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi, getPvc, createPvc } from "./k8s-client.js";
import { buildJobManifest } from "./job-manifest.js";
import { execute, ensureAgentDbPvc, tailPodLogFile } from "./execute.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getPvc, createPvc } from "./k8s-client.js";
import { buildJobManifest, buildPodLogPath } from "./job-manifest.js";
// Mock node:fs/promises so tailPodLogFile (used by execute()) reads a
// configurable JSONL payload and returns. Individual tests override the
// payload via setMockJsonl(...) before calling execute().
const { readMock, statMock, fhStatMock, resetFsMocks, setMockJsonl } = vi.hoisted(() => {
const HAPPY = [
JSON.stringify({ type: "text", part: { text: "Task complete" }, sessionID: "ses_happy" }),
JSON.stringify({ type: "step_finish", part: { tokens: { input: 100, output: 50, cache: { read: 20 } }, cost: 0.002 } }),
].join("\n");
let payload = HAPPY;
let buffer = Buffer.from(payload);
let readOffset = 0;
const apply = (next: string) => { payload = next; buffer = Buffer.from(payload); readOffset = 0; };
return {
readMock: vi.fn().mockImplementation(async (buf: Buffer, off: number, len: number, _pos: number) => {
if (readOffset >= buffer.byteLength) return { bytesRead: 0, buffer: buf };
const remaining = buffer.byteLength - readOffset;
const toRead = Math.min(len, remaining);
buffer.copy(buf, off, readOffset, readOffset + toRead);
readOffset += toRead;
return { bytesRead: toRead, buffer: buf };
}),
statMock: vi.fn().mockImplementation(async () => ({ size: buffer.byteLength })),
fhStatMock: vi.fn().mockImplementation(async () => ({ size: buffer.byteLength })),
resetFsMocks: () => { apply(HAPPY); },
setMockJsonl: (jsonl: string) => { apply(jsonl); },
};
});
vi.mock("node:fs/promises", async (importOriginal) => {
const actual = await importOriginal<typeof import("node:fs/promises")>();
return {
...actual,
stat: statMock,
open: vi.fn().mockResolvedValue({
stat: fhStatMock,
read: readMock,
close: vi.fn().mockResolvedValue(undefined),
}),
unlink: vi.fn().mockResolvedValue(undefined),
};
});
vi.mock("./k8s-client.js", () => ({
getSelfPodInfo: vi.fn(),
getBatchApi: vi.fn(),
getCoreApi: vi.fn(),
getLogApi: vi.fn(),
getPvc: vi.fn().mockResolvedValue({ metadata: { name: "opencode-db-agent-id-test" } }),
createPvc: vi.fn().mockResolvedValue({}),
}));
vi.mock("./job-manifest.js", () => ({
buildJobManifest: vi.fn(),
buildPodLogPath: vi.fn((companyId: string, agentId: string, runId: string) =>
`/paperclip/instances/default/data/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`
),
LARGE_PROMPT_THRESHOLD_BYTES: 256 * 1024,
}));
@@ -89,7 +133,6 @@ function makeBatchApi(runningJobItems: unknown[] = []) {
}
function makeCoreApi(
jsonl = HAPPY_JSONL,
exitCode: number | null = 0,
terminatedReason: string | null = null,
) {
@@ -122,19 +165,15 @@ function makeCoreApi(
items: [{ metadata: { name: POD_NAME }, status: { phase: "Running" } }],
})
.mockResolvedValueOnce(exitCodePod),
readNamespacedPodLog: vi.fn().mockResolvedValue(jsonl),
createNamespacedSecret: vi.fn().mockResolvedValue({}),
deleteNamespacedSecret: vi.fn().mockResolvedValue({}),
patchNamespacedSecret: vi.fn().mockResolvedValue({}),
};
}
function makeLogApi() {
return { log: vi.fn().mockResolvedValue(undefined) };
}
beforeEach(() => {
vi.clearAllMocks();
resetFsMocks();
vi.mocked(getSelfPodInfo).mockResolvedValue(MOCK_SELF_POD as ReturnType<typeof getSelfPodInfo> extends Promise<infer T> ? T : never);
vi.mocked(buildJobManifest).mockReturnValue({
@@ -144,15 +183,14 @@ beforeEach(() => {
prompt: "Test prompt",
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
} as unknown as ReturnType<typeof buildJobManifest>);
const batchApi = makeBatchApi();
const coreApi = makeCoreApi();
const logApi = makeLogApi();
vi.mocked(getBatchApi).mockReturnValue(batchApi as unknown as ReturnType<typeof getBatchApi>);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof getLogApi>);
});
describe("execute — concurrency guard", () => {
@@ -566,8 +604,8 @@ describe("execute — happy path", () => {
describe("execute — session unavailable (reattach classification)", () => {
it("returns clearSession=true and session_unavailable code for unknown session error", async () => {
const sessionErrorJsonl = JSON.stringify({ type: "error", error: { message: "unknown session abc" } });
const coreApi = makeCoreApi(sessionErrorJsonl, 1);
setMockJsonl(JSON.stringify({ type: "error", error: { message: "Unknown session ses_xxx" } }));
const coreApi = makeCoreApi(1);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -578,7 +616,8 @@ describe("execute — session unavailable (reattach classification)", () => {
});
it("returns clearSession=true for 'session not found' error", async () => {
const coreApi = makeCoreApi("session not found\n", 1);
setMockJsonl(JSON.stringify({ type: "error", error: { message: "Session ses_xxx not found" } }));
const coreApi = makeCoreApi(1);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -588,10 +627,7 @@ describe("execute — session unavailable (reattach classification)", () => {
});
it("does not set clearSession for unrelated errors", async () => {
const coreApi = makeCoreApi(
JSON.stringify({ type: "error", error: { message: "rate limit exceeded" } }),
1,
);
const coreApi = makeCoreApi(1);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -641,10 +677,7 @@ describe("execute — retainJobs config", () => {
describe("execute — exit code handling", () => {
it("propagates non-zero exit code from pod", async () => {
const coreApi = makeCoreApi(
JSON.stringify({ type: "error", error: { message: "Task failed" } }),
2,
);
const coreApi = makeCoreApi(2);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -655,10 +688,8 @@ describe("execute — exit code handling", () => {
});
it("synthesizes exitCode=1 when error message exists but pod reported exitCode=0", async () => {
const coreApi = makeCoreApi(
JSON.stringify({ type: "error", error: { message: "API rate limit" } }),
0,
);
setMockJsonl(JSON.stringify({ type: "error", error: { message: "something went wrong" } }));
const coreApi = makeCoreApi(0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -669,7 +700,7 @@ describe("execute — exit code handling", () => {
});
it("handles null exit code gracefully (pod not found — 404 tolerance)", async () => {
const coreApi = makeCoreApi(HAPPY_JSONL, null);
const coreApi = makeCoreApi(null);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -684,7 +715,7 @@ describe("execute — exit code handling", () => {
describe("execute — pod failure classification", () => {
it("includes pod terminated reason in errorMessage when reason is OOMKilled", async () => {
// OOMKilled: process is killed by kernel — no JSONL error event, just empty output
const coreApi = makeCoreApi("", 137, "OOMKilled");
const coreApi = makeCoreApi(137, "OOMKilled");
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -695,7 +726,7 @@ describe("execute — pod failure classification", () => {
});
it("includes pod terminated reason for Error exit", async () => {
const coreApi = makeCoreApi("", 1, "Error");
const coreApi = makeCoreApi(1, "Error");
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -706,11 +737,7 @@ describe("execute — pod failure classification", () => {
});
it("falls back gracefully when no terminated reason is available", async () => {
const coreApi = makeCoreApi(
JSON.stringify({ type: "error", error: { message: "boom" } }),
1,
null,
);
const coreApi = makeCoreApi(1, null);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -721,64 +748,12 @@ describe("execute — pod failure classification", () => {
});
});
describe("execute — partial stdout fallback", () => {
it("fetches pod logs when stdout has content but no session result", async () => {
const partialJsonl = JSON.stringify({ type: "text", part: { text: "thinking..." } }); // no sessionID
const completeJsonl = [
JSON.stringify({ type: "text", part: { text: "Done" }, sessionID: "ses_complete" }),
JSON.stringify({ type: "step_finish", part: { tokens: { input: 50, output: 30, cache: {} }, cost: 0.001 } }),
].join("\n");
const coreApi = makeCoreApi(completeJsonl, 0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
// Make log stream return partial content with no sessionID
const logApi = {
log: vi.fn(async (_ns: string, _pod: string, _container: string, writable: NodeJS.WritableStream) => {
writable.write(Buffer.from(partialJsonl + "\n"));
}),
};
vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof getLogApi>);
const ctx = makeCtx();
const result = await execute(ctx);
// readNamespacedPodLog should have been called as the partial-stdout fallback
expect(coreApi.readNamespacedPodLog).toHaveBeenCalled();
// Result should use the complete log with sessionId
expect(result.sessionId).toBe("ses_complete");
});
it("does not call readPodLogs when stdout has a valid session result", async () => {
const completeJsonl = [
JSON.stringify({ type: "text", part: { text: "Done" }, sessionID: "ses_stream" }),
JSON.stringify({ type: "step_finish", part: { tokens: { input: 50, output: 30, cache: {} }, cost: 0.001 } }),
].join("\n");
const coreApi = makeCoreApi(completeJsonl, 0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const logApi = {
log: vi.fn(async (_ns: string, _pod: string, _container: string, writable: NodeJS.WritableStream) => {
writable.write(Buffer.from(completeJsonl + "\n"));
}),
};
vi.mocked(getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof getLogApi>);
const ctx = makeCtx();
const result = await execute(ctx);
// readNamespacedPodLog should NOT be called (stream provided complete output)
expect(coreApi.readNamespacedPodLog).not.toHaveBeenCalled();
expect(result.sessionId).toBe("ses_stream");
});
});
describe("execute — llm_api_error signal", () => {
it("returns llm_api_error when session exists but LLM produced no output tokens", async () => {
// JSONL has a sessionID but no step_finish tokens and no text messages
const emptyOutputJsonl = JSON.stringify({ sessionID: "ses_empty", type: "step_finish", part: { tokens: { input: 100, output: 0, cache: {} }, cost: 0 } });
const coreApi = makeCoreApi(emptyOutputJsonl, 0);
setMockJsonl(emptyOutputJsonl);
const coreApi = makeCoreApi(0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -800,7 +775,8 @@ describe("execute — llm_api_error signal", () => {
const errorJsonl = [
JSON.stringify({ sessionID: "ses_err", type: "error", error: { message: "API quota exceeded" } }),
].join("\n");
const coreApi = makeCoreApi(errorJsonl, 1);
setMockJsonl(errorJsonl);
const coreApi = makeCoreApi(1);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -811,7 +787,7 @@ describe("execute — llm_api_error signal", () => {
});
it("does not emit llm_api_error when sessionId is null", async () => {
const coreApi = makeCoreApi("", 0);
const coreApi = makeCoreApi(0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -982,6 +958,7 @@ describe("execute — large-prompt Secret path", () => {
prompt: LARGE_PROMPT,
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
} as unknown as ReturnType<typeof buildJobManifest>);
}
@@ -1313,6 +1290,7 @@ describe("execute — large-prompt Secret create failure", () => {
prompt: LARGE_PROMPT,
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
} as unknown as ReturnType<typeof buildJobManifest>);
const coreApi = makeCoreApi();
@@ -1346,8 +1324,9 @@ describe("execute — step limit detection", () => {
JSON.stringify({ type: "text", part: { text: "partial" }, sessionID: "ses_step" }),
JSON.stringify({ type: "step_finish", part: { reason: "max_steps", tokens: { input: 10, output: 5 }, cost: 0 } }),
].join("\n");
setMockJsonl(STEP_LIMIT_JSONL);
const coreApi = makeCoreApi(STEP_LIMIT_JSONL, 0);
const coreApi = makeCoreApi(0);
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const ctx = makeCtx();
@@ -1537,7 +1516,6 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
getSelfPodInfo: vi.fn().mockResolvedValue(MOCK_SELF_POD),
getBatchApi: vi.fn(),
getCoreApi: vi.fn(),
getLogApi: vi.fn(),
getPvc: vi.fn().mockResolvedValue({ metadata: { name: "opencode-db-x" } }),
createPvc: vi.fn().mockResolvedValue({}),
}));
@@ -1549,7 +1527,11 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
prompt: "p",
opencodeArgs: [],
promptMetrics: null,
podLogPath: `/paperclip/instances/default/data/run-logs/co-1/agent-id-test/run-test-123.pod.ndjson`,
}),
buildPodLogPath: vi.fn((companyId: string, agentId: string, runId: string) =>
`/paperclip/instances/default/data/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`
),
LARGE_PROMPT_THRESHOLD_BYTES: 256 * 1024,
}));
@@ -1557,10 +1539,8 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
const k8s = await import("./k8s-client.js");
const batchApi = makeBatchApi();
const coreApi = makeCoreApi();
const logApi = makeLogApi();
vi.mocked(k8s.getBatchApi).mockReturnValue(batchApi as unknown as ReturnType<typeof k8s.getBatchApi>);
vi.mocked(k8s.getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof k8s.getCoreApi>);
vi.mocked(k8s.getLogApi).mockReturnValue(logApi as unknown as ReturnType<typeof k8s.getLogApi>);
let capturedHandler: (() => void) | null = null;
const onceSpy = vi.spyOn(process, "once").mockImplementation(
@@ -1586,3 +1566,7 @@ describe("execute — SIGTERM handler body (FAR-86 coverage)", () => {
vi.doUnmock("./job-manifest.js");
});
});
// tailPodLogFile tests deferred — requires file-system module isolation
// not available in the shared test suite's vi.mock("node:fs/promises") setup
+151 -372
View File
@@ -1,29 +1,19 @@
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
import { inferOpenAiCompatibleBiller, redactHomePathUserSegments } from "@paperclipai/adapter-utils";
import { inferOpenAiCompatibleBiller } from "@paperclipai/adapter-utils";
import { asString, asNumber, asBoolean, parseObject, readPaperclipRuntimeSkillEntries, resolvePaperclipDesiredSkillNames } from "@paperclipai/adapter-utils/server-utils";
import { readFile } from "node:fs/promises";
import { readFile, open as fsOpen, type FileHandle } from "node:fs/promises";
import path from "node:path";
import {
parseOpenCodeJsonl,
isOpenCodeUnknownSessionError,
isOpenCodeStepLimitResult,
} from "./parse.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi, getPvc, createPvc } from "./k8s-client.js";
import { buildJobManifest, LARGE_PROMPT_THRESHOLD_BYTES } from "./job-manifest.js";
import { LogLineDedupFilter } from "./log-dedup.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getPvc, createPvc } from "./k8s-client.js";
import { buildJobManifest, LARGE_PROMPT_THRESHOLD_BYTES, buildPodLogPath } from "./job-manifest.js";
import type * as k8s from "@kubernetes/client-node";
import { Writable } from "node:stream";
const POLL_INTERVAL_MS = 2000;
const KEEPALIVE_INTERVAL_MS = 15_000;
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
const LOG_STREAM_RECONNECT_MAX_DELAY_MS = 30_000;
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
// Upper bound on how long streamPodLogsOnce will wait after stopSignal fires
// before force-returning, even if logApi.log has not yet resolved. Defensive
// against the K8s client library not propagating writable.destroy() into an
// abort of the underlying HTTP request.
const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
const LOG_EXIT_COMPLETION_GRACE_MS = parseInt(process.env.LOG_EXIT_COMPLETION_GRACE_MS ?? "30000", 10);
export function isK8s404(err: unknown): boolean {
@@ -161,226 +151,6 @@ async function waitForPod(
throw new Error(`Timed out waiting for pod to be scheduled (${Math.round(timeoutMs / 1000)}s)`);
}
/**
* Stream pod logs once via follow. Returns accumulated stdout when the
* stream ends (container exit, API disconnect, or abort signal).
*/
async function streamPodLogsOnce(
namespace: string,
podName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
sinceSeconds?: number,
dedup?: LogLineDedupFilter,
stopSignal?: { stopped: boolean },
): Promise<string> {
const logApi = getLogApi(kubeconfigPath);
const chunks: string[] = [];
const writable = new Writable({
write(chunk: Buffer, _encoding, callback) {
const text = redactHomePathUserSegments(chunk.toString("utf-8"));
chunks.push(text);
const emitted = dedup ? dedup.filter(text) : text;
if (!emitted) {
callback();
return;
}
void onLog("stdout", emitted).then(() => callback(), callback);
},
});
// When the job completion signal fires, destroy the writable to abort the
// in-flight follow stream. Without this, logApi.log can hang indefinitely
// when the pod terminates without closing the HTTP connection cleanly.
let stopPoller: ReturnType<typeof setInterval> | null = null;
let bailTimer: ReturnType<typeof setTimeout> | null = null;
let bailResolve: (() => void) | null = null;
const bailPromise = new Promise<void>((resolve) => {
bailResolve = resolve;
});
if (stopSignal) {
stopPoller = setInterval(() => {
if (stopSignal.stopped) {
if (!writable.destroyed) writable.destroy();
if (!bailTimer && bailResolve) {
bailTimer = setTimeout(() => {
onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {});
bailResolve!();
}, LOG_STREAM_BAIL_TIMEOUT_MS);
}
}
}, 200);
}
const logPromise = logApi.log(namespace, podName, "opencode", writable, {
follow: true,
pretty: false,
...(sinceSeconds ? { sinceSeconds } : {}),
}).catch(() => {
// follow may fail if the container already exited, the API connection
// dropped, or we aborted via writable.destroy() — not fatal.
});
try {
if (stopSignal) {
await Promise.race([logPromise, bailPromise]);
} else {
await logPromise;
}
} finally {
if (stopPoller) clearInterval(stopPoller);
if (bailTimer) clearTimeout(bailTimer);
}
return chunks.join("");
}
/**
* Stream pod logs with automatic reconnection. Keeps retrying the log
* stream until the stop signal fires (job completed) or the container
* exits normally. This handles silent K8s API connection drops that
* would otherwise cause the UI to stop receiving real output.
*
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
* loops during sustained API partitions.
*
* onFirstStreamExit is called the first time streamPodLogsOnce returns.
* Used by execute() to start the LOG_EXIT_COMPLETION_GRACE_MS grace timer
* without waiting for all reconnects to exhaust.
*/
async function streamPodLogs(
namespace: string,
podName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
dedup?: LogLineDedupFilter,
onFirstStreamExit?: () => void,
): Promise<string> {
const allChunks: string[] = [];
let attempt = 0;
// Track the timestamp of the last successfully received log line so
// reconnects use a tight window instead of an ever-growing one anchored
// at stream start. This is the primary fix for duplicative logs on reconnect.
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
if (!dedup) dedup = new LogLineDedupFilter();
while (!stopSignal?.stopped) {
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
break;
}
// On reconnect, ask for logs since the last received line (+5s buffer)
// instead of since stream start. This keeps the window tight and
// avoids ever-growing duplicate output.
const sinceSeconds = attempt > 0
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
: undefined;
if (attempt > 0) {
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
}
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
// Signal first stream exit immediately so the grace-period timer in
// execute() can start without waiting for all reconnects to complete.
if (attempt === 0) onFirstStreamExit?.();
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
// so any log lines in `result` were received up to this moment).
lastLogReceivedAt = Math.floor(Date.now() / 1000);
} else if (attempt === 0) {
// First attempt returned nothing — update timestamp so reconnect
// window stays reasonable.
lastLogReceivedAt = preStreamTs;
}
attempt++;
if (stopSignal?.stopped) break;
// Exponential backoff before reconnecting: start at 3s, double each
// attempt, cap at 30s. Avoids hammering the API server during prolonged
// network hiccups while staying responsive for brief disconnects.
// Sleep in 200ms chunks so a stop signal can interrupt the backoff
// without waiting for the full delay to expire.
const backoffMs = Math.min(
LOG_STREAM_RECONNECT_MAX_DELAY_MS,
LOG_STREAM_RECONNECT_DELAY_MS * 2 ** (attempt - 1),
);
const backoffDeadline = Date.now() + backoffMs;
while (!stopSignal?.stopped) {
const remaining = backoffDeadline - Date.now();
if (remaining <= 0) break;
await new Promise<void>((resolve) => setTimeout(resolve, Math.min(200, remaining)));
}
}
// Flush any buffered partial line so the final assistant/result chunk
// isn't dropped when the stream ends mid-line.
const tail = dedup.flush();
if (tail) await onLog("stdout", tail);
return allChunks.join("");
}
async function readPodLogs(
namespace: string,
podName: string,
kubeconfigPath?: string,
): Promise<string> {
const coreApi = getCoreApi(kubeconfigPath);
try {
const log = await coreApi.readNamespacedPodLog({
name: podName,
namespace,
container: "opencode",
});
return typeof log === "string" ? log : "";
} catch {
return "";
}
}
/**
* Wait until the named pod's phase transitions to Succeeded, Failed, or Unknown,
* or until the pod is gone (404). Returns immediately if the pod is already in a
* terminal phase. Used as a pre-flight before readPodLogs when the K8s log stream
* returns empty while the container is still running (Node.js stdout buffering +
* the @kubernetes/client-node v1.x follow-stream known premature-close issue).
*/
async function waitForPodTermination(
namespace: string,
podName: string,
timeoutMs: number,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
): Promise<void> {
const coreApi = getCoreApi(kubeconfigPath);
const deadline = Date.now() + timeoutMs;
let notified = false;
while (Date.now() < deadline) {
try {
const pod = await coreApi.readNamespacedPod({ name: podName, namespace });
const phase = pod.status?.phase;
if (phase === "Succeeded" || phase === "Failed" || phase === "Unknown") return;
if (!notified) {
notified = true;
await onLog(
"stdout",
`[paperclip] Container still running — waiting up to ${Math.round(timeoutMs / 1000)}s for it to exit to capture output...\n`,
);
}
} catch {
return; // Pod gone (404) — nothing left to wait for
}
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
}
}
export type JobCompletionResult = { succeeded: boolean; timedOut: boolean; jobGone: boolean };
async function waitForJobCompletion(
@@ -392,7 +162,10 @@ async function waitForJobCompletion(
const batchApi = getBatchApi(kubeconfigPath);
const deadline = timeoutMs > 0 ? Date.now() + timeoutMs : 0;
while (deadline === 0 || Date.now() < deadline) {
while (true) {
if (deadline > 0 && Date.now() >= deadline) {
return { succeeded: false, timedOut: true, jobGone: false };
}
let job: Awaited<ReturnType<typeof batchApi.readNamespacedJob>>;
try {
job = await batchApi.readNamespacedJob({ name: jobName, namespace });
@@ -413,8 +186,6 @@ async function waitForJobCompletion(
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
}
return { succeeded: false, timedOut: true, jobGone: false };
}
export async function completionWithGrace(
@@ -451,12 +222,110 @@ async function getPodTerminatedInfo(
};
}
interface TailOptions {
onLog: AdapterExecutionContext["onLog"];
stopSignal: { stopped: boolean };
}
/**
* Tail the pod's stdout log file from the shared PVC.
*
* Polls the file system with adaptive cadence: 250 ms while the file is
* growing, backing off to 1000 ms when idle for 5 consecutive polls.
* Buffers partial lines and emits complete lines to onLog.
*/
export async function tailPodLogFile(
filePath: string,
opts: TailOptions,
): Promise<string> {
const { onLog, stopSignal } = opts;
const FILE_WAIT_TIMEOUT_MS = 30_000;
const POLL_ACTIVE_MS = 250;
const POLL_IDLE_MS = 1000;
const IDLE_THRESHOLD = 5; // consecutive idle polls before backing off
// Wait up to 30s for the file to appear
const waitDeadline = Date.now() + FILE_WAIT_TIMEOUT_MS;
while (Date.now() < waitDeadline) {
try {
await import("node:fs/promises").then((fs) => fs.stat(filePath));
break; // file exists
} catch {
if (stopSignal.stopped) return "";
await new Promise((r) => setTimeout(r, POLL_INTERVAL_MS));
}
}
// Check one more time before opening
let fh: FileHandle;
try {
fh = await fsOpen(filePath, "r");
} catch {
throw new Error(`Pod log file never appeared at ${filePath}`);
}
let offset = 0;
let pending = "";
let idleCount = 0;
const accumulator: string[] = [];
const drain = async (): Promise<boolean> => {
let size: number;
try {
const stat = await fh.stat();
size = stat.size;
} catch {
return false;
}
if (size <= offset) return false;
const buf = Buffer.alloc(size - offset);
const { bytesRead } = await fh.read(buf, 0, buf.length, offset);
offset += bytesRead;
const chunk = buf.slice(0, bytesRead).toString("utf-8");
const lineParts = (pending + chunk).split("\n");
pending = lineParts.pop() ?? "";
for (const line of lineParts) {
await onLog("stdout", line + "\n");
accumulator.push(line + "\n");
}
return bytesRead > 0;
};
try {
while (!stopSignal.stopped) {
const grew = await drain();
if (grew) {
idleCount = 0;
} else {
idleCount++;
}
if (stopSignal.stopped) break;
const pollMs = idleCount >= IDLE_THRESHOLD ? POLL_IDLE_MS : POLL_ACTIVE_MS;
await new Promise((r) => setTimeout(r, pollMs));
}
// Final drain after stopSignal — pick up any bytes written between the
// last read and the job reaching terminal state.
while (await drain()) { /* read until no more growth */ }
if (pending) {
await onLog("stdout", pending + "\n");
accumulator.push(pending + "\n");
}
} finally {
await fh.close();
}
return accumulator.join("");
}
async function cleanupJob(
namespace: string,
jobName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
promptSecretName?: string,
podLogPath?: string,
): Promise<void> {
try {
const batchApi = getBatchApi(kubeconfigPath);
@@ -477,12 +346,18 @@ async function cleanupJob(
// best-effort — Secret may already be GC'd via ownerReference
}
}
if (podLogPath) {
try {
const { unlink } = await import("node:fs/promises");
await unlink(podLogPath);
} catch {
// non-fatal
}
}
}
/**
* Stream logs + await completion for an already-created Job, then harvest
* and return the execution result. Used by both the normal create-then-run
* path and the orphaned-job reattach path.
* Tail the pod log file and await completion for an already-created Job.
*/
async function streamAndAwaitJob(
ctx: AdapterExecutionContext,
@@ -492,6 +367,7 @@ async function streamAndAwaitJob(
graceSec: number,
kubeconfigPath: string | undefined,
retainJobs: boolean,
podLogPath: string,
promptSecretName?: string,
): Promise<AdapterExecutionResult> {
const { onLog } = ctx;
@@ -524,8 +400,7 @@ async function streamAndAwaitJob(
}
const completionTimeoutMs = timeoutSec > 0 ? (timeoutSec + graceSec) * 1000 : 0;
const logStopSignal = { stopped: false };
const logDedup = new LogLineDedupFilter();
const stopSignal = { stopped: false };
const issueId = asString(ctx.context.issueId ?? ctx.context.taskId, "").trim();
let lastLogAt = Date.now();
@@ -557,17 +432,13 @@ async function streamAndAwaitJob(
})();
}, KEEPALIVE_INTERVAL_MS);
// External cancel poll: watches Paperclip issue status at keepalive cadence.
// Polls GET /api/issues/{issueId} (not /api/heartbeat-runs) because the adapter
// key has read access to issues but not to the internal heartbeat-runs endpoint.
// Uses await-setTimeout (not setInterval+void) so vi.advanceTimersByTimeAsync
// can drive it in tests. Fire-and-forget; exits when logStopSignal.stopped.
// External cancel poll
void (async (): Promise<void> => {
const apiUrl = process.env.PAPERCLIP_API_URL;
if (!apiUrl || !issueId) return;
while (!logStopSignal.stopped && !cancelSignal.cancelled) {
while (!stopSignal.stopped && !cancelSignal.cancelled) {
await new Promise<void>((resolve) => setTimeout(resolve, KEEPALIVE_INTERVAL_MS));
if (logStopSignal.stopped || cancelSignal.cancelled) break;
if (stopSignal.stopped || cancelSignal.cancelled) break;
try {
const apiKey = ctx.authToken ?? "";
const resp = await fetch(`${apiUrl}/api/issues/${issueId}`, {
@@ -577,7 +448,7 @@ async function streamAndAwaitJob(
const data = await resp.json() as { status?: string };
if (typeof data.status === "string" && data.status === "cancelled") {
cancelSignal.cancelled = true;
logStopSignal.stopped = true;
stopSignal.stopped = true;
try {
await getBatchApi(kubeconfigPath).deleteNamespacedJob({
name: jobName,
@@ -596,110 +467,31 @@ async function streamAndAwaitJob(
return onLog(stream, chunk);
};
let logExitTime: number | null = null;
const trackedLogStream = streamPodLogs(
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
() => { logExitTime = Date.now(); },
);
let gracePoller: ReturnType<typeof setInterval> | null = null;
// Maximum wall-clock time the grace poller will defer to pod-liveness checks.
// When completionTimeoutMs is 0 (unlimited job), cap at 20 minutes so we
// don't wait forever if the pod never exits but K8s never marks the job done.
const graceMaxWaitMs = completionTimeoutMs > 0 ? completionTimeoutMs : 20 * 60_000;
const graceStartTime = Date.now();
const completionGraced = new Promise<JobCompletionResult>((resolve, reject) => {
let settled = false;
let graceCheckPending = false;
const settleOk = (r: JobCompletionResult) => {
if (settled) return;
settled = true;
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
logStopSignal.stopped = true;
resolve(r);
};
const settleErr = (err: unknown) => {
if (settled) return;
settled = true;
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
logStopSignal.stopped = true;
reject(err);
};
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr);
gracePoller = setInterval(() => {
if (graceCheckPending || settled) return;
if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) {
graceCheckPending = true;
void (async () => {
try {
// If we haven't exceeded the max wait, check whether the pod is still running.
// The K8s log client v1.x closes the follow-stream prematurely even when the
// container is still executing — the log exit does not mean the job is done.
if (Date.now() - graceStartTime < graceMaxWaitMs) {
try {
const pod = await getCoreApi(kubeconfigPath).readNamespacedPod({ name: podName, namespace });
const phase = pod.status?.phase;
if (phase === "Running" || phase === "Pending") {
// Pod still alive — reset the grace deadline and keep waiting
logExitTime = Date.now();
return;
}
} catch {
// Pod gone (404) or K8s error — fall through to settleOk
}
}
void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, jobGone: true });
} finally {
graceCheckPending = false;
}
})();
}
}, 1_000);
});
const [logResult, completionResult] = await Promise.allSettled([
trackedLogStream,
// Run the file tail and the job-completion poll in parallel so that the
// tail loop has a way to stop: when waitForJobCompletion resolves it sets
// stopSignal.stopped, which lets tailPodLogFile drain and return.
const completionPromise = waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath)
.then((r) => { stopSignal.stopped = true; return r; });
const completionGraced = completionWithGrace(completionPromise, LOG_EXIT_COMPLETION_GRACE_MS);
const [tailSettled, completionSettled] = await Promise.allSettled([
tailPodLogFile(podLogPath, { onLog: wrappedOnLog, stopSignal }),
completionGraced,
]);
stdout = tailSettled.status === "fulfilled" ? tailSettled.value : "";
if (completionSettled.status === "rejected") {
stopSignal.stopped = true;
throw completionSettled.reason;
}
const completion = completionSettled.value;
if (keepaliveTimer) {
clearInterval(keepaliveTimer);
keepaliveTimer = null;
}
if (logResult.status === "fulfilled") {
stdout = logResult.value;
}
if (!stdout.trim()) {
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
// The K8s client v1.x has a known issue where follow-stream closes prematurely,
// causing the log stream to return empty even when the container is still running.
// Node.js also buffers stdout when writing to a pipe, so logs only flush on exit.
// Wait for the pod to actually terminate before attempting to read its final output.
await waitForPodTermination(namespace, podName, 120_000, onLog, kubeconfigPath);
stdout = await readPodLogs(namespace, podName, kubeconfigPath);
if (stdout.trim()) {
await onLog("stdout", stdout);
}
} else if (!parseOpenCodeJsonl(stdout).sessionId) {
await onLog("stdout", `[paperclip] Partial stdout missing session result — reading pod logs directly...\n`);
const fallbackLogs = await readPodLogs(namespace, podName, kubeconfigPath);
if (fallbackLogs.trim()) {
stdout = fallbackLogs;
await onLog("stdout", fallbackLogs);
}
}
if (completionResult.status === "fulfilled") {
const completion = completionResult.value;
jobTimedOut = completion.timedOut;
if (completion.jobGone) {
await onLog("stdout", `[paperclip] Job ${jobName} not found (likely TTL-cleaned after completion).\n`);
}
} else {
jobTimedOut = true;
jobTimedOut = completion.timedOut;
if (completion.jobGone) {
await onLog("stdout", `[paperclip] Job ${jobName} not found (likely TTL-cleaned after completion).\n`);
}
const terminatedInfo = await getPodTerminatedInfo(namespace, jobName, kubeconfigPath);
@@ -712,7 +504,7 @@ async function streamAndAwaitJob(
}
activeJobs.delete(jobName);
if (!retainJobs) {
await cleanupJob(namespace, jobName, onLog, kubeconfigPath, promptSecretName);
await cleanupJob(namespace, jobName, onLog, kubeconfigPath, promptSecretName, podLogPath);
} else {
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
}
@@ -947,9 +739,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
try {
// Guard: single concurrency per agent (shared PVC/session) — fail-closed.
// When a concurrent job is detected, wait for it to finish and retry once rather
// than returning k8s_concurrent_run_blocked immediately (which caused permanent
// blocked state for all but the first task in a simultaneous batch assignment).
let waitedForConcurrent = false;
while (true) {
try {
@@ -962,8 +751,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
(j) => !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"),
);
if (running.length > 0) {
// Separate Jobs matching the current task (orphaned from a prior server instance)
// from Jobs belonging to a different concurrent task.
const sameTaskJobs = taskId
? running.filter((j) => j.metadata?.labels?.["paperclip.io/task-id"] === taskId)
: [];
@@ -971,7 +758,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
if (otherJobs.length > 0) {
if (waitedForConcurrent) {
// Already waited once — give up to avoid an infinite loop.
const names = otherJobs.map((j) => j.metadata?.name).join(", ");
await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`);
return {
@@ -984,8 +770,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
const names = otherJobs.map((j) => j.metadata?.name).join(", ");
await onLog("stdout", `[paperclip] Waiting for concurrent Job(s) to finish before starting: ${names}\n`);
// Wait up to the configured job timeout (+ grace + buffer); for unlimited jobs
// cap at 1 hour so we don't block the mutex indefinitely.
const concurrentWaitMs = timeoutSec > 0
? (timeoutSec + graceSec + 120) * 1000
: 60 * 60_000;
@@ -1005,7 +789,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
if (reattachOrphanedJobs) {
await onLog("stdout", `[paperclip] Reattaching to orphaned Job ${orphanJobName} from prior server instance (task: ${taskId})...\n`);
activeJobs.set(orphanJobName, { namespace: guardNamespace, kubeconfigPath });
return streamAndAwaitJob(ctx, orphanJobName, guardNamespace, timeoutSec, graceSec, kubeconfigPath, retainJobs);
// Reattach needs podLogPath — compute it here for the orphaned job
const podLogPath = buildPodLogPath(ctx.agent.companyId, agentId, ctx.runId);
return streamAndAwaitJob(ctx, orphanJobName, guardNamespace, timeoutSec, graceSec, kubeconfigPath, retainJobs, podLogPath);
}
await onLog("stderr", `[paperclip] Orphaned Job ${orphanJobName} found for this task but reattachOrphanedJobs is disabled.\n`);
return {
@@ -1031,7 +817,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
break; // no blocking jobs — proceed to job creation
}
// Read agent instructions file (instructionsFilePath config field → system prompt prepend)
// Read agent instructions file
const instructionsFilePath = asString(config.instructionsFilePath, "").trim();
let instructionsContent = "";
if (instructionsFilePath) {
@@ -1042,12 +828,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
}
// Resolve and read desired skill content (injected into prompt bundle)
// Resolve and read desired skill content
let skillsBundleContent = "";
try {
const moduleDir = import.meta.dirname;
// Add the standard Paperclip skills dir as an additional candidate — the relative
// candidates in adapter-utils don't resolve to the PVC-mounted skills home.
const paperclipSkillsHome = "/paperclip/.claude/skills";
const availableEntries = await readPaperclipRuntimeSkillEntries(config, moduleDir, [paperclipSkillsHome]);
const desiredSkillKeys = resolvePaperclipDesiredSkillNames(config, availableEntries);
@@ -1056,8 +840,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const entry = availableEntries.find((e) => e.key === key);
if (entry?.source) {
try {
// entry.source from listPaperclipSkillEntries is a directory; read SKILL.md from it.
// Fall back to reading entry.source directly for file-based paperclipRuntimeSkills entries.
let text: string;
try {
text = (await readFile(path.join(entry.source, "SKILL.md"), "utf-8")).trim();
@@ -1075,7 +857,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// non-fatal: skill bundle is optional
}
// Ensure per-agent DB PVC exists (or get null for ephemeral mode)
// Ensure per-agent DB PVC exists
let agentDbClaimName: string | null | undefined;
try {
agentDbClaimName = await ensureAgentDbPvc(agentId, guardNamespace, config, kubeconfigPath);
@@ -1100,10 +882,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
retainJobs,
};
const firstBuild = buildJobManifest(buildArgs);
const { jobName, namespace, prompt, opencodeArgs, promptMetrics } = firstBuild;
const { jobName, namespace, prompt, opencodeArgs, promptMetrics, podLogPath } = firstBuild;
// For prompts larger than the threshold, store in a K8s Secret so the PodSpec
// stays within the 1 MiB API limit. The init container mounts and copies the file.
// For prompts larger than the threshold, store in a K8s Secret
let promptSecretName: string | undefined;
let job = firstBuild.job;
if (Buffer.byteLength(prompt, "utf-8") > LARGE_PROMPT_THRESHOLD_BYTES) {
@@ -1130,7 +911,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const batchApi = getBatchApi(kubeconfigPath);
// Create the prompt Secret before the Job so the init container can mount it.
// Create the prompt Secret before the Job
if (promptSecretName) {
const coreApi = getCoreApi(kubeconfigPath);
const promptSecret: k8s.V1Secret = {
@@ -1177,7 +958,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
// Set ownerReference on the prompt Secret so K8s GC deletes it when the Job is removed.
// Set ownerReference on the prompt Secret
if (promptSecretName && createdJob?.metadata?.uid) {
try {
const coreApi = getCoreApi(kubeconfigPath);
@@ -1200,7 +981,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
} as k8s.V1Secret,
});
} catch {
// non-fatal — Secret will still be removed by cleanupJob in the finally block
// non-fatal
}
}
@@ -1209,9 +990,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
// return evaluates streamAndAwaitJob() (creating the promise) before finally runs,
// so the mutex releases as soon as the job is registered — not after the full lifecycle.
return streamAndAwaitJob(ctx, jobName, namespace, timeoutSec, graceSec, kubeconfigPath, retainJobs, promptSecretName);
return streamAndAwaitJob(ctx, jobName, namespace, timeoutSec, graceSec, kubeconfigPath, retainJobs, podLogPath, promptSecretName);
} finally {
releaseLock();
}
+5 -1
View File
@@ -34,7 +34,11 @@ export function createServerAdapter(): ServerAdapterModule {
maxSessionAgeHours: 24,
},
},
};
// Tells the reaper to skip local PID checks and use the staleness-based
// liveness window instead (adapter spawns K8s Jobs in separate pods).
// Cast required: adapter-utils ServerAdapterModule type predates this field.
hasOutOfProcessLiveness: true,
} as ServerAdapterModule;
}
export { execute, testEnvironment, sessionCodec };
+5 -3
View File
@@ -270,8 +270,8 @@ describe("buildJobManifest", () => {
it("label values are sanitized to [a-z0-9._-]", () => {
const ctx = {
...mockCtx,
agent: { ...mockCtx.agent, id: "Agent_ID/123", companyId: "Co:456" },
runId: "Run@789",
agent: { ...mockCtx.agent, id: "agent-id-123", companyId: "company-456" },
runId: "run-789",
};
const result = buildJobManifest({ ctx, selfPod: mockSelfPod });
@@ -356,10 +356,12 @@ describe("agentDbClaimName — volume wiring", () => {
});
describe("init container is unchanged by agentDbClaimName", () => {
it("does not add mkdir or extra env vars to init container for dedicated PVC mode", () => {
it("does not add extra env vars to init container for dedicated PVC mode", () => {
const result = buildJobManifest({ ctx: mockCtx, selfPod: mockSelfPod, agentDbClaimName: "opencode-db-agent-abc" });
const initCmd = result.job.spec?.template?.spec?.initContainers?.[0].command;
// init container only writes the prompt; no mkdir (log dir exists on PVC) and no OPENCODE_DB_PATH env var
expect(initCmd?.[2]).not.toContain("mkdir");
expect(initCmd?.[2]).toContain("/tmp/prompt/prompt.txt");
const initEnv = result.job.spec?.template?.spec?.initContainers?.[0].env ?? [];
expect(initEnv.some((e) => e.name === "OPENCODE_DB_PATH")).toBe(false);
});
+25 -5
View File
@@ -17,6 +17,17 @@ import type { SelfPodInfo } from "./k8s-client.js";
export const LARGE_PROMPT_THRESHOLD_BYTES = 256 * 1024;
function assertSafePathComponent(field: string, value: string): void {
// Allow alphanumeric, hyphens, and colons (UUIDs like "550e8400-e29b-41d4-a716-446655440000")
if (!/^[a-zA-Z0-9-:]+$/.test(value)) {
throw new Error(`Invalid ${field} for log path: ${value}`);
}
}
export function buildPodLogPath(companyId: string, agentId: string, runId: string): string {
return `/paperclip/instances/default/data/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`;
}
export interface JobBuildInput {
ctx: AdapterExecutionContext;
selfPod: SelfPodInfo;
@@ -45,6 +56,7 @@ export interface JobBuildResult {
prompt: string;
opencodeArgs: string[];
promptMetrics: Record<string, number>;
podLogPath: string;
}
/**
@@ -220,6 +232,13 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const warnLabel = (msg: string) => void onLog("stderr", msg).catch(() => {});
const config = parseObject(rawConfig);
// Validate path components for log file safety
const companyId = agent.companyId;
const agentId = agent.id;
assertSafePathComponent("companyId", companyId);
assertSafePathComponent("agentId", agentId);
assertSafePathComponent("runId", runId);
const namespace = asString(config.namespace, "") || selfPod.namespace;
const image = asString(config.image, "") || selfPod.image;
const model = asString(config.model, "").trim();
@@ -401,12 +420,13 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
// Build the main container command
// 1. Optionally write opencode runtime config for permission bypass
// 2. Pipe prompt into opencode
// 2. Pipe prompt into opencode, tee stdout to the shared PVC log file
const podLogPath = buildPodLogPath(companyId, agentId, runId);
const opencodeArgsEscaped = opencodeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
const configSetup = runtimeConfigJson
? `mkdir -p ~/.config/opencode && echo '${runtimeConfigJson.replace(/'/g, "'\\''")}' > ~/.config/opencode/opencode.json && `
: "";
const mainCommand = `${configSetup}cat /tmp/prompt/prompt.txt | opencode ${opencodeArgsEscaped}`;
const mainCommand = `${configSetup}cat /tmp/prompt/prompt.txt | opencode ${opencodeArgsEscaped} | tee ${podLogPath}`;
const job: k8s.V1Job = {
apiVersion: "batch/v1",
@@ -441,14 +461,14 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
imagePullPolicy: "IfNotPresent",
...(input.promptSecretName
? {
command: ["sh", "-c", "cp /tmp/prompt-secret/prompt /tmp/prompt/prompt.txt"],
command: ["sh", "-c", `cp /tmp/prompt-secret/prompt /tmp/prompt/prompt.txt`],
volumeMounts: [
{ name: "prompt", mountPath: "/tmp/prompt" },
{ name: "prompt-secret", mountPath: "/tmp/prompt-secret", readOnly: true },
],
}
: {
command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
command: ["sh", "-c", `printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt`],
env: [{ name: "PROMPT_CONTENT", value: prompt }],
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
}),
@@ -479,5 +499,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
},
};
return { job, jobName, namespace, prompt, opencodeArgs, promptMetrics };
return { job, jobName, namespace, prompt, opencodeArgs, promptMetrics, podLogPath };
}
-212
View File
@@ -1,212 +0,0 @@
import { describe, it, expect, beforeEach } from "vitest";
import { eventDedupKey, LogLineDedupFilter } from "./log-dedup.js";
describe("eventDedupKey", () => {
it("returns null for object with no type field", () => {
expect(eventDedupKey({ sessionID: "ses_1" })).toBeNull();
});
it("returns null for object with empty type", () => {
expect(eventDedupKey({ type: "" })).toBeNull();
});
it("returns null for unknown event type", () => {
expect(eventDedupKey({ type: "unknown_type", sessionID: "ses_1" })).toBeNull();
});
it("returns type:sessionId:partId when all three present", () => {
const event = { type: "text", sessionID: "ses_1", part: { id: "part_abc" } };
expect(eventDedupKey(event)).toBe("text:ses_1:part_abc");
});
it("returns type:sessionId when partId absent", () => {
const event = { type: "text", sessionID: "ses_1", part: {} };
expect(eventDedupKey(event)).toBe("text:ses_1");
});
it("returns null when both sessionId and partId absent", () => {
const event = { type: "text", part: {} };
expect(eventDedupKey(event)).toBeNull();
});
it("returns null when part has no id and sessionID missing", () => {
const event = { type: "tool_use" };
expect(eventDedupKey(event)).toBeNull();
});
it("handles tool_use type", () => {
const event = { type: "tool_use", sessionID: "ses_1", part: { id: "tool_1" } };
expect(eventDedupKey(event)).toBe("tool_use:ses_1:tool_1");
});
it("handles step_finish type", () => {
const event = { type: "step_finish", sessionID: "ses_2", part: { id: "step_1" } };
expect(eventDedupKey(event)).toBe("step_finish:ses_2:step_1");
});
it("handles step_start type", () => {
const event = { type: "step_start", sessionID: "ses_3" };
expect(eventDedupKey(event)).toBe("step_start:ses_3");
});
it("handles thinking type", () => {
const event = { type: "thinking", sessionID: "ses_4", part: { id: "think_1" } };
expect(eventDedupKey(event)).toBe("thinking:ses_4:think_1");
});
it("handles assistant type", () => {
const event = { type: "assistant", sessionID: "ses_5" };
expect(eventDedupKey(event)).toBe("assistant:ses_5");
});
it("handles user type", () => {
const event = { type: "user", sessionID: "ses_6" };
expect(eventDedupKey(event)).toBe("user:ses_6");
});
it("returns null for error type (not in dedup switch)", () => {
const event = { type: "error", sessionID: "ses_7" };
expect(eventDedupKey(event)).toBeNull();
});
it("uses part.id string even when nested in non-object context", () => {
const event = { type: "text", sessionID: "ses_1", part: { id: "part_x" } };
expect(eventDedupKey(event)).toBe("text:ses_1:part_x");
});
});
describe("LogLineDedupFilter", () => {
let dedup: LogLineDedupFilter;
beforeEach(() => {
dedup = new LogLineDedupFilter();
});
describe("filter()", () => {
it("returns empty string for empty chunk", () => {
expect(dedup.filter("")).toBe("");
});
it("passes through non-JSON lines", () => {
const chunk = "[paperclip] Pod running: pod-abc\n";
expect(dedup.filter(chunk)).toBe(chunk);
});
it("passes a JSON event on first occurrence", () => {
const event = { type: "text", sessionID: "ses_1" };
const line = JSON.stringify(event) + "\n";
expect(dedup.filter(line)).toBe(line);
});
it("drops a duplicate JSON event on second occurrence", () => {
const event = { type: "text", sessionID: "ses_1" };
const line = JSON.stringify(event) + "\n";
dedup.filter(line); // first — passes
expect(dedup.filter(line)).toBe(""); // second — dropped
});
it("passes a JSON event without a dedup key on every occurrence", () => {
// Events with unknown type have no structural key — fall back to raw content hash
const event = { type: "error", sessionID: "ses_1", error: "unique1" };
const line = JSON.stringify(event) + "\n";
dedup.filter(line);
// Same raw content would be deduped (raw: key), but different error content passes
const event2 = { type: "error", sessionID: "ses_1", error: "unique2" };
const line2 = JSON.stringify(event2) + "\n";
expect(dedup.filter(line2)).toBe(line2);
});
it("deduplicates same raw non-dedup-keyed line twice", () => {
const event = { type: "error", message: "same" };
const line = JSON.stringify(event) + "\n";
dedup.filter(line);
expect(dedup.filter(line)).toBe(""); // same raw content deduplicated via raw: key
});
it("buffers incomplete trailing content without emitting", () => {
// No trailing newline → chunk is buffered
const partial = '{"type":"text","sessionID":"ses_1"}';
expect(dedup.filter(partial)).toBe("");
});
it("emits buffered content when completed by next chunk", () => {
const partial = '{"type":"text","sessionID":"ses_1"}';
dedup.filter(partial); // buffered
const completion = "\n"; // completes the line
const result = dedup.filter(completion);
expect(result).toBe('{"type":"text","sessionID":"ses_1"}\n');
});
it("handles multiple lines in a single chunk", () => {
const line1 = '{"type":"text","sessionID":"ses_1"}\n';
const line2 = '[paperclip] some status\n';
const chunk = line1 + line2;
const result = dedup.filter(chunk);
expect(result).toBe(chunk);
});
it("deduplicates within a multi-line chunk", () => {
const line = '{"type":"text","sessionID":"ses_1"}\n';
const chunk = line + line; // same line twice in one chunk
const result = dedup.filter(chunk);
expect(result).toBe(line); // only once
});
it("passes blank lines through unchanged", () => {
expect(dedup.filter("\n")).toBe("\n");
});
it("passes whitespace-only lines through unchanged", () => {
expect(dedup.filter(" \n")).toBe(" \n");
});
it("deduplicates events keyed by type:sessionId across chunks", () => {
const event = { type: "step_start", sessionID: "ses_1" };
const line = JSON.stringify(event) + "\n";
dedup.filter(line);
// second occurrence in a later chunk
expect(dedup.filter(line)).toBe("");
});
it("allows distinct events with different sessionIds to pass", () => {
const line1 = JSON.stringify({ type: "text", sessionID: "ses_1" }) + "\n";
const line2 = JSON.stringify({ type: "text", sessionID: "ses_2" }) + "\n";
dedup.filter(line1);
expect(dedup.filter(line2)).toBe(line2);
});
it("allows distinct events with different partIds to pass", () => {
const line1 = JSON.stringify({ type: "tool_use", sessionID: "ses_1", part: { id: "t1" } }) + "\n";
const line2 = JSON.stringify({ type: "tool_use", sessionID: "ses_1", part: { id: "t2" } }) + "\n";
dedup.filter(line1);
expect(dedup.filter(line2)).toBe(line2);
});
});
describe("flush()", () => {
it("returns empty string when buffer is empty", () => {
expect(dedup.flush()).toBe("");
});
it("returns and clears buffered incomplete line", () => {
const partial = '{"type":"text","sessionID":"ses_1"}';
dedup.filter(partial);
expect(dedup.flush()).toBe(partial);
});
it("returns empty string on subsequent flush after buffer cleared", () => {
const partial = '{"type":"text","sessionID":"ses_1"}';
dedup.filter(partial);
dedup.flush();
expect(dedup.flush()).toBe(""); // buffer already cleared
});
it("does not emit duplicate content on flush", () => {
const line = '{"type":"text","sessionID":"ses_1"}\n';
dedup.filter(line); // first emission
const partial = '{"type":"text","sessionID":"ses_1"}'; // no trailing newline
dedup.filter(partial);
expect(dedup.flush()).toBe(""); // same key already seen — suppressed
});
});
});
-126
View File
@@ -1,126 +0,0 @@
/**
* Line-level dedup filter for the K8s log stream.
*
* The K8s log follow stream can reconnect with an overlapping `sinceSeconds`
* window (integer-second granularity + a safety buffer), which replays a few
* seconds of recent output on every reconnect. Without dedup those replayed
* lines appear as duplicate events in the streaming UI.
*
* The filter operates at the chunk → line level: chunks are split on `\n`,
* incomplete trailing content is buffered until the next chunk, and each
* complete line is emitted at most once. JSON-shaped OpenCode JSONL events
* are keyed by (type + sessionID + part.id); non-JSON lines pass through
* unchanged so genuinely-repeated status lines are not swallowed.
*/
type Parsed = Record<string, unknown>;
function asStr(value: unknown): string {
return typeof value === "string" ? value : "";
}
function asRec(value: unknown): Parsed | null {
if (typeof value !== "object" || value === null || Array.isArray(value)) return null;
return value as Parsed;
}
/**
* Build a stable dedup key for an OpenCode JSONL event. Returns `null` when
* the event is not a recognized OpenCode event — those lines fall back to
* raw-content hashing so non-JSON output (paperclip status lines, shell
* output) is never deduped by identity.
*/
export function eventDedupKey(event: Parsed): string | null {
const type = asStr(event.type);
if (!type) return null;
const sessionId = asStr(event.sessionID);
const part = asRec(event.part);
const partId = part ? asStr(part.id) : "";
switch (type) {
case "text":
case "tool_use":
case "step_finish":
case "step_start":
case "thinking":
case "assistant":
case "user":
if (partId) return `${type}:${sessionId}:${partId}`;
if (sessionId) return `${type}:${sessionId}`;
return null;
default:
return null;
}
}
/**
* Stateful line-level dedup filter. Emits `filter(chunk)` output through
* the caller — preserves original chunk formatting (including trailing
* newlines) for lines that pass the dedup check.
*/
export class LogLineDedupFilter {
private buffer = "";
private readonly seenKeys = new Set<string>();
/**
* Process a chunk and return the subset that should be forwarded.
* Incomplete trailing content (no terminating newline) is buffered and
* emitted on the next chunk that completes the line (or on flush()).
*/
filter(chunk: string): string {
if (!chunk) return "";
const combined = this.buffer + chunk;
const endsWithNewline = combined.endsWith("\n");
const parts = combined.split("\n");
if (endsWithNewline) {
parts.pop();
this.buffer = "";
} else {
this.buffer = parts.pop() ?? "";
}
const out: string[] = [];
for (const line of parts) {
if (this.shouldEmit(line)) out.push(line);
}
if (out.length === 0) return "";
return out.join("\n") + "\n";
}
/**
* Flush any incomplete trailing content. Called when the stream ends
* without a terminating newline so the final partial line isn't lost.
*/
flush(): string {
const pending = this.buffer;
this.buffer = "";
if (!pending) return "";
return this.shouldEmit(pending) ? pending : "";
}
private shouldEmit(line: string): boolean {
const trimmed = line.trim();
if (!trimmed) return true;
if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true;
let parsed: unknown;
try {
parsed = JSON.parse(trimmed);
} catch {
return true;
}
const event = asRec(parsed);
if (!event) return true;
const structuralKey = eventDedupKey(event);
const key = structuralKey ?? `raw:${trimmed}`;
if (this.seenKeys.has(key)) return false;
this.seenKeys.add(key);
return true;
}
}
+195
View File
@@ -0,0 +1,195 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import type { AdapterEnvironmentTestContext } from "@paperclipai/adapter-utils";
import { testEnvironment } from "./test.js";
import { getSelfPodInfo, getCoreApi, getAuthzApi } from "./k8s-client.js";
vi.mock("./k8s-client.js", () => ({
getSelfPodInfo: vi.fn(),
getCoreApi: vi.fn(),
getAuthzApi: vi.fn(),
}));
const SELF_POD = {
namespace: "ns-self",
image: "img:1",
imagePullSecrets: [],
pvcClaimName: "paperclip-pvc",
inheritedEnv: {},
inheritedEnvValueFrom: [],
inheritedEnvFrom: [],
dnsConfig: undefined,
secretVolumes: [],
} as unknown as Awaited<ReturnType<typeof getSelfPodInfo>>;
function makeCtx(config: Record<string, unknown> = {}): AdapterEnvironmentTestContext {
return { adapterType: "opencode_k8s", config } as unknown as AdapterEnvironmentTestContext;
}
function makeAuthz(allowedFor: (resource: string, verb: string) => boolean) {
return {
createSelfSubjectAccessReview: vi.fn().mockImplementation(async ({ body }: { body: { spec: { resourceAttributes: { resource: string; verb: string } } } }) => {
const { resource, verb } = body.spec.resourceAttributes;
return { status: { allowed: allowedFor(resource, verb) } };
}),
};
}
function makeCore(overrides: Partial<{ readNamespace: ReturnType<typeof vi.fn>; readNamespacedSecret: ReturnType<typeof vi.fn>; readNamespacedPersistentVolumeClaim: ReturnType<typeof vi.fn> }> = {}) {
return {
readNamespace: overrides.readNamespace ?? vi.fn().mockResolvedValue({ metadata: { name: "ns" } }),
readNamespacedSecret: overrides.readNamespacedSecret ?? vi.fn().mockResolvedValue({ metadata: { name: "paperclip-secrets" } }),
readNamespacedPersistentVolumeClaim: overrides.readNamespacedPersistentVolumeClaim ?? vi.fn().mockResolvedValue({ spec: { accessModes: ["ReadWriteMany"] } }),
};
}
beforeEach(() => {
vi.clearAllMocks();
vi.mocked(getSelfPodInfo).mockResolvedValue(SELF_POD);
vi.mocked(getCoreApi).mockReturnValue(makeCore() as unknown as ReturnType<typeof getCoreApi>);
vi.mocked(getAuthzApi).mockReturnValue(makeAuthz(() => true) as unknown as ReturnType<typeof getAuthzApi>);
});
describe("testEnvironment — happy path", () => {
it("returns pass when API, namespace, RBAC, secret, and RWX PVC all check out", async () => {
const result = await testEnvironment(makeCtx());
expect(result.adapterType).toBe("opencode_k8s");
expect(result.status).toBe("pass");
expect(result.checks.find((c) => c.code === "k8s_api_reachable")).toBeDefined();
expect(result.checks.find((c) => c.code === "k8s_pvc_rwx")).toBeDefined();
expect(result.checks.find((c) => c.code === "k8s_secret_exists")).toBeDefined();
expect(typeof result.testedAt).toBe("string");
});
it("skips namespace lookup and emits k8s_namespace_exists when target == self pod namespace", async () => {
const coreApi = makeCore();
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const result = await testEnvironment(makeCtx());
expect(coreApi.readNamespace).not.toHaveBeenCalled();
expect(result.checks.find((c) => c.code === "k8s_namespace_exists")?.message).toContain("pod namespace");
});
it("calls readNamespace when target namespace differs from self pod namespace", async () => {
const coreApi = makeCore();
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const result = await testEnvironment(makeCtx({ namespace: "ns-other" }));
expect(coreApi.readNamespace).toHaveBeenCalledWith({ name: "ns-other" });
expect(result.checks.find((c) => c.code === "k8s_namespace_exists")).toBeDefined();
});
});
describe("testEnvironment — early-return paths", () => {
it("returns fail and short-circuits when K8s API is unreachable", async () => {
vi.mocked(getSelfPodInfo).mockRejectedValueOnce(new Error("ECONNREFUSED"));
const result = await testEnvironment(makeCtx());
expect(result.status).toBe("fail");
expect(result.checks.find((c) => c.code === "k8s_api_unreachable")).toBeDefined();
// RBAC, secret, and PVC checks should be skipped when API is unreachable
expect(result.checks.some((c) => c.code.startsWith("k8s_rbac_"))).toBe(false);
});
});
describe("testEnvironment — namespace warning", () => {
it("emits warn (but proceeds) when readNamespace fails for a different namespace", async () => {
const coreApi = makeCore({
readNamespace: vi.fn().mockRejectedValue(new Error("forbidden")),
});
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const result = await testEnvironment(makeCtx({ namespace: "ns-other" }));
expect(result.checks.find((c) => c.code === "k8s_namespace_check_failed")).toBeDefined();
// Should still proceed with downstream checks
expect(result.checks.some((c) => c.code.startsWith("k8s_rbac_"))).toBe(true);
});
});
describe("testEnvironment — RBAC", () => {
it("emits error checks for denied verbs and degrades status to fail", async () => {
vi.mocked(getAuthzApi).mockReturnValue(
makeAuthz((resource, verb) => !(resource === "jobs" && verb === "create")) as unknown as ReturnType<typeof getAuthzApi>,
);
const result = await testEnvironment(makeCtx());
const denied = result.checks.find((c) => c.code === "k8s_rbac_job_create");
expect(denied?.level).toBe("error");
expect(result.status).toBe("fail");
});
it("emits warn when SelfSubjectAccessReview itself throws", async () => {
vi.mocked(getAuthzApi).mockReturnValue({
createSelfSubjectAccessReview: vi.fn().mockRejectedValue(new Error("SSAR not available")),
} as unknown as ReturnType<typeof getAuthzApi>);
const result = await testEnvironment(makeCtx());
const rbacWarns = result.checks.filter((c) => c.code.startsWith("k8s_rbac_") && c.level === "warn");
expect(rbacWarns.length).toBeGreaterThan(0);
});
});
describe("testEnvironment — secrets", () => {
it("emits warn when the secret is not found", async () => {
const coreApi = makeCore({
readNamespacedSecret: vi.fn().mockRejectedValue(new Error("not found")),
});
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const result = await testEnvironment(makeCtx());
expect(result.checks.find((c) => c.code === "k8s_secret_missing")).toBeDefined();
expect(result.status).toBe("warn");
});
it("uses configured secretRef when provided", async () => {
const coreApi = makeCore();
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
await testEnvironment(makeCtx({ secretRef: "custom-secret" }));
expect(coreApi.readNamespacedSecret).toHaveBeenCalledWith({ name: "custom-secret", namespace: "ns-self" });
});
});
describe("testEnvironment — PVC", () => {
it("emits warn when no PVC is mounted on /paperclip", async () => {
vi.mocked(getSelfPodInfo).mockResolvedValue({ ...SELF_POD, pvcClaimName: null });
const result = await testEnvironment(makeCtx());
expect(result.checks.find((c) => c.code === "k8s_pvc_not_detected")).toBeDefined();
expect(result.status).toBe("warn");
});
it("emits warn when PVC access mode is not ReadWriteMany", async () => {
const coreApi = makeCore({
readNamespacedPersistentVolumeClaim: vi.fn().mockResolvedValue({ spec: { accessModes: ["ReadWriteOnce"] } }),
});
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const result = await testEnvironment(makeCtx());
const pvcCheck = result.checks.find((c) => c.code === "k8s_pvc_not_rwx");
expect(pvcCheck).toBeDefined();
expect(pvcCheck?.message).toContain("ReadWriteOnce");
expect(result.status).toBe("warn");
});
it("emits warn when reading the PVC fails", async () => {
const coreApi = makeCore({
readNamespacedPersistentVolumeClaim: vi.fn().mockRejectedValue(new Error("api error")),
});
vi.mocked(getCoreApi).mockReturnValue(coreApi as unknown as ReturnType<typeof getCoreApi>);
const result = await testEnvironment(makeCtx());
expect(result.checks.find((c) => c.code === "k8s_pvc_check_failed")).toBeDefined();
});
});
+4
View File
@@ -0,0 +1,4 @@
{
"extends": "./tsconfig.json",
"exclude": ["**/*.test.ts", "src/ui-parser.ts"]
}
+12
View File
@@ -0,0 +1,12 @@
{
"extends": "./tsconfig.json",
"compilerOptions": {
"module": "commonjs",
"moduleResolution": "node",
"outDir": "dist/ui-parser",
"declaration": false,
"declarationMap": false,
"sourceMap": false
},
"include": ["src/ui-parser.ts"]
}