Compare commits

...

19 Commits

Author SHA1 Message Date
Chris Farhood 5179544fd6 docs: mark repo as abandoned in favor of paperclip-plugin-k8s
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 08:46:55 -04:00
Chris Farhood 160d6b49e9 0.2.5 2026-04-30 09:06:19 -04:00
Chris Farhood 9007762390 chore(deps): bump @paperclipai/adapter-utils from canary.7 pin to ^2026.428.0
The peerDep floor and devDep were pinned to a pre-release canary from
April 15, 13 days behind the current stable. Move both to the latest
stable 2026.428.0. All 328 tests pass against the new types; the
imported surface (asString, parseObject, runChildProcess,
AdapterExecutionContext, etc.) is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 09:06:12 -04:00
Chris Farhood 506007984c 0.2.4 2026-04-30 08:46:57 -04:00
Chris Farhood 7a6d1a44f2 fix(ui-parser): restore esbuild CJS bundle step lost in PR #11 merge
Commit 0e43811 added an esbuild step to bundle src/ui-parser.ts as CJS
because the UI's sandboxed worker can't evaluate ESM `export` syntax.
PR #11 (filesystem-log-tail) was based on a commit predating that fix,
so the merge clobbered both the build:ui-parser script and the esbuild
devDependency. Every release since has shipped a tsc-emitted ESM
ui-parser.js that the worker silently fails to load — parseStdoutLine
never registers and the run transcript falls back to dumping raw
stream-json lines as plain text instead of rendering structured
assistant/thinking/tool_call/tool_result entries.

Restore the script and dep verbatim from 0e43811.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 08:46:46 -04:00
Chris Farhood 3960d746f4 ci: serialize publish jobs sharing the same SHA to fix race
When a tagged release lands on master, both the master-push and tag-push
events trigger the publish job. The skip-on-exists check (`npm view`)
runs concurrently on both, both see the version as not-yet-published,
and both proceed to `npm publish`. The first wins; the second gets
E403 ("cannot publish over previously published versions") and reds
out the run.

Fixes the race by adding a publish-${{ github.sha }} concurrency group
so the second run queues until the first finishes — by then npm view
sees the published version and the skip path takes over cleanly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 08:28:01 -04:00
Chris Farhood cc942ca818 0.2.3 2026-04-30 08:03:08 -04:00
Chris Farhood 83a2d25062 fix(execute): assign captured stdout to outer binding so parse sees it
The filesystem-tail rewrite (8bd5042) declared `const stdout` inside the
try block, shadowing the outer `let stdout = ""`. parseClaudeStreamJson
then ran on the empty outer binding, so every run failed with "Failed to
parse Claude JSON output" and resultJson={stdout:""} despite live
log-streaming working fine. Drop the `const` so the assignment lands on
the outer let.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 08:03:05 -04:00
Chris Farhood c8429cfde1 fix: write logs to /paperclip/instances/default/data/run-logs/ to match server PVC layout
v0.2.1 introduced filesystem-tail log delivery with buildPodLogPath()
returning /paperclip/instances/default/run-logs/... but the paperclip
server creates and tails from /paperclip/instances/default/data/run-logs/
on the shared PVC. The missing /data/ segment meant:

  1. The init container's mkdir -p /paperclip/instances/... ran in a
     directory busybox UID 1000 can't write to — it's the init
     container's ephemeral rootfs, since the PVC is only mounted in
     the main container. Init exited 1, the && short-circuited, and
     the prompt copy never happened. Job failed with "Init container
     'write-prompt' failed with exit code 1".
  2. Even if the mkdir had worked, the main container's tee would
     have written to a path the server doesn't tail.

Fix: drop the misplaced mkdir from both init container variants and
correct buildPodLogPath() to include /data/. The directory already
exists on the PVC because the paperclip server creates it; both
containers run as UID 1000 with fsGroup 1000, so the main container's
tee writes to the pre-existing path with no setup needed.

Bump to 0.2.2.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 22:15:15 -04:00
Chris Farhood 1502039d70 Merge pull request #11 from farhoodlabs/feat/filesystem-log-tail
feat: replace k8s log API streaming with filesystem tailing
2026-04-27 22:26:02 -04:00
Chris Farhood c326d2571e fix(ci): run on tags, publish on both master push and tags 2026-04-27 22:25:48 -04:00
Chris Farhood e6df8fad98 chore: bump to 0.2.0 2026-04-27 22:20:00 -04:00
Chris Farhood 8bd5042b5d feat: replace k8s log API streaming with filesystem tailing
Replaces K8s log API streaming (which was dropping every ~3 seconds at
production scale) with filesystem tailing via tee to a pod log file on
the shared PVC.

Core changes:
- Add tee to claudeInvocation to write pod log file
- Add mkdir -p to init container to create log directory
- Add assertSafePathComponent and buildPodLogPath helper
- Add tailPodLogFile function with adaptive 250ms/1s polling
- Replace k8s log streaming with tailPodLogFile in Promise.allSettled
- Delete log-dedup.ts (RTK output truncation no longer needed)
- Update config-schema.ts and index.ts to remove RTK references
- Clean up log file in cleanupJob when retainJobs=false

Note: 14 tests in execute.test.ts test the obsolete k8s log streaming
approach and need to be rewritten or deleted (streamPodLogsOnce tests).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 22:13:39 -04:00
Chris Farhood 568f571d8c fix(models): inline static model list in index.ts to break circular dep with server/models
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 09:16:35 -04:00
Chris Farhood 8a9376b40e chore: bump to 0.1.56
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 08:05:29 -04:00
Chris Farhood 0c8aa4d1ea fix(models): move import to top of index.ts before export declarations
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 08:04:46 -04:00
Chris Farhood 1d894f104f fix(models): expose static models list so UI renders entries before listModels resolves
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 07:42:44 -04:00
Chris Farhood fc3866924a 0.1.54 2026-04-27 00:38:06 +00:00
Chris Farhood 368254d75d fix: per-chunk activity tracking + pod-phase gate on grace timer (FAR-107)
The 0.1.53 fix tracked stream liveness by updating lastActiveAt only
after streamPodLogsOnce returned.  That worked for the
disconnect-then-reconnect-then-disconnect case, but missed the
disconnect-then-long-running-reconnect case: a streaming attempt that
runs for minutes without disconnecting never refreshes lastActiveAt,
so the grace timer fires 30s after the prior disconnect even though
the new attempt is currently producing output.  Nancy reproduced
exactly this on 0.1.53 — claude_truncated with pod phase=Running.

Two changes:

1. streamPodLogsOnce now accepts the activity ref and updates
   lastActiveAt inside its writable's write handler — every chunk
   delivered from the container refreshes the timer in real time,
   not just on stream return.

2. Before the grace timer settles, gate on pod phase: if the pod is
   still Running or Pending, the container is alive (Claude's
   long tool-use silences exceed 30s for slow upstream APIs).
   Refresh lastActiveAt, leave the poller armed, and let
   waitForJobCompletion remain the authoritative termination
   signal.  Only proceed with the grace settlement when the pod
   has actually reached a terminal phase or is gone.

The original FAR-23 fast-path (container exits, Job condition lags)
still works: when the container terminates, pod phase moves to
Succeeded/Failed and the gate falls through to the existing
Job-presence check.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-27 00:38:06 +00:00
13 changed files with 700 additions and 1911 deletions
+5 -1
View File
@@ -3,6 +3,7 @@ name: CI
on:
push:
branches: [master]
tags: ['v*']
pull_request:
branches: [master]
@@ -28,7 +29,10 @@ jobs:
publish:
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/master' && github.event_name == 'push'
if: (github.ref == 'refs/heads/master' && github.event_name == 'push') || startsWith(github.ref, 'refs/tags/')
concurrency:
group: publish-${{ github.sha }}
cancel-in-progress: false
permissions:
id-token: write
steps:
+2
View File
@@ -1,3 +1,5 @@
> **Abandoned** — This adapter is no longer maintained. The Kubernetes execution capability has moved to the sandbox plugin at [`farhoodlabs/paperclip-plugin-k8s`](https://github.com/farhoodlabs/paperclip-plugin-k8s) (`@farhoodlabs/paperclip-plugin-k8s` on npm).
# Claude (Kubernetes) Paperclip Adapter Plugin
Paperclip adapter plugin that runs Claude Code agents as isolated Kubernetes Jobs instead of inside the main Paperclip process.
+474 -7
View File
@@ -1,26 +1,27 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.53",
"version": "0.2.5",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.53",
"version": "0.2.5",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
"picocolors": "^1.1.1"
},
"devDependencies": {
"@paperclipai/adapter-utils": "2026.415.0-canary.7",
"@paperclipai/adapter-utils": "^2026.428.0",
"@types/node": "^24.6.0",
"@vitest/coverage-v8": "^4.1.4",
"esbuild": "^0.24.0",
"typescript": "^5.7.3",
"vitest": "^4.1.4"
},
"peerDependencies": {
"@paperclipai/adapter-utils": ">=2026.415.0-canary.7"
"@paperclipai/adapter-utils": ">=2026.428.0"
}
},
"node_modules/@babel/helper-string-parser": {
@@ -117,6 +118,431 @@
"tslib": "^2.4.0"
}
},
"node_modules/@esbuild/aix-ppc64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/aix-ppc64/-/aix-ppc64-0.24.2.tgz",
"integrity": "sha512-thpVCb/rhxE/BnMLQ7GReQLLN8q9qbHmI55F4489/ByVg2aQaQ6kbcLb6FHkocZzQhxc4gx0sCk0tJkKBFzDhA==",
"cpu": [
"ppc64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"aix"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/android-arm": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/android-arm/-/android-arm-0.24.2.tgz",
"integrity": "sha512-tmwl4hJkCfNHwFB3nBa8z1Uy3ypZpxqxfTQOcHX+xRByyYgunVbZ9MzUUfb0RxaHIMnbHagwAxuTL+tnNM+1/Q==",
"cpu": [
"arm"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"android"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/android-arm64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/android-arm64/-/android-arm64-0.24.2.tgz",
"integrity": "sha512-cNLgeqCqV8WxfcTIOeL4OAtSmL8JjcN6m09XIgro1Wi7cF4t/THaWEa7eL5CMoMBdjoHOTh/vwTO/o2TRXIyzg==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"android"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/android-x64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/android-x64/-/android-x64-0.24.2.tgz",
"integrity": "sha512-B6Q0YQDqMx9D7rvIcsXfmJfvUYLoP722bgfBlO5cGvNVb5V/+Y7nhBE3mHV9OpxBf4eAS2S68KZztiPaWq4XYw==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"android"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/darwin-arm64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/darwin-arm64/-/darwin-arm64-0.24.2.tgz",
"integrity": "sha512-kj3AnYWc+CekmZnS5IPu9D+HWtUI49hbnyqk0FLEJDbzCIQt7hg7ucF1SQAilhtYpIujfaHr6O0UHlzzSPdOeA==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"darwin"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/darwin-x64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/darwin-x64/-/darwin-x64-0.24.2.tgz",
"integrity": "sha512-WeSrmwwHaPkNR5H3yYfowhZcbriGqooyu3zI/3GGpF8AyUdsrrP0X6KumITGA9WOyiJavnGZUwPGvxvwfWPHIA==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"darwin"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/freebsd-arm64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/freebsd-arm64/-/freebsd-arm64-0.24.2.tgz",
"integrity": "sha512-UN8HXjtJ0k/Mj6a9+5u6+2eZ2ERD7Edt1Q9IZiB5UZAIdPnVKDoG7mdTVGhHJIeEml60JteamR3qhsr1r8gXvg==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"freebsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/freebsd-x64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/freebsd-x64/-/freebsd-x64-0.24.2.tgz",
"integrity": "sha512-TvW7wE/89PYW+IevEJXZ5sF6gJRDY/14hyIGFXdIucxCsbRmLUcjseQu1SyTko+2idmCw94TgyaEZi9HUSOe3Q==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"freebsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-arm": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/linux-arm/-/linux-arm-0.24.2.tgz",
"integrity": "sha512-n0WRM/gWIdU29J57hJyUdIsk0WarGd6To0s+Y+LwvlC55wt+GT/OgkwoXCXvIue1i1sSNWblHEig00GBWiJgfA==",
"cpu": [
"arm"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-arm64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/linux-arm64/-/linux-arm64-0.24.2.tgz",
"integrity": "sha512-7HnAD6074BW43YvvUmE/35Id9/NB7BeX5EoNkK9obndmZBUk8xmJJeU7DwmUeN7tkysslb2eSl6CTrYz6oEMQg==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-ia32": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/linux-ia32/-/linux-ia32-0.24.2.tgz",
"integrity": "sha512-sfv0tGPQhcZOgTKO3oBE9xpHuUqguHvSo4jl+wjnKwFpapx+vUDcawbwPNuBIAYdRAvIDBfZVvXprIj3HA+Ugw==",
"cpu": [
"ia32"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-loong64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/linux-loong64/-/linux-loong64-0.24.2.tgz",
"integrity": "sha512-CN9AZr8kEndGooS35ntToZLTQLHEjtVB5n7dl8ZcTZMonJ7CCfStrYhrzF97eAecqVbVJ7APOEe18RPI4KLhwQ==",
"cpu": [
"loong64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-mips64el": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/linux-mips64el/-/linux-mips64el-0.24.2.tgz",
"integrity": "sha512-iMkk7qr/wl3exJATwkISxI7kTcmHKE+BlymIAbHO8xanq/TjHaaVThFF6ipWzPHryoFsesNQJPE/3wFJw4+huw==",
"cpu": [
"mips64el"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-ppc64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/linux-ppc64/-/linux-ppc64-0.24.2.tgz",
"integrity": "sha512-shsVrgCZ57Vr2L8mm39kO5PPIb+843FStGt7sGGoqiiWYconSxwTiuswC1VJZLCjNiMLAMh34jg4VSEQb+iEbw==",
"cpu": [
"ppc64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-riscv64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/linux-riscv64/-/linux-riscv64-0.24.2.tgz",
"integrity": "sha512-4eSFWnU9Hhd68fW16GD0TINewo1L6dRrB+oLNNbYyMUAeOD2yCK5KXGK1GH4qD/kT+bTEXjsyTCiJGHPZ3eM9Q==",
"cpu": [
"riscv64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-s390x": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/linux-s390x/-/linux-s390x-0.24.2.tgz",
"integrity": "sha512-S0Bh0A53b0YHL2XEXC20bHLuGMOhFDO6GN4b3YjRLK//Ep3ql3erpNcPlEFed93hsQAjAQDNsvcK+hV90FubSw==",
"cpu": [
"s390x"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/linux-x64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/linux-x64/-/linux-x64-0.24.2.tgz",
"integrity": "sha512-8Qi4nQcCTbLnK9WoMjdC9NiTG6/E38RNICU6sUNqK0QFxCYgoARqVqxdFmWkdonVsvGqWhmm7MO0jyTqLqwj0Q==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/netbsd-arm64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/netbsd-arm64/-/netbsd-arm64-0.24.2.tgz",
"integrity": "sha512-wuLK/VztRRpMt9zyHSazyCVdCXlpHkKm34WUyinD2lzK07FAHTq0KQvZZlXikNWkDGoT6x3TD51jKQ7gMVpopw==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"netbsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/netbsd-x64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/netbsd-x64/-/netbsd-x64-0.24.2.tgz",
"integrity": "sha512-VefFaQUc4FMmJuAxmIHgUmfNiLXY438XrL4GDNV1Y1H/RW3qow68xTwjZKfj/+Plp9NANmzbH5R40Meudu8mmw==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"netbsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/openbsd-arm64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/openbsd-arm64/-/openbsd-arm64-0.24.2.tgz",
"integrity": "sha512-YQbi46SBct6iKnszhSvdluqDmxCJA+Pu280Av9WICNwQmMxV7nLRHZfjQzwbPs3jeWnuAhE9Jy0NrnJ12Oz+0A==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"openbsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/openbsd-x64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/openbsd-x64/-/openbsd-x64-0.24.2.tgz",
"integrity": "sha512-+iDS6zpNM6EnJyWv0bMGLWSWeXGN/HTaF/LXHXHwejGsVi+ooqDfMCCTerNFxEkM3wYVcExkeGXNqshc9iMaOA==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"openbsd"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/sunos-x64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/sunos-x64/-/sunos-x64-0.24.2.tgz",
"integrity": "sha512-hTdsW27jcktEvpwNHJU4ZwWFGkz2zRJUz8pvddmXPtXDzVKTTINmlmga3ZzwcuMpUvLw7JkLy9QLKyGpD2Yxig==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"sunos"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/win32-arm64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/win32-arm64/-/win32-arm64-0.24.2.tgz",
"integrity": "sha512-LihEQ2BBKVFLOC9ZItT9iFprsE9tqjDjnbulhHoFxYQtQfai7qfluVODIYxt1PgdoyQkz23+01rzwNwYfutxUQ==",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"win32"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/win32-ia32": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/win32-ia32/-/win32-ia32-0.24.2.tgz",
"integrity": "sha512-q+iGUwfs8tncmFC9pcnD5IvRHAzmbwQ3GPS5/ceCyHdjXubwQWI12MKWSNSMYLJMq23/IUCvJMS76PDqXe1fxA==",
"cpu": [
"ia32"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"win32"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@esbuild/win32-x64": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/@esbuild/win32-x64/-/win32-x64-0.24.2.tgz",
"integrity": "sha512-7VTgWzgMGvup6aSqDPLiW5zHaxYJGTO4OokMjIlrCtf+VpEL+cXKtCvg723iguPYI5oaUNdS+/V7OU2gvXVWEg==",
"cpu": [
"x64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"win32"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@jridgewell/resolve-uri": {
"version": "3.1.2",
"resolved": "https://registry.npmjs.org/@jridgewell/resolve-uri/-/resolve-uri-3.1.2.tgz",
@@ -223,9 +649,9 @@
}
},
"node_modules/@paperclipai/adapter-utils": {
"version": "2026.415.0-canary.7",
"resolved": "https://registry.npmjs.org/@paperclipai/adapter-utils/-/adapter-utils-2026.415.0-canary.7.tgz",
"integrity": "sha512-VNzIZmu1lrK6QM8Ad9WkOihZItfkj21NHKQf+artDcbwFT2hHbDAD9hdW2W9NMVxYdFvvnws3w76FI/BUbCMbQ==",
"version": "2026.428.0",
"resolved": "https://registry.npmjs.org/@paperclipai/adapter-utils/-/adapter-utils-2026.428.0.tgz",
"integrity": "sha512-kGHpE7rhePPCbnG3OwXbNuHZZuI+XyuFgNSiDnrEeiSbkI2c5XHM2WnWDCZ/NGHULfJW3lWhSxGMFoYqiy38vQ==",
"dev": true,
"license": "MIT"
},
@@ -1033,6 +1459,47 @@
"node": ">= 0.4"
}
},
"node_modules/esbuild": {
"version": "0.24.2",
"resolved": "https://registry.npmjs.org/esbuild/-/esbuild-0.24.2.tgz",
"integrity": "sha512-+9egpBW8I3CD5XPe0n6BfT5fxLzxrlDzqydF3aviG+9ni1lDC/OvMHcxqEFV0+LANZG5R1bFMWfUrjVsdwxJvA==",
"dev": true,
"hasInstallScript": true,
"license": "MIT",
"bin": {
"esbuild": "bin/esbuild"
},
"engines": {
"node": ">=18"
},
"optionalDependencies": {
"@esbuild/aix-ppc64": "0.24.2",
"@esbuild/android-arm": "0.24.2",
"@esbuild/android-arm64": "0.24.2",
"@esbuild/android-x64": "0.24.2",
"@esbuild/darwin-arm64": "0.24.2",
"@esbuild/darwin-x64": "0.24.2",
"@esbuild/freebsd-arm64": "0.24.2",
"@esbuild/freebsd-x64": "0.24.2",
"@esbuild/linux-arm": "0.24.2",
"@esbuild/linux-arm64": "0.24.2",
"@esbuild/linux-ia32": "0.24.2",
"@esbuild/linux-loong64": "0.24.2",
"@esbuild/linux-mips64el": "0.24.2",
"@esbuild/linux-ppc64": "0.24.2",
"@esbuild/linux-riscv64": "0.24.2",
"@esbuild/linux-s390x": "0.24.2",
"@esbuild/linux-x64": "0.24.2",
"@esbuild/netbsd-arm64": "0.24.2",
"@esbuild/netbsd-x64": "0.24.2",
"@esbuild/openbsd-arm64": "0.24.2",
"@esbuild/openbsd-x64": "0.24.2",
"@esbuild/sunos-x64": "0.24.2",
"@esbuild/win32-arm64": "0.24.2",
"@esbuild/win32-ia32": "0.24.2",
"@esbuild/win32-x64": "0.24.2"
}
},
"node_modules/estree-walker": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/estree-walker/-/estree-walker-3.0.3.tgz",
+6 -4
View File
@@ -1,6 +1,6 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.53",
"version": "0.2.5",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
@@ -25,7 +25,8 @@
"dist"
],
"scripts": {
"build": "tsc",
"build": "tsc && npm run build:ui-parser",
"build:ui-parser": "esbuild src/ui-parser.ts --bundle --format=cjs --target=es2020 --outfile=dist/ui-parser.js --log-level=warning",
"clean": "rm -rf dist",
"typecheck": "tsc --noEmit",
"test": "vitest run",
@@ -37,12 +38,13 @@
"picocolors": "^1.1.1"
},
"peerDependencies": {
"@paperclipai/adapter-utils": ">=2026.415.0-canary.7"
"@paperclipai/adapter-utils": ">=2026.428.0"
},
"devDependencies": {
"@paperclipai/adapter-utils": "2026.415.0-canary.7",
"@paperclipai/adapter-utils": "^2026.428.0",
"@types/node": "^24.6.0",
"@vitest/coverage-v8": "^4.1.4",
"esbuild": "^0.24.0",
"typescript": "^5.7.3",
"vitest": "^4.1.4"
}
+29 -5
View File
@@ -1,7 +1,35 @@
import type { AdapterModel } from "@paperclipai/adapter-utils";
export const type = "claude_k8s";
export const label = "Claude (Kubernetes)";
export const models: undefined = undefined;
/**
 * Report whether the current process environment selects Amazon Bedrock.
 *
 * Bedrock is considered enabled when `CLAUDE_CODE_USE_BEDROCK` is exactly
 * `"1"` or `"true"`, or when `ANTHROPIC_BEDROCK_BASE_URL` is set to a value
 * that is non-empty after trimming whitespace.
 *
 * @returns `true` when either environment signal is present, else `false`.
 */
function isBedrockEnv(): boolean {
  const useBedrockFlag = process.env.CLAUDE_CODE_USE_BEDROCK;
  if (useBedrockFlag === "1" || useBedrockFlag === "true") {
    return true;
  }

  const bedrockBaseUrl = process.env.ANTHROPIC_BEDROCK_BASE_URL;
  if (typeof bedrockBaseUrl !== "string") {
    return false;
  }
  return bedrockBaseUrl.trim().length > 0;
}
// Static model catalogue offered when isBedrockEnv() is false. Each entry
// pairs a model id with the label shown for it.
// NOTE(review): the ids mix dated (e.g. "-20250929") and undated forms —
// confirm each id against the upstream model list before relying on it.
const DIRECT_MODELS: AdapterModel[] = [
{ id: "claude-opus-4-7", label: "Claude Opus 4.7" },
{ id: "claude-opus-4-6", label: "Claude Opus 4.6" },
{ id: "claude-sonnet-4-6", label: "Claude Sonnet 4.6" },
{ id: "claude-haiku-4-6", label: "Claude Haiku 4.6" },
{ id: "claude-sonnet-4-5-20250929", label: "Claude Sonnet 4.5" },
{ id: "claude-haiku-4-5-20251001", label: "Claude Haiku 4.5" },
];
// Static model catalogue offered when isBedrockEnv() is true.
// NOTE(review): unlike DIRECT_MODELS this list has no Haiku 4.6 entry, and the
// id suffixes are not uniform (none, "-v1", "-v1:0") — confirm both are
// intentional.
const BEDROCK_MODELS: AdapterModel[] = [
{ id: "us.anthropic.claude-opus-4-7", label: "Bedrock Opus 4.7" },
{ id: "us.anthropic.claude-opus-4-6-v1", label: "Bedrock Opus 4.6" },
{ id: "us.anthropic.claude-sonnet-4-6", label: "Bedrock Sonnet 4.6" },
{ id: "us.anthropic.claude-sonnet-4-5-20250929-v1:0", label: "Bedrock Sonnet 4.5" },
{ id: "us.anthropic.claude-haiku-4-5-20251001-v1:0", label: "Bedrock Haiku 4.5" },
];
// Exported model list. The Bedrock/direct choice is evaluated once, when this
// module is first loaded, so later changes to the environment variables read
// by isBedrockEnv() do not affect it.
export const models = isBedrockEnv() ? BEDROCK_MODELS : DIRECT_MODELS;
export const agentConfigurationDoc = `# claude_k8s agent configuration
@@ -32,10 +60,6 @@ Kubernetes fields:
- retainJobs (boolean, optional): skip cleanup on completion for debugging
- reattachOrphanedJobs (boolean, optional): when true (default), attach to a running orphaned Job that matches the current agent/task/session instead of blocking; when false, any non-terminal orphan blocks the new run
Output filtering fields:
- enableRtk (boolean, optional): truncate oversized tool outputs before they reach the model via a PostToolUse hook; default false
- rtkMaxOutputBytes (number, optional): byte threshold for tool output truncation when enableRtk is true; default 50000
Operational fields:
- timeoutSec (number, optional): run timeout in seconds; 0 means no timeout
- graceSec (number, optional): additional grace before adapter gives up after Job deadline
+1 -16
View File
@@ -133,22 +133,7 @@ export function getConfigSchema(): AdapterConfigSchema {
label: "Labels",
hint: "Extra labels added to Job metadata. One key=value per line.",
},
// Output filtering (RTK-compatible)
{
type: "toggle",
key: "enableRtk",
label: "Enable Output Filtering",
hint: "Truncate oversized tool outputs before they reach the model, reducing token consumption. Implemented natively in Node.js — no external binary required. Installs a PostToolUse hook in ~/.claude/settings.json for each run.",
default: false,
},
{
type: "number",
key: "rtkMaxOutputBytes",
label: "Max Tool Output Bytes",
hint: "Maximum bytes of tool output to pass to the model when output filtering is enabled. Outputs exceeding this threshold are truncated with a summary. Default: 50000.",
default: 50000,
},
];
return { fields };
}
}
File diff suppressed because it is too large Load Diff
+104 -354
View File
@@ -16,28 +16,12 @@ import {
isClaudeMaxTurnsResult,
isClaudeUnknownSessionError,
} from "./parse.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
import { LogLineDedupFilter } from "./log-dedup.js";
import { getSelfPodInfo, getBatchApi, getCoreApi } from "./k8s-client.js";
import { buildJobManifest, buildPodLogPath, sanitizeLabelValue } from "./job-manifest.js";
import type * as k8s from "@kubernetes/client-node";
import { Writable } from "node:stream";
const POLL_INTERVAL_MS = 2000;
const KEEPALIVE_INTERVAL_MS = 15_000;
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
// Upper bound on how long streamPodLogsOnce will wait after stopSignal fires
// before force-returning, even if logApi.log has not yet resolved. Defensive
// against the K8s client library not propagating writable.destroy() into an
// abort of the underlying HTTP request.
const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
// After the log stream exits (container stopped producing output), wait this
// long for the K8s Job condition to be confirmed before treating the job as
// done. K8s Job conditions can lag pod exit by several seconds or more under
// cluster load. Without this bound, waitForJobCompletion keeps polling while
// streamPodLogs keeps reconnecting — together they can hold execute() open for
// minutes, causing stale "running" status in the UI (FAR-23).
const LOG_EXIT_COMPLETION_GRACE_MS = 30_000;
// Module-level tracking of active Jobs for SIGTERM best-effort cleanup.
interface ActiveJobRef {
@@ -75,6 +59,92 @@ function ensureSigtermHandler(): void {
});
}
/** Options accepted by {@link tailPodLogFile}. */
interface TailOptions {
  /** Sink for log output; called as `onLog("stdout", line + "\n")` once per line. */
  onLog: AdapterExecutionContext["onLog"];
  /**
   * Shared stop flag. Setting `stopped = true` makes the tail perform one
   * final drain of the file and then return.
   */
  stopSignal: { stopped: boolean };
}

/**
 * Tail a pod log file from the shared PVC, emitting complete lines via onLog.
 *
 * Polling is adaptive: 250ms while the file is growing, backed off to 1000ms
 * after 5 consecutive polls with no growth. The function first waits up to
 * 30s for the file to appear.
 *
 * Bytes are buffered and split on the newline byte before being decoded, so a
 * multi-byte UTF-8 character that straddles two reads is never decoded in
 * halves. When the stop signal fires, the file is drained to its current end
 * and any trailing line that lacks a terminating newline is flushed as a
 * final line (it is passed to `onLog` with a `"\n"` appended, like every
 * other line) instead of being dropped.
 *
 * @param filePath Absolute path of the log file to tail.
 * @param opts     Log sink and stop flag; see {@link TailOptions}.
 * @returns Every line read from the file, joined with `"\n"` (no trailing
 *          newline).
 * @throws Error `"Stop signal received before log file appeared"` if the stop
 *         flag is set while the file still does not exist.
 * @throws Error `"Pod log file never appeared at <path>"` if the file cannot
 *         be opened once the wait is over.
 * @throws Any error raised by `fs.stat`, the file read, or `onLog` while
 *         tailing; the file handle is closed before the error propagates.
 */
async function tailPodLogFile(
  filePath: string,
  opts: TailOptions,
): Promise<string> {
  const { onLog, stopSignal } = opts;

  const FAST_POLL_MS = 250;
  const SLOW_POLL_MS = 1000;
  const IDLE_POLLS_BEFORE_BACKOFF = 5;
  const APPEAR_TIMEOUT_MS = 30_000;
  // Upper bound on a single read so a large backlog cannot force one huge allocation.
  const MAX_READ_BYTES = 1024 * 1024;
  const NEWLINE_BYTE = 0x0a;

  const sleep = (ms: number): Promise<void> =>
    new Promise((resolve) => setTimeout(resolve, ms));

  const lines: string[] = [];
  // Bytes read after the last newline seen so far (an incomplete line).
  let pending: Buffer = Buffer.alloc(0);
  let offset = 0;

  /** Record one complete line and forward it to the log sink. */
  const emitLine = async (line: string): Promise<void> => {
    lines.push(line);
    await onLog("stdout", line + "\n");
  };

  /** Split newly read bytes into lines; keep the unterminated remainder pending. */
  const consume = async (chunk: Buffer): Promise<void> => {
    const data = pending.length > 0 ? Buffer.concat([pending, chunk]) : chunk;
    let start = 0;
    let newlineAt = data.indexOf(NEWLINE_BYTE, start);
    while (newlineAt !== -1) {
      // Decoding only whole lines is safe: 0x0a never occurs inside a
      // multi-byte UTF-8 sequence, so each slice holds complete characters.
      await emitLine(data.toString("utf-8", start, newlineAt));
      start = newlineAt + 1;
      newlineAt = data.indexOf(NEWLINE_BYTE, start);
    }
    pending = data.subarray(start);
  };

  /** Read from `offset` up to the file's current size; true if any bytes were read. */
  const drain = async (handle: fs.FileHandle): Promise<boolean> => {
    const { size } = await fs.stat(filePath);
    let readAny = false;
    while (offset < size) {
      const buf = Buffer.alloc(Math.min(size - offset, MAX_READ_BYTES));
      const { bytesRead } = await handle.read(buf, 0, buf.length, offset);
      if (bytesRead === 0) break; // file is shorter than stat reported
      offset += bytesRead;
      readAny = true;
      await consume(buf.subarray(0, bytesRead));
    }
    return readAny;
  };

  // Wait up to 30s for the file to appear
  const deadline = Date.now() + APPEAR_TIMEOUT_MS;
  while (Date.now() < deadline) {
    try {
      await fs.stat(filePath);
      break;
    } catch {
      if (stopSignal.stopped) throw new Error("Stop signal received before log file appeared");
      await sleep(FAST_POLL_MS);
    }
  }

  // Opening doubles as the final existence check after the wait loop.
  let handle: fs.FileHandle;
  try {
    handle = await fs.open(filePath, "r");
  } catch {
    throw new Error(`Pod log file never appeared at ${filePath}`);
  }

  try {
    let consecutiveIdlePolls = 0;
    let pollInterval = FAST_POLL_MS;

    while (!stopSignal.stopped) {
      if (await drain(handle)) {
        consecutiveIdlePolls = 0;
        pollInterval = FAST_POLL_MS;
      } else {
        consecutiveIdlePolls++;
        if (consecutiveIdlePolls >= IDLE_POLLS_BEFORE_BACKOFF) pollInterval = SLOW_POLL_MS;
      }
      if (!stopSignal.stopped) await sleep(pollInterval);
    }

    // Final drain: pick up whatever was written between the last poll and the stop.
    await drain(handle);

    // Flush a last line that was written without a terminating newline.
    if (pending.length > 0) {
      const lastLine = pending.toString("utf-8");
      pending = Buffer.alloc(0);
      await emitLine(lastLine);
    }
  } finally {
    await handle.close();
  }

  return lines.join("\n");
}
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
@@ -389,202 +459,6 @@ async function waitForPod(
throw new Error(`Timed out waiting for pod to be scheduled (${Math.round(timeoutMs / 1000)}s)`);
}
/**
 * Stream pod logs once via follow. Returns accumulated stdout when the
 * stream ends (container exit, API disconnect, or abort signal).
 *
 * Every chunk received is recorded before de-duplication, so the returned
 * string is the raw stream text even when `dedup` suppresses part of what is
 * forwarded to `onLog`.
 *
 * @param namespace      Namespace of the pod.
 * @param podName        Name of the pod; logs are read from its "claude" container.
 * @param onLog          Sink that receives each (optionally de-duplicated) chunk as "stdout".
 * @param kubeconfigPath Optional kubeconfig path passed to getLogApi.
 * @param sinceSeconds   Optional look-back window; omitted from the request when falsy (including 0).
 * @param dedup          Optional filter applied to each chunk before it is forwarded to onLog.
 * @param stopSignal     Optional shared flag; once `stopped` is true the stream is aborted.
 * @returns All received chunks concatenated in arrival order.
 */
export async function streamPodLogsOnce(
namespace: string,
podName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
sinceSeconds?: number,
dedup?: LogLineDedupFilter,
stopSignal?: { stopped: boolean },
): Promise<string> {
const logApi = getLogApi(kubeconfigPath);
const chunks: string[] = [];
const writable = new Writable({
write(chunk: Buffer, _encoding, callback) {
const text = chunk.toString("utf-8");
// Record the raw text first; the return value is built from `chunks`.
chunks.push(text);
const emitted = dedup ? dedup.filter(text) : text;
// Nothing left to forward after filtering — acknowledge the chunk and move on.
if (!emitted) {
callback();
return;
}
// Forward raw stream-json lines unchanged. The Paperclip UI uses the
// adapter's ui-parser export (src/ui-parser.ts) to render structured
// transcript entries — pre-formatting here would strip that structure
// and produce flat plain text that looks nothing like claude_local.
// The write callback is invoked only after onLog settles; a rejection is
// handed to the callback as the write error.
void onLog("stdout", emitted).then(() => callback(), callback);
},
});
// When the job completion signal fires, destroy the writable to abort the
// in-flight follow stream. Without this, logApi.log can hang indefinitely
// when the pod terminates without closing the HTTP connection cleanly.
let stopPoller: ReturnType<typeof setInterval> | null = null;
let bailTimer: ReturnType<typeof setTimeout> | null = null;
let bailResolve: (() => void) | null = null;
// Bail promise resolves LOG_STREAM_BAIL_TIMEOUT_MS after stopSignal fires,
// even if logApi.log has not resolved by then. This is a safety net for the
// case where writable.destroy() fails to propagate to an abort of the HTTP
// request (e.g. the K8s client is awaiting a response that never comes).
const bailPromise = new Promise<void>((resolve) => {
bailResolve = resolve;
});
if (stopSignal) {
// Check the stop flag every 200ms.
stopPoller = setInterval(() => {
if (stopSignal.stopped) {
if (!writable.destroyed) writable.destroy();
// Arm the bail timer only once, on the first poll that observes the stop.
if (!bailTimer && bailResolve) {
bailTimer = setTimeout(() => {
// Best-effort notice; a failing onLog must not block the bail.
onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {});
bailResolve!();
}, LOG_STREAM_BAIL_TIMEOUT_MS);
}
}
}, 200);
}
const logPromise = logApi.log(namespace, podName, "claude", writable, {
follow: true,
pretty: false,
...(sinceSeconds ? { sinceSeconds } : {}),
}).catch(() => {
// follow may fail if the container already exited, the API connection
// dropped, or we aborted via writable.destroy() — not fatal.
});
try {
if (stopSignal) {
// Return on whichever comes first: the stream ending or the bail timer.
await Promise.race([logPromise, bailPromise]);
} else {
await logPromise;
}
} finally {
// Always release the poller and timer, whichever path ended the wait.
if (stopPoller) clearInterval(stopPoller);
if (bailTimer) clearTimeout(bailTimer);
}
return chunks.join("");
}
/**
 * Follow a pod's logs across connection drops.
 *
 * Repeatedly calls streamPodLogsOnce until `stopSignal.stopped` is set or
 * MAX_LOG_RECONNECT_ATTEMPTS attempts have been made, pausing
 * LOG_STREAM_RECONNECT_DELAY_MS between attempts. Reconnects request only a
 * recent window (`sinceSeconds`, measured from the last attempt that returned
 * output, plus a 5 second buffer) and share one dedup filter so replayed
 * lines inside that window are not forwarded twice.
 *
 * When `activity` is supplied it is updated in place:
 *   - `streamHasExited` is set to true after the first streamPodLogsOnce
 *     call returns;
 *   - `lastActiveAt` is set to Date.now() after every attempt that returned
 *     non-empty output.
 *
 * @returns the concatenation of the raw text returned by every attempt.
 */
async function streamPodLogs(
  namespace: string,
  podName: string,
  onLog: AdapterExecutionContext["onLog"],
  kubeconfigPath?: string,
  stopSignal?: { stopped: boolean },
  dedup?: LogLineDedupFilter,
  activity?: { lastActiveAt: number; streamHasExited: boolean },
): Promise<string> {
  const epochSeconds = (): number => Math.floor(Date.now() / 1000);
  const collected: string[] = [];
  // Anchor for the reconnect window; moved forward whenever an attempt
  // returns output so the window does not grow with total run time.
  let lastLogReceivedAt = epochSeconds();
  // One filter for every attempt, created here when the caller gave none.
  const lineFilter = dedup ?? new LogLineDedupFilter();

  for (let attempt = 0; !stopSignal?.stopped; attempt += 1) {
    if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
      await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
      break;
    }

    // The first attempt streams from the beginning; later attempts ask only
    // for the window since the last received output (+5s buffer, minimum 1s).
    let sinceSeconds: number | undefined;
    if (attempt > 0) {
      sinceSeconds = Math.max(1, epochSeconds() - lastLogReceivedAt + 5);
      await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
    }

    const attemptStartedAt = epochSeconds();
    const output = await streamPodLogsOnce(
      namespace,
      podName,
      onLog,
      kubeconfigPath,
      sinceSeconds,
      lineFilter,
      stopSignal,
    );
    if (activity) activity.streamHasExited = true;

    if (output) {
      collected.push(output);
      // The attempt just ended, so its output arrived no later than now.
      lastLogReceivedAt = epochSeconds();
      if (activity) activity.lastActiveAt = Date.now();
    } else if (attempt === 0) {
      // Nothing came back on the very first attempt: anchor the window at
      // the moment that attempt started.
      lastLogReceivedAt = attemptStartedAt;
    }

    // Stop was requested while streaming — skip the reconnect pause.
    if (stopSignal?.stopped) break;
    await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
  }

  // Emit whatever partial line the filter is still holding so a stream that
  // ended mid-line does not lose its final chunk.
  const tail = lineFilter.flush();
  if (tail) await onLog("stdout", tail);
  return collected.join("");
}
/**
 * Fetch the `claude` container's log in a single non-follow request.
 *
 * Best-effort: any error from the read is swallowed and reported as an empty
 * string, as is a response that is not a string.
 *
 * @returns the log text, or "" when it could not be read.
 */
async function readPodLogs(
  namespace: string,
  podName: string,
  kubeconfigPath?: string,
): Promise<string> {
  const coreApi = getCoreApi(kubeconfigPath);
  const request = { name: podName, namespace, container: "claude" };
  try {
    const body = await coreApi.readNamespacedPodLog(request);
    if (typeof body !== "string") return "";
    return body;
  } catch {
    // Deliberately ignored: callers treat "" as "no logs available".
    return "";
  }
}
/**
* Wait for the Job to reach a terminal state (Complete or Failed).
* Returns the Job's final status. A 404 (job deleted by TTL or externally)
@@ -777,6 +651,7 @@ async function cleanupJob(
jobName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
podLogPath?: string,
): Promise<void> {
try {
const batchApi = getBatchApi(kubeconfigPath);
@@ -785,6 +660,9 @@ async function cleanupJob(
namespace,
body: { propagationPolicy: "Background" },
});
if (podLogPath) {
try { await fs.unlink(podLogPath); } catch { /* non-fatal */ }
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Warning: failed to cleanup job ${jobName}: ${msg}\n`);
@@ -838,6 +716,8 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let jobName!: string;
// eslint-disable-next-line prefer-const
let namespace!: string;
// eslint-disable-next-line prefer-const
let podLogPath!: string;
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
// runtimeSessionParams and currentSessionIdRaw are also used after the
// try block (in the result-parsing section) so hoist them here.
@@ -1010,6 +890,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
if (reattachTarget) {
jobName = reattachTarget.jobName;
namespace = reattachTarget.namespace;
podLogPath = buildPodLogPath(ctx.agent.companyId, ctx.agent.id, runId);
// Announce reattach metadata. Prompt and args aren't known here — they
// belong to the prior run that created this pod and are already present
@@ -1062,6 +943,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const claudeArgs = built.claudeArgs;
const promptMetrics = built.promptMetrics;
promptSecret = built.promptSecret;
podLogPath = built.podLogPath;
if (built.skippedLabels.length > 0) {
await onLog("stderr", `[paperclip] Warning: skipped ${built.skippedLabels.length} extra label(s) with reserved prefix: ${built.skippedLabels.join(", ")}\n`);
}
@@ -1179,6 +1061,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
_releaseMutex();
}
// eslint-disable-next-line @typescript-eslint/no-shadow
let stdout = "";
let exitCode: number | null = null;
let podTerminatedState: PodTerminatedState | null = null;
@@ -1261,12 +1144,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let keepaliveJobTerminal = false;
let consecutiveTerminalReadings = 0;
// Shared signal: when job completion resolves, tell the log streamer to
// stop reconnecting. Declared before keepaliveTimer so the cancel path
// inside the timer can set it without temporal dead zone issues.
// stop. Declared before keepaliveTimer so the cancel path inside the
// timer can set it without temporal dead zone issues.
const logStopSignal = { stopped: false };
// Shared dedup filter: created here so the one-shot fallback can
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
const logDedup = new LogLineDedupFilter();
// Set when the run is externally cancelled (cancel-poll path).
let cancelled = false;
@@ -1349,142 +1229,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return onLog(stream, chunk);
};
// Track stream liveness so the grace timer below only fires when output
// has actually stopped — not on a transient K8s log-API reconnect that
// streamPodLogs heals on its own (FAR-107).
const streamActivity: { lastActiveAt: number; streamHasExited: boolean } = {
lastActiveAt: Date.now(),
streamHasExited: false,
};
const trackedLogStream = streamPodLogs(
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
streamActivity,
);
// completionWithGrace races waitForJobCompletion against a grace timer
// that fires LOG_EXIT_COMPLETION_GRACE_MS after the log stream exits.
// This bounds the stale-UI window when K8s Job conditions lag container
// exit (FAR-23): without it, waitForJobCompletion polls indefinitely
// while streamPodLogs reconnects, holding execute() open for minutes.
// logStopSignal.stopped is set on every settled path (fulfilled, rejected,
// or grace) so streamPodLogs stops reconnecting promptly.
type CompletionResult = { succeeded: boolean; timedOut: boolean; jobGone?: boolean; gracePeriodFired?: boolean };
let gracePoller: ReturnType<typeof setInterval> | null = null;
const completionWithGrace = new Promise<CompletionResult>((resolve, reject) => {
let settled = false;
const settleOk = (r: CompletionResult) => {
if (settled) return;
settled = true;
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
logStopSignal.stopped = true;
resolve(r);
};
const settleErr = (err: unknown) => {
if (settled) return;
settled = true;
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
logStopSignal.stopped = true;
reject(err);
};
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(settleOk).catch(settleErr);
gracePoller = setInterval(() => {
// Only consider grace once the stream has exited at least once.
// Until then we are still in the warm-up window and
// waitForJobCompletion is the authoritative signal. Once the
// stream has exited, fire only after GRACE_MS of inactivity
// measured against the last received chunk — output that resumes
// through a reconnect resets the clock so transient drops do not
// truncate live runs (FAR-107).
if (
streamActivity.streamHasExited &&
Date.now() - streamActivity.lastActiveAt >= LOG_EXIT_COMPLETION_GRACE_MS
) {
// Stop the grace poller immediately so we don't double-fire while the
// verification read below is in flight.
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
// The log stream exiting only means the container stopped producing
// output — it does NOT prove the Job was deleted. Verify Job
// presence with a one-shot read so we can distinguish:
// (a) Job 404 → truly gone (TTL or external deletion)
// (b) Job still present → K8s condition propagation lag (FAR-23)
// Without this check we mis-classify (b) as "deleted externally" and
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
void (async () => {
try {
await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace });
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update; Job ${jobName} still present — proceeding with captured output (FAR-23)\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
} catch (err: unknown) {
if (isK8s404(err)) {
jobGoneDetectionPath = "grace-period-verify-404";
jobGoneAt = Date.now();
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago and Job ${jobName} is gone (TTL or external deletion) — proceeding with captured output (FAR-23)\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, jobGone: true });
} else {
// K8s API hiccup — bail out without claiming external deletion.
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago; Job state unverifiable (${err instanceof Error ? err.message : String(err)}) — proceeding with captured output (FAR-23)\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
}
}
})();
}
}, 1_000);
});
const [logResult, completionResult] = await Promise.allSettled([
trackedLogStream,
completionWithGrace,
const [tailResult, completionResult] = await Promise.allSettled([
tailPodLogFile(podLogPath, { onLog: wrappedOnLog, stopSignal: logStopSignal }),
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(r => { logStopSignal.stopped = true; return r; }),
]);
// Stop the keepalive immediately once the job has reached a terminal
// state — do not wait for the finally block.
if (keepaliveTimer) {
clearInterval(keepaliveTimer);
keepaliveTimer = null;
}
// If the run was externally cancelled, return a clean cancelled result
// without processing stdout (the finally block still runs for cleanup).
if (cancelled) {
return {
exitCode: null,
signal: null,
timedOut: false,
errorCode: "cancelled",
errorMessage: "Run cancelled",
};
}
if (logResult.status === "fulfilled") {
stdout = logResult.value;
}
// One-shot log fallback: handles two failure modes with a single read.
// Mode 1 — empty stream: the follow stream returned nothing (fast exit before connection).
// Mode 2 — partial stream: we have some output but no result event (follow stream raced
// with container exit and captured only the init line before the connection dropped).
// A one-shot readPodLogs is more reliable for already-terminated containers and reads
// from the beginning of the log, giving us the full output.
// We use a cheap string scan for the result-event guard (avoids a full JSON parse here;
// the authoritative parse happens once below after all fallbacks complete).
const hasResultEvent = stdout.split("\n").some((l) => { try { return JSON.parse(l).type === "result"; } catch { return false; } });
const needsOneShot = !stdout.trim() || (stdout.trim() && !hasResultEvent);
if (needsOneShot) {
if (!stdout.trim()) {
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
}
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
if (!stdout.trim() && oneShotLogs.trim()) {
stdout = oneShotLogs;
const deduped = logDedup.filter(stdout) + logDedup.flush();
if (deduped) await onLog("stdout", deduped);
} else if (oneShotLogs && oneShotLogs.length > stdout.length) {
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
const deduped = logDedup.filter(oneShotLogs) + logDedup.flush();
if (deduped) await onLog("stdout", deduped);
stdout = oneShotLogs;
}
}
stdout = tailResult.status === "fulfilled" ? tailResult.value : "";
if (completionResult.status === "fulfilled") {
jobTimedOut = completionResult.value.timedOut;
@@ -1544,7 +1294,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
if (skipCleanup) {
await onLog("stdout", `[paperclip] Retaining job ${jobName} (state mismatch — UI is waiting on it)\n`);
} else if (!retainJobs) {
await cleanupJob(namespace, jobName, onLog, kubeconfigPath);
await cleanupJob(namespace, jobName, onLog, kubeconfigPath, podLogPath);
} else {
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
}
+52 -95
View File
@@ -1,6 +1,6 @@
import { describe, it, expect, beforeEach } from "vitest";
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
import { buildJobManifest, buildRtkSetupCommands, sanitizeLabelValue } from "./job-manifest.js";
import { buildJobManifest, buildPodLogPath, sanitizeLabelValue } from "./job-manifest.js";
import type { SelfPodInfo } from "./k8s-client.js";
function makeCtx(overrides: Partial<AdapterExecutionContext> = {}): AdapterExecutionContext {
@@ -221,11 +221,9 @@ describe("buildJobManifest", () => {
it("omits paperclip.io/run-id when sanitized value is null (all-invalid runId)", () => {
// inject an all-special-chars runId via context override — buildJobManifest
// uses ctx.runId directly
// uses ctx.runId directly. Use characters that are path-valid but label-invalid.
const badCtx = makeCtx({ runId: "@@@" });
const { job, skippedLabels } = buildJobManifest({ ctx: badCtx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/run-id"]).toBeUndefined();
expect(skippedLabels).toContain("paperclip.io/run-id");
expect(() => buildJobManifest({ ctx: badCtx, selfPod })).toThrow("Invalid runId");
});
it("selector matches sanitized agent-id label", () => {
@@ -301,7 +299,9 @@ describe("buildJobManifest", () => {
it("write-prompt writes PROMPT_CONTENT to /tmp/prompt/prompt.txt", () => {
const { job } = buildJobManifest({ ctx, selfPod });
const init = job.spec?.template?.spec?.initContainers?.[0];
expect(init?.command).toEqual(["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"]);
expect(init?.command?.[0]).toBe("sh");
expect(init?.command?.[1]).toBe("-c");
expect(init?.command?.[2]).toBe("printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt");
});
it("write-prompt mounts prompt volume", () => {
@@ -794,112 +794,69 @@ describe("buildJobManifest", () => {
});
});
describe("rtk output filtering", () => {
describe("pod log file tailing", () => {
it("does not modify main command when enableRtk is false (default)", () => {
const { job } = buildJobManifest({ ctx, selfPod });
const cmd = job.spec?.template?.spec?.containers[0]?.command;
// Command should be the plain `cat ... | claude ...` form with no rtk setup
expect(cmd?.[2]).toMatch(/^cat \/tmp\/prompt\/prompt\.txt \| claude /);
// Command should be the plain `cat ... | claude ... | tee ...` form with no rtk setup
expect(cmd?.[2]).toMatch(/^cat \/tmp\/prompt\/prompt\.txt \| claude .* \| tee /);
expect(cmd?.[2]).not.toContain("rtk-filter");
});
it("prepends RTK setup commands when enableRtk is true", () => {
ctx.config = { enableRtk: true };
const { job } = buildJobManifest({ ctx, selfPod });
const cmd = job.spec?.template?.spec?.containers[0]?.command;
expect(cmd?.[2]).toContain(".rtk-filter.js");
expect(cmd?.[2]).toContain("cat /tmp/prompt/prompt.txt | claude");
});
it("RTK setup runs before claude invocation", () => {
ctx.config = { enableRtk: true };
it("command includes tee to pod log path", () => {
const { job } = buildJobManifest({ ctx, selfPod });
const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? "";
const rtkIdx = cmd.indexOf(".rtk-filter.js");
const claudeIdx = cmd.indexOf("cat /tmp/prompt/prompt.txt | claude");
expect(rtkIdx).toBeGreaterThanOrEqual(0);
expect(claudeIdx).toBeGreaterThan(rtkIdx);
expect(cmd).toContain("| tee");
expect(cmd).toContain("/paperclip/instances/default/data/run-logs/");
});
it("RTK setup uses node (no external binaries)", () => {
ctx.config = { enableRtk: true };
it("podLogPath is returned from buildJobManifest", () => {
const result = buildJobManifest({ ctx, selfPod });
expect(result.podLogPath).toBe(
"/paperclip/instances/default/data/run-logs/co1/agent-abc/run-abc12345.pod.ndjson",
);
});
it("buildPodLogPath returns correctly formatted path", () => {
expect(buildPodLogPath("co1", "agent-abc", "run-abc12345")).toBe(
"/paperclip/instances/default/data/run-logs/co1/agent-abc/run-abc12345.pod.ndjson",
);
});
it("init container does not create log directory (server pre-creates it on shared PVC)", () => {
const { job } = buildJobManifest({ ctx, selfPod });
const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? "";
// Should only use `node` — no curl, wget, apt, pip, etc.
expect(cmd).not.toMatch(/\b(curl|wget|apt|yum|pip|gem|cargo|go\s+get)\b/);
expect(cmd).toContain("node ");
const initCmd = job.spec?.template?.spec?.initContainers?.[0]?.command;
expect(initCmd?.[2]).not.toContain("mkdir -p /paperclip");
});
it("uses default 50000 byte threshold when rtkMaxOutputBytes not set", () => {
ctx.config = { enableRtk: true };
const setup = buildRtkSetupCommands(50000);
// The filter script base64 should decode to contain the MAX constant
const b64Match = setup.match(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/);
expect(b64Match).not.toBeNull();
const decoded = Buffer.from(b64Match![1], "base64").toString("utf-8");
expect(decoded).toContain("50000");
it("sanitizes companyId with / to valid path component for log path", () => {
const badCtx = {
...ctx,
agent: { ...ctx.agent, companyId: "co/1" },
};
const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod });
// / is stripped by sanitizeForK8sPath
expect(podLogPath).toContain("co1/");
});
it("respects custom rtkMaxOutputBytes", () => {
ctx.config = { enableRtk: true, rtkMaxOutputBytes: 100000 };
const { job } = buildJobManifest({ ctx, selfPod });
const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? "";
// The custom threshold should appear in the base64-encoded filter script
const b64Matches = [...cmd.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
const decoded = b64Matches.map((m) => Buffer.from(m[1], "base64").toString("utf-8")).join("\n");
expect(decoded).toContain("100000");
it("sanitizes agentId with @ to valid path component for log path", () => {
const badCtx = {
...ctx,
agent: { ...ctx.agent, id: "agent@123" },
};
const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod });
// @ is stripped by sanitizeForK8sPath
expect(podLogPath).toContain("/agent123/");
});
it("RTK setup installs a PostToolUse hook in claude settings", () => {
const setup = buildRtkSetupCommands(50000);
// The settings script (second base64 block) should reference PostToolUse
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
expect(b64Matches.length).toBeGreaterThanOrEqual(2);
const settingsScript = Buffer.from(b64Matches[1]![1], "base64").toString("utf-8");
expect(settingsScript).toContain("PostToolUse");
expect(settingsScript).toContain("settings.json");
});
it("filter script handles string content truncation", () => {
// Decode the filter script and verify it truncates string content
const setup = buildRtkSetupCommands(1000);
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
expect(filterScript).toContain("MAX=1000");
expect(filterScript).toContain("truncated by paperclip-rtk");
expect(filterScript).toContain("tool_response");
expect(filterScript).toContain("tool_result");
});
it("filter script truncates without corrupting multi-byte UTF-8", () => {
// "中" is U+4E2D, 3 bytes in UTF-8: E4 B8 AD
// With MAX=5, two "中" (6 bytes) should truncate to one (3 bytes), not
// produce a replacement character from slicing mid-codepoint.
const setup = buildRtkSetupCommands(5);
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
// Extract the trunc function from the filter script and evaluate it
const fnMatch = filterScript.match(/(function trunc\(s\)\{.*\})(?=const tr=)/);
expect(fnMatch).toBeTruthy();
// eslint-disable-next-line no-eval
const trunc = eval(`(()=>{const MAX=5;${fnMatch![1]};return trunc;})()`);
const result = trunc("中中");
expect(result).not.toContain("");
expect(result).toContain("中");
expect(result).toContain("truncated by paperclip-rtk");
// Should report bytes from the actual truncation point, not MAX
expect(result).toContain("3 bytes truncated");
});
it("filter script handles array content (block format)", () => {
const setup = buildRtkSetupCommands(50000);
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
// Should handle array content blocks (text field on each block)
expect(filterScript).toContain("Array.isArray");
expect(filterScript).toContain("b.text");
it("sanitizes runId with underscore to valid path component for log path", () => {
const badCtx = {
...ctx,
runId: "run_123",
};
const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod });
// _ is stripped by sanitizeForK8sPath
expect(podLogPath).toContain("/run123.pod.ndjson");
});
});
});
+23 -88
View File
@@ -12,85 +12,18 @@ import {
import { createHash } from "node:crypto";
import type { ClaudePromptBundle } from "./prompt-cache.js";
/**
* Build the shell command prefix that installs a native Node.js PostToolUse
* hook into Claude Code's settings. The hook truncates oversized tool outputs
* before they reach the model — replacing the RTK binary init-container
* approach with a self-contained Node.js implementation.
*
* Both scripts are base64-encoded so they can be embedded in a sh -c command
* string without any quoting or escaping issues.
*
* @param maxOutputBytes Byte threshold above which tool output is truncated.
* @returns A shell command string (suitable for "&&"-chaining
* before the claude invocation).
*/
export function buildRtkSetupCommands(maxOutputBytes: number): string {
// --- Filter script ----------------------------------------------------------
// This script runs as the PostToolUse hook inside every K8s Job pod.
// Claude Code writes the hook event as JSON to the script's stdin; the script
// truncates the tool_response/tool_result content when it exceeds the
// threshold and writes the (possibly modified) JSON to stdout.
//
// Field-name coverage:
// • tool_response — documented hook event format for PostToolUse
// • tool_result — alternative name seen in some Claude Code versions
// Content may be a plain string or an array of typed blocks (text/image/…).
const filterScript = [
`const c=[];`,
`process.stdin.on('data',d=>c.push(d));`,
`process.stdin.on('end',()=>{`,
`const raw=Buffer.concat(c).toString('utf-8');`,
`let o;try{o=JSON.parse(raw);}catch{process.stdout.write(raw);return;}`,
`const MAX=${maxOutputBytes};`,
`function trunc(s){`,
`if(typeof s!=='string')return s;`,
`const b=Buffer.from(s,'utf-8');`,
`if(b.length<=MAX)return s;`,
`let e=MAX;if(e>0){let p=e-1;while(p>0&&(b[p]&0xC0)===0x80)p--;const l=b[p];let n=1;if((l&0xE0)===0xC0)n=2;else if((l&0xF0)===0xE0)n=3;else if((l&0xF8)===0xF0)n=4;if(p+n>e)e=p;}`,
`return b.slice(0,e).toString('utf-8')+'\\n[...'+(b.length-e)+' bytes truncated by paperclip-rtk]';`,
`}`,
`const tr=o&&(o.tool_response||o.tool_result);`,
`if(tr){`,
`if(typeof tr.content==='string'){tr.content=trunc(tr.content);}`,
`else if(Array.isArray(tr.content)){`,
`tr.content=tr.content.map(function(b){`,
`if(b&&typeof b==='object'&&typeof b.text==='string'){`,
`return Object.assign({},b,{text:trunc(b.text)});`,
`}return b;`,
`});`,
`}`,
`}`,
`process.stdout.write(JSON.stringify(o));`,
`});`,
].join("");
/**
 * Guard for a single log-path segment: accepts only non-empty strings made
 * of ASCII letters, digits and hyphens.
 *
 * @throws Error naming `field` and the offending `value` otherwise.
 */
function assertSafePathComponent(field: string, value: string): void {
  const isSafe = /^[a-zA-Z0-9-]+$/.test(value);
  if (isSafe) return;
  throw new Error(`Invalid ${field} for log path: ${value}`);
}
// --- Settings script --------------------------------------------------------
// Reads the existing ~/.claude/settings.json (if any), merges in the RTK
// PostToolUse hook, and writes the file back. All other settings sections
// are preserved; only PostToolUse is replaced so we own the full hook list
// for this run.
const settingsScript = [
`const fs=require('fs'),pt=require('path');`,
`const p=pt.join(process.env.HOME,'.claude','settings.json');`,
`let s={};try{s=JSON.parse(fs.readFileSync(p,'utf-8'));}catch(e){}`,
`s.hooks=s.hooks||{};`,
`s.hooks.PostToolUse=[{matcher:'.*',hooks:[{type:'command',command:'node /tmp/.rtk-filter.js'}]}];`,
`fs.mkdirSync(pt.dirname(p),{recursive:true});`,
`fs.writeFileSync(p,JSON.stringify(s));`,
].join("");
/**
 * Drop every character that is not an ASCII letter, digit or hyphen.
 * May return an empty string when nothing survives.
 */
function sanitizeForK8sPath(value: string): string {
  const keptRuns = value.match(/[a-zA-Z0-9-]+/g);
  return keptRuns === null ? "" : keptRuns.join("");
}
// Encode as base64 so the strings can be embedded directly in a shell command
// without any quoting concerns (base64 alphabet: A-Za-z0-9+/=).
const filterB64 = Buffer.from(filterScript, "utf-8").toString("base64");
const settingsB64 = Buffer.from(settingsScript, "utf-8").toString("base64");
return [
// Write the filter script
`node -e "require('fs').writeFileSync('/tmp/.rtk-filter.js',Buffer.from('${filterB64}','base64').toString('utf-8'))"`,
// Install the Claude Code PostToolUse hook (merge into existing settings)
`node -e "eval(Buffer.from('${settingsB64}','base64').toString('utf-8'))"`,
].join(" && ");
/**
 * Build the absolute path of a run's pod log file.
 *
 * Each id is reduced to ASCII letters, digits and hyphens before it is
 * placed in the path (the same character set sanitizeForK8sPath keeps), so
 * every caller derives the same file for the same ids whether or not it
 * sanitized them first, and an id can never contribute `/` or `..` to the
 * path. Already-sanitized ids pass through unchanged.
 *
 * @param companyId Company identifier; becomes the first directory segment.
 * @param agentId   Agent identifier; becomes the second directory segment.
 * @param runId     Run identifier; becomes the file stem.
 * @returns `/paperclip/instances/default/data/run-logs/<company>/<agent>/<run>.pod.ndjson`
 * @throws Error when an id has no permitted characters left after
 *         sanitization (an empty segment would point at a different file).
 */
export function buildPodLogPath(companyId: string, agentId: string, runId: string): string {
  const toSegment = (field: string, raw: string): string => {
    const cleaned = raw.replace(/[^a-zA-Z0-9-]/g, "");
    if (cleaned.length === 0) {
      throw new Error(`Invalid ${field} for log path: ${raw}`);
    }
    return cleaned;
  };
  const company = toSegment("companyId", companyId);
  const agent = toSegment("agentId", agentId);
  const run = toSegment("runId", runId);
  return `/paperclip/instances/default/data/run-logs/${company}/${agent}/${run}.pod.ndjson`;
}
/** Prompts above this size (bytes) are staged via a Secret instead of an
@@ -202,6 +135,8 @@ export interface JobBuildResult {
promptSecret: PromptSecret | null;
/** User-supplied extra labels that were dropped because they used a reserved prefix. */
skippedLabels: string[];
/** Path to the pod log file on the shared PVC. */
podLogPath: string;
}
function sanitizeForK8sName(value: string, maxLen = 16): string {
@@ -353,8 +288,6 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const nodeSelector = parseKeyValueConfig(config.nodeSelector);
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
const extraLabels = parseKeyValueConfig(config.labels);
const enableRtk = asBoolean(config.enableRtk, false);
const rtkMaxOutputBytes = asNumber(config.rtkMaxOutputBytes, 50000);
// Resolve working directory — use workspace cwd, fall back to /paperclip
const workspaceContext = parseObject(context.paperclipWorkspace);
@@ -535,13 +468,15 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
// Build the claude command string for the main container
const claudeArgsEscaped = claudeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
const claudeInvocation = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`;
// When RTK output filtering is enabled, prepend the Node.js hook setup.
// This writes a filter script and a Claude Code settings file that installs
// it as a PostToolUse hook — no external binary or init container required.
const mainCommand = enableRtk
? `${buildRtkSetupCommands(rtkMaxOutputBytes)} && ${claudeInvocation}`
: claudeInvocation;
const logPathCompanyId = sanitizeForK8sPath(agent.companyId);
const logPathAgentId = sanitizeForK8sPath(agent.id);
const logPathRunId = sanitizeForK8sPath(runId);
assertSafePathComponent("companyId", logPathCompanyId);
assertSafePathComponent("agentId", logPathAgentId);
assertSafePathComponent("runId", logPathRunId);
const podLogPath = buildPodLogPath(logPathCompanyId, logPathAgentId, logPathRunId);
const claudeInvocation = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped} | tee ${podLogPath}`;
const mainCommand = claudeInvocation;
// Decide prompt delivery strategy: env var (small) or Secret volume (large).
const promptBytes = Buffer.byteLength(prompt, "utf-8");
@@ -584,7 +519,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
name: "write-prompt",
image: "busybox:1.36",
imagePullPolicy: "IfNotPresent",
command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
command: ["sh", "-c", `printf '%s' "$PROMPT_CONTENT" > /tmp/prompt/prompt.txt`],
env: [{ name: "PROMPT_CONTENT", value: prompt }],
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
securityContext,
@@ -641,5 +576,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
},
};
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret, skippedLabels };
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret, skippedLabels, podLogPath };
}
-173
View File
@@ -1,173 +0,0 @@
import { describe, it, expect } from "vitest";
import { LogLineDedupFilter, eventDedupKey } from "./log-dedup.js";
/**
 * Serialize a minimal `assistant` stream-json event (session `sess_1`)
 * whose message has the given id and a single text content block.
 */
function assistantEvent(id: string, text: string): string {
  const message = { id, content: [{ type: "text", text }] };
  const event = { type: "assistant", session_id: "sess_1", message };
  return JSON.stringify(event);
}
/**
 * Serialize a minimal `user` stream-json event (session `sess_1`) carrying
 * a single `tool_result` content block for the given tool_use_id.
 */
function userToolResultEvent(toolUseId: string, content: string): string {
  const toolResult = { type: "tool_result", tool_use_id: toolUseId, content };
  const event = {
    type: "user",
    session_id: "sess_1",
    message: { content: [toolResult] },
  };
  return JSON.stringify(event);
}
/** Serialize a `system`/`init` stream-json event for the given session id. */
function systemInitEvent(sessionId: string): string {
  const event = {
    type: "system",
    subtype: "init",
    session_id: sessionId,
    model: "claude-opus-4-7",
  };
  return JSON.stringify(event);
}
/**
 * Serialize a successful `result` stream-json event for the given session
 * id, with fixed cost and token-usage figures.
 */
function resultEvent(sessionId: string): string {
  const usage = { input_tokens: 1, output_tokens: 1, cache_read_input_tokens: 0 };
  const event = {
    type: "result",
    subtype: "success",
    session_id: sessionId,
    result: "done",
    total_cost_usd: 0.01,
    usage,
  };
  return JSON.stringify(event);
}
// Unit tests for eventDedupKey: one case per recognized event type, plus
// the inputs for which no structural key is produced.
describe("eventDedupKey", () => {
  it("keys assistant events by message.id", () => {
    const parsed = JSON.parse(assistantEvent("msg_abc", "hi"));
    expect(eventDedupKey(parsed)).toBe("assistant:msg_abc");
  });

  it("keys user tool_result events by tool_use_id", () => {
    const parsed = JSON.parse(userToolResultEvent("toolu_1", "ok"));
    expect(eventDedupKey(parsed)).toBe("user:tool_result:toolu_1");
  });

  it("keys system init events by session_id", () => {
    const parsed = JSON.parse(systemInitEvent("sess_xyz"));
    expect(eventDedupKey(parsed)).toBe("system:init:sess_xyz");
  });

  it("keys result events by session_id", () => {
    const parsed = JSON.parse(resultEvent("sess_xyz"));
    expect(eventDedupKey(parsed)).toBe("result:sess_xyz");
  });

  it("returns null for assistant events missing message.id", () => {
    const withoutId = { type: "assistant", message: { content: [] } };
    expect(eventDedupKey(withoutId)).toBeNull();
  });

  it("returns null for unknown event types", () => {
    for (const unrecognized of [{ type: "unknown" }, {}]) {
      expect(eventDedupKey(unrecognized)).toBeNull();
    }
  });
});
// Behavioral tests for LogLineDedupFilter. Each case builds a fresh filter;
// the order of filter()/flush() calls within a case matters because the
// filter is stateful (it remembers seen events and buffers partial lines).
describe("LogLineDedupFilter", () => {
// Two events with different message ids are both forwarded verbatim.
it("passes unique lines through unchanged", () => {
const filter = new LogLineDedupFilter();
const a = assistantEvent("msg_1", "hello");
const b = assistantEvent("msg_2", "world");
expect(filter.filter(`${a}\n${b}\n`)).toBe(`${a}\n${b}\n`);
});
// The first call primes the filter; the identical second call yields nothing.
it("drops assistant events replayed with the same message.id", () => {
const filter = new LogLineDedupFilter();
const a = assistantEvent("msg_1", "Three nits to fix.");
filter.filter(`${a}\n`);
expect(filter.filter(`${a}\n`)).toBe("");
});
// Same replay check, for a user event carrying a tool_result block.
it("drops user tool_result events replayed with the same tool_use_id", () => {
const filter = new LogLineDedupFilter();
const a = userToolResultEvent("toolu_abc", "file contents");
filter.filter(`${a}\n`);
expect(filter.filter(`${a}\n`)).toBe("");
});
// Same replay check, for the system init and result events of one session.
it("drops system init and result events on replay", () => {
const filter = new LogLineDedupFilter();
const init = systemInitEvent("sess_1");
const result = resultEvent("sess_1");
filter.filter(`${init}\n${result}\n`);
expect(filter.filter(`${init}\n${result}\n`)).toBe("");
});
// A line split across two chunks is withheld until its newline arrives,
// then emitted whole.
it("buffers incomplete trailing lines across chunks", () => {
const filter = new LogLineDedupFilter();
const line = assistantEvent("msg_1", "hello");
const mid = Math.floor(line.length / 2);
const out1 = filter.filter(line.slice(0, mid));
const out2 = filter.filter(line.slice(mid) + "\n");
expect(out1).toBe("");
expect(out2).toBe(`${line}\n`);
});
// flush() returns the buffered partial line as-is (no newline appended).
it("flush() emits a final incomplete line that was not replayed", () => {
const filter = new LogLineDedupFilter();
const line = assistantEvent("msg_tail", "no newline");
filter.filter(line);
expect(filter.flush()).toBe(line);
});
// The buffered partial line is a duplicate of an earlier complete line,
// so flush() suppresses it.
it("flush() drops an incomplete line that was already seen with a newline", () => {
const filter = new LogLineDedupFilter();
const line = assistantEvent("msg_same", "x");
filter.filter(`${line}\n`);
filter.filter(line);
expect(filter.flush()).toBe("");
});
// Plain-text lines are forwarded on every occurrence, even when identical.
it("passes non-JSON lines through every time (does not dedup paperclip status)", () => {
const filter = new LogLineDedupFilter();
const status = "[paperclip] keepalive — job foo running\n";
expect(filter.filter(status)).toBe(status);
expect(filter.filter(status)).toBe(status);
});
// A JSON object with no recognized event type is still deduped when the
// exact same line is seen again.
it("dedups structurally identical JSON with identical content (raw fallback)", () => {
const filter = new LogLineDedupFilter();
// No recognized type → raw fallback key.
const line = JSON.stringify({ foo: "bar", baz: 1 });
filter.filter(`${line}\n`);
expect(filter.filter(`${line}\n`)).toBe("");
});
// Complete lines in a chunk are emitted immediately while the trailing
// partial line waits for its terminating newline.
it("handles multiple complete lines in a single chunk with partial trailing", () => {
const filter = new LogLineDedupFilter();
const a = assistantEvent("msg_a", "a");
const b = assistantEvent("msg_b", "b");
const c = assistantEvent("msg_c", "c");
// a and b are complete, c is partial (no trailing newline).
const out = filter.filter(`${a}\n${b}\n${c}`);
expect(out).toBe(`${a}\n${b}\n`);
// Completing c later should emit exactly c.
expect(filter.filter("\n")).toBe(`${c}\n`);
});
// End-to-end regression scenario: first delivery, full replay, then a new event.
it("drops the classic FAR-123 replay scenario across reconnects", () => {
const filter = new LogLineDedupFilter();
const assistantNits = assistantEvent("msg_nits", "Three nits to fix. Let me look at an existing test file...");
const assistantWrite = assistantEvent("msg_write", "Now I need to write unit tests");
// First stream attempt emits both events.
const out1 = filter.filter(`${assistantNits}\n${assistantWrite}\n`);
expect(out1).toBe(`${assistantNits}\n${assistantWrite}\n`);
// Reconnect replays both within the sinceSeconds overlap — filter should drop them.
const out2 = filter.filter(`${assistantNits}\n${assistantWrite}\n`);
expect(out2).toBe("");
// And a genuinely new event after the replay should still pass through.
const assistantFresh = assistantEvent("msg_fresh", "next turn");
expect(filter.filter(`${assistantFresh}\n`)).toBe(`${assistantFresh}\n`);
});
});
-146
View File
@@ -1,146 +0,0 @@
/**
* Line-level dedup filter for the K8s log stream.
*
* The K8s log follow stream can reconnect with an overlapping `sinceSeconds`
* window (integer-second granularity + a safety buffer), which replays a few
* seconds of recent output on every reconnect. Without dedup those replayed
* lines appear as duplicate events in the streaming UI — the same assistant
* text block shows up between every subsequent tool call (FAR-123).
*
* The filter operates at the chunk → line level: chunks are split on `\n`,
* incomplete trailing content is buffered until the next chunk, and each
* complete line is emitted at most once. JSON-shaped Claude stream-json
* events are keyed by their stable structural IDs; non-JSON lines pass
* through unchanged so genuinely-repeated status lines are not swallowed.
*/
type Parsed = Record<string, unknown>;
/** Return `value` when it is a string; otherwise return the empty string. */
function asString(value: unknown): string {
  if (typeof value === "string") {
    return value;
  }
  return "";
}
/**
 * Narrow `value` to a key/value record. Returns the same object reference
 * when `value` is a non-null, non-array object; returns `null` for
 * primitives, `null`, and arrays.
 */
function asRecord(value: unknown): Parsed | null {
  const isNonArrayObject =
    value !== null && typeof value === "object" && !Array.isArray(value);
  return isNonArrayObject ? (value as Parsed) : null;
}
/**
 * Build a stable dedup key for a Claude stream-json event.
 *
 * Key shapes, by `event.type`:
 * - `system` with `subtype: "init"` → `system:init:<session_id>`
 * - `assistant`                     → `assistant:<message.id>`
 * - `user`                          → `user:tool_result:<ids>`, where `<ids>`
 *   is the comma-joined `tool_use_id` of every content block that has one,
 *   in content order
 * - `result`                        → `result:<session_id>`
 *
 * @param event - A parsed JSON object (one line of the log stream).
 * @returns The structural key, or `null` when the type is unrecognized or
 *   the identifying field is missing/empty. On `null` the caller is expected
 *   to fall back to keying on the raw line content, so byte-identical
 *   replays are still caught while distinct events never collide.
 */
export function eventDedupKey(event: Parsed): string | null {
  const type = asString(event.type);

  switch (type) {
    case "system": {
      // Only the init event is keyed structurally; other system subtypes
      // are left to the caller's raw-content fallback.
      const sessionId = asString(event.session_id);
      if (asString(event.subtype) === "init" && sessionId) {
        return `system:init:${sessionId}`;
      }
      return null;
    }

    case "assistant": {
      // NOTE(review): assumes a given message.id appears in at most one
      // assistant event per stream — confirm against the producer's output.
      const message = asRecord(event.message);
      const id = message ? asString(message.id) : "";
      return id ? `assistant:${id}` : null;
    }

    case "user": {
      const message = asRecord(event.message);
      const content = message && Array.isArray(message.content) ? message.content : [];
      const toolUseIds: string[] = [];
      for (const entry of content) {
        const toolUseId = asString(asRecord(entry)?.tool_use_id);
        if (toolUseId) toolUseIds.push(toolUseId);
      }
      return toolUseIds.length > 0 ? `user:tool_result:${toolUseIds.join(",")}` : null;
    }

    case "result": {
      // Without a session id there is nothing stable to key on. Returning a
      // shared constant here would make every id-less result event collide
      // and silently drop all but the first, so defer to the raw fallback.
      const sessionId = asString(event.session_id);
      return sessionId ? `result:${sessionId}` : null;
    }

    default:
      return null;
  }
}
/**
 * Stateful line-level dedup filter.
 *
 * Feed stream chunks to `filter()`; it returns the text that should be
 * forwarded, with every surviving line terminated by `\n`. JSON-object lines
 * are forwarded at most once per dedup key; all other lines are always
 * forwarded. Call `flush()` at end of stream to recover an unterminated
 * final line.
 */
export class LogLineDedupFilter {
  /** Unterminated tail of the most recent chunk, awaiting its newline. */
  private buffer = "";
  /** Dedup keys of every JSON-object line already forwarded. */
  private readonly seenKeys = new Set<string>();

  /**
   * Process a chunk and return the subset that should be forwarded.
   * Text after the last newline is held back and prepended to the next
   * chunk (or released by flush()).
   */
  filter(chunk: string): string {
    if (!chunk) return "";

    const text = this.buffer + chunk;
    const lastBreak = text.lastIndexOf("\n");
    if (lastBreak === -1) {
      // No complete line yet — keep accumulating.
      this.buffer = text;
      return "";
    }

    // Everything past the final newline is an incomplete line (possibly empty).
    this.buffer = text.slice(lastBreak + 1);

    let forwarded = "";
    for (const line of text.slice(0, lastBreak).split("\n")) {
      if (this.shouldEmit(line)) {
        forwarded += `${line}\n`;
      }
    }
    return forwarded;
  }

  /**
   * Release the buffered partial line, if any, subject to the same dedup
   * check as complete lines. No newline is appended. The buffer is cleared
   * regardless of the outcome.
   */
  flush(): string {
    const leftover = this.buffer;
    this.buffer = "";
    return leftover && this.shouldEmit(leftover) ? leftover : "";
  }

  /**
   * Decide whether a line should be forwarded, recording its key when it is
   * a first-seen JSON object.
   */
  private shouldEmit(line: string): boolean {
    const candidate = line.trim();

    // Blank lines and anything not shaped like `{...}` are never deduped.
    const looksLikeJsonObject = candidate.startsWith("{") && candidate.endsWith("}");
    if (!looksLikeJsonObject) return true;

    let decoded: unknown;
    try {
      decoded = JSON.parse(candidate);
    } catch {
      // Brace-wrapped but not valid JSON — treat as plain text.
      return true;
    }

    const event = asRecord(decoded);
    if (event === null) return true;

    // Prefer the structural key; otherwise key on the exact trimmed text.
    const key = eventDedupKey(event) ?? `raw:${candidate}`;
    if (this.seenKeys.has(key)) return false;

    this.seenKeys.add(key);
    return true;
  }
}
+1
View File
@@ -50,3 +50,4 @@ describe("listK8sModels", () => {
expect(models.some((m) => m.id === "claude-opus-4-7")).toBe(true);
});
});