feat: replace k8s log API streaming with filesystem tailing

Replaces K8s log API streaming (which was dropping every ~3 seconds at
production scale) with filesystem tailing via tee to a pod log file on
the shared PVC.

Core changes:
- Add tee to claudeInvocation to write pod log file
- Add mkdir -p to init container to create log directory
- Add assertSafePathComponent and buildPodLogPath helper
- Add tailPodLogFile function with adaptive 250ms/1s polling
- Replace k8s log streaming with tailPodLogFile in Promise.allSettled
- Delete log-dedup.ts (RTK output truncation no longer needed)
- Update config-schema.ts and index.ts to remove RTK references
- Clean up log file in cleanupJob when retainJobs=false

Note: 14 tests in execute.test.ts test the obsolete k8s log streaming
approach and need to be rewritten or deleted (streamPodLogsOnce tests).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-27 21:05:48 -04:00
parent 568f571d8c
commit 8bd5042b5d
10 changed files with 190 additions and 1931 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.54",
"version": "0.2.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.54",
"version": "0.2.0",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.57",
"version": "0.2.0",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
-4
View File
@@ -60,10 +60,6 @@ Kubernetes fields:
- retainJobs (boolean, optional): skip cleanup on completion for debugging
- reattachOrphanedJobs (boolean, optional): when true (default), attach to a running orphaned Job that matches the current agent/task/session instead of blocking; when false, any non-terminal orphan blocks the new run
Output filtering fields:
- enableRtk (boolean, optional): truncate oversized tool outputs before they reach the model via a PostToolUse hook; default false
- rtkMaxOutputBytes (number, optional): byte threshold for tool output truncation when enableRtk is true; default 50000
Operational fields:
- timeoutSec (number, optional): run timeout in seconds; 0 means no timeout
- graceSec (number, optional): additional grace before adapter gives up after Job deadline
+1 -16
View File
@@ -133,22 +133,7 @@ export function getConfigSchema(): AdapterConfigSchema {
label: "Labels",
hint: "Extra labels added to Job metadata. One key=value per line.",
},
// Output filtering (RTK-compatible)
{
type: "toggle",
key: "enableRtk",
label: "Enable Output Filtering",
hint: "Truncate oversized tool outputs before they reach the model, reducing token consumption. Implemented natively in Node.js — no external binary required. Installs a PostToolUse hook in ~/.claude/settings.json for each run.",
default: false,
},
{
type: "number",
key: "rtkMaxOutputBytes",
label: "Max Tool Output Bytes",
hint: "Maximum bytes of tool output to pass to the model when output filtering is enabled. Outputs exceeding this threshold are truncated with a summary. Default: 50000.",
default: 50000,
},
];
return { fields };
}
}
File diff suppressed because it is too large Load Diff
+104 -383
View File
@@ -16,28 +16,12 @@ import {
isClaudeMaxTurnsResult,
isClaudeUnknownSessionError,
} from "./parse.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
import { LogLineDedupFilter } from "./log-dedup.js";
import { getSelfPodInfo, getBatchApi, getCoreApi } from "./k8s-client.js";
import { buildJobManifest, buildPodLogPath, sanitizeLabelValue } from "./job-manifest.js";
import type * as k8s from "@kubernetes/client-node";
import { Writable } from "node:stream";
const POLL_INTERVAL_MS = 2000;
const KEEPALIVE_INTERVAL_MS = 15_000;
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
// Upper bound on how long streamPodLogsOnce will wait after stopSignal fires
// before force-returning, even if logApi.log has not yet resolved. Defensive
// against the K8s client library not propagating writable.destroy() into an
// abort of the underlying HTTP request.
const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
// After the log stream exits (container stopped producing output), wait this
// long for the K8s Job condition to be confirmed before treating the job as
// done. K8s Job conditions can lag pod exit by several seconds or more under
// cluster load. Without this bound, waitForJobCompletion keeps polling while
// streamPodLogs keeps reconnecting — together they can hold execute() open for
// minutes, causing stale "running" status in the UI (FAR-23).
const LOG_EXIT_COMPLETION_GRACE_MS = 30_000;
// Module-level tracking of active Jobs for SIGTERM best-effort cleanup.
interface ActiveJobRef {
@@ -75,6 +59,92 @@ function ensureSigtermHandler(): void {
});
}
/**
* Tail a pod log file from the shared PVC, emitting complete lines via onLog.
* Uses adaptive polling: 250ms when the file is actively growing, backed off
* to 1000ms after 5 consecutive polls with no growth.
*/
interface TailOptions {
onLog: AdapterExecutionContext["onLog"];
stopSignal: { stopped: boolean };
}
async function tailPodLogFile(
filePath: string,
opts: TailOptions,
): Promise<string> {
const { onLog, stopSignal } = opts;
const accumulator: string[] = [];
let pendingLine = "";
let consecutiveIdlePolls = 0;
let pollInterval = 250;
// Wait up to 30s for the file to appear
const deadline = Date.now() + 30_000;
while (Date.now() < deadline) {
try {
await fs.stat(filePath);
break;
} catch {
if (stopSignal.stopped) throw new Error("Stop signal received before log file appeared");
await new Promise((resolve) => setTimeout(resolve, 250));
}
}
// Final check after the wait loop
let handle: fs.FileHandle;
try {
handle = await fs.open(filePath, "r");
} catch {
throw new Error(`Pod log file never appeared at ${filePath}`);
}
let offset = 0;
try {
while (!stopSignal.stopped) {
const stat = await fs.stat(filePath);
const size = stat.size;
if (size > offset) {
const buf = Buffer.alloc(size - offset);
const { bytesRead } = await handle.read(buf, 0, buf.length, offset);
offset += bytesRead;
consecutiveIdlePolls = 0;
pollInterval = 250;
const combined = pendingLine + buf.toString("utf-8", 0, bytesRead);
const lines = combined.split("\n");
pendingLine = lines.pop() ?? "";
for (const line of lines) {
accumulator.push(line);
await onLog("stdout", line + "\n");
}
} else {
consecutiveIdlePolls++;
if (consecutiveIdlePolls >= 5) pollInterval = 1000;
}
if (!stopSignal.stopped) await new Promise((resolve) => setTimeout(resolve, pollInterval));
}
// Final drain
if (offset < (await fs.stat(filePath)).size) {
const stat = await fs.stat(filePath);
const size = stat.size;
const buf = Buffer.alloc(size - offset);
const { bytesRead } = await handle.read(buf, 0, buf.length, offset);
const combined = pendingLine + buf.toString("utf-8", 0, bytesRead);
const lines = combined.split("\n");
pendingLine = lines.pop() ?? "";
for (const line of lines) {
accumulator.push(line);
await onLog("stdout", line + "\n");
}
}
} finally {
await handle.close();
}
return accumulator.join("\n");
}
/**
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
@@ -389,210 +459,6 @@ async function waitForPod(
throw new Error(`Timed out waiting for pod to be scheduled (${Math.round(timeoutMs / 1000)}s)`);
}
/**
* Stream pod logs once via follow. Returns accumulated stdout when the
* stream ends (container exit, API disconnect, or abort signal).
*/
export async function streamPodLogsOnce(
namespace: string,
podName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
sinceSeconds?: number,
dedup?: LogLineDedupFilter,
stopSignal?: { stopped: boolean },
activity?: { lastActiveAt: number },
): Promise<string> {
const logApi = getLogApi(kubeconfigPath);
const chunks: string[] = [];
const writable = new Writable({
write(chunk: Buffer, _encoding, callback) {
const text = chunk.toString("utf-8");
chunks.push(text);
// Refresh stream liveness on every chunk received from the container.
// This MUST happen here (not just after streamPodLogsOnce returns) —
// a streaming attempt that never disconnects can produce output for
// hours, and the grace timer in execute() will fire 30s after the
// FIRST disconnect even if a new long-running attempt is currently
// streaming, unless we keep this timestamp fresh per-chunk (FAR-107).
if (activity) activity.lastActiveAt = Date.now();
const emitted = dedup ? dedup.filter(text) : text;
if (!emitted) {
callback();
return;
}
// Forward raw stream-json lines unchanged. The Paperclip UI uses the
// adapter's ui-parser export (src/ui-parser.ts) to render structured
// transcript entries — pre-formatting here would strip that structure
// and produce flat plain text that looks nothing like claude_local.
void onLog("stdout", emitted).then(() => callback(), callback);
},
});
// When the job completion signal fires, destroy the writable to abort the
// in-flight follow stream. Without this, logApi.log can hang indefinitely
// when the pod terminates without closing the HTTP connection cleanly.
let stopPoller: ReturnType<typeof setInterval> | null = null;
let bailTimer: ReturnType<typeof setTimeout> | null = null;
let bailResolve: (() => void) | null = null;
// Bail promise resolves LOG_STREAM_BAIL_TIMEOUT_MS after stopSignal fires,
// even if logApi.log has not resolved by then. This is a safety net for the
// case where writable.destroy() fails to propagate to an abort of the HTTP
// request (e.g. the K8s client is awaiting a response that never comes).
const bailPromise = new Promise<void>((resolve) => {
bailResolve = resolve;
});
if (stopSignal) {
stopPoller = setInterval(() => {
if (stopSignal.stopped) {
if (!writable.destroyed) writable.destroy();
if (!bailTimer && bailResolve) {
bailTimer = setTimeout(() => {
onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {});
bailResolve!();
}, LOG_STREAM_BAIL_TIMEOUT_MS);
}
}
}, 200);
}
const logPromise = logApi.log(namespace, podName, "claude", writable, {
follow: true,
pretty: false,
...(sinceSeconds ? { sinceSeconds } : {}),
}).catch(() => {
// follow may fail if the container already exited, the API connection
// dropped, or we aborted via writable.destroy() — not fatal.
});
try {
if (stopSignal) {
await Promise.race([logPromise, bailPromise]);
} else {
await logPromise;
}
} finally {
if (stopPoller) clearInterval(stopPoller);
if (bailTimer) clearTimeout(bailTimer);
}
return chunks.join("");
}
/**
* Stream pod logs with automatic reconnection. Keeps retrying the log
* stream until the stop signal fires (job completed) or the container
* exits normally. This handles silent K8s API connection drops that
* would otherwise cause the UI to stop receiving real output.
*
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
* loops during sustained API partitions.
*
* `activity` tracks stream liveness so execute()'s grace timer can
* distinguish a transient K8s log-API reconnect from a real container
* exit (FAR-107). Two signals:
* - `streamHasExited` becomes true on the first return from
* streamPodLogsOnce. Until then we are still in the warm-up window
* and waitForJobCompletion is the authoritative signal — grace must
* not fire.
* - `lastActiveAt` advances every time a streamPodLogsOnce attempt
* returns non-empty output (the container is still producing).
* The grace timer fires only once GRACE_MS have passed since the
* last chunk, so output that resumes after a transient drop keeps
* the run alive.
*/
async function streamPodLogs(
namespace: string,
podName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
dedup?: LogLineDedupFilter,
activity?: { lastActiveAt: number; streamHasExited: boolean },
): Promise<string> {
const allChunks: string[] = [];
let attempt = 0;
// Track the timestamp of the last successfully received log line so
// reconnects use a tight window instead of an ever-growing one anchored
// at stream start. This is the primary fix for FAR-105 duplicative logs.
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
// Shared across reconnects so replayed lines inside the `sinceSeconds`
// overlap window are dropped before they reach the streaming UI (FAR-123).
if (!dedup) dedup = new LogLineDedupFilter();
while (!stopSignal?.stopped) {
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
break;
}
// On reconnect, ask for logs since the last received line (+5s buffer)
// instead of since stream start. This keeps the window tight and
// avoids ever-growing duplicate output.
const sinceSeconds = attempt > 0
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
: undefined;
if (attempt > 0) {
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
}
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal, activity);
if (activity) activity.streamHasExited = true;
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
// so any log lines in `result` were received up to this moment).
lastLogReceivedAt = Math.floor(Date.now() / 1000);
// Refresh stream liveness so the grace timer in execute() does not
// fire while output is still flowing through reconnects (FAR-107).
if (activity) activity.lastActiveAt = Date.now();
} else if (attempt === 0) {
// First attempt returned nothing — update timestamp so reconnect
// window stays reasonable.
lastLogReceivedAt = preStreamTs;
}
attempt++;
// If the job is done or the container exited, no need to reconnect.
if (stopSignal?.stopped) break;
// Brief pause before reconnecting to avoid tight loops.
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
}
// Flush any buffered partial line so the final assistant/result chunk
// isn't dropped when the stream ends mid-line.
const tail = dedup.flush();
if (tail) await onLog("stdout", tail);
return allChunks.join("");
}
/**
* One-shot read of pod logs (no follow). Used as fallback when the
* follow stream missed output because the container exited quickly.
*/
async function readPodLogs(
namespace: string,
podName: string,
kubeconfigPath?: string,
): Promise<string> {
const coreApi = getCoreApi(kubeconfigPath);
try {
const log = await coreApi.readNamespacedPodLog({
name: podName,
namespace,
container: "claude",
});
return typeof log === "string" ? log : "";
} catch {
return "";
}
}
/**
* Wait for the Job to reach a terminal state (Complete or Failed).
* Returns the Job's final status. A 404 (job deleted by TTL or externally)
@@ -785,6 +651,7 @@ async function cleanupJob(
jobName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
podLogPath?: string,
): Promise<void> {
try {
const batchApi = getBatchApi(kubeconfigPath);
@@ -793,6 +660,9 @@ async function cleanupJob(
namespace,
body: { propagationPolicy: "Background" },
});
if (podLogPath) {
try { await fs.unlink(podLogPath); } catch { /* non-fatal */ }
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Warning: failed to cleanup job ${jobName}: ${msg}\n`);
@@ -846,6 +716,8 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let jobName!: string;
// eslint-disable-next-line prefer-const
let namespace!: string;
// eslint-disable-next-line prefer-const
let podLogPath!: string;
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
// runtimeSessionParams and currentSessionIdRaw are also used after the
// try block (in the result-parsing section) so hoist them here.
@@ -1018,6 +890,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
if (reattachTarget) {
jobName = reattachTarget.jobName;
namespace = reattachTarget.namespace;
podLogPath = buildPodLogPath(ctx.agent.companyId, ctx.agent.id, runId);
// Announce reattach metadata. Prompt and args aren't known here — they
// belong to the prior run that created this pod and are already present
@@ -1070,6 +943,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const claudeArgs = built.claudeArgs;
const promptMetrics = built.promptMetrics;
promptSecret = built.promptSecret;
podLogPath = built.podLogPath;
if (built.skippedLabels.length > 0) {
await onLog("stderr", `[paperclip] Warning: skipped ${built.skippedLabels.length} extra label(s) with reserved prefix: ${built.skippedLabels.join(", ")}\n`);
}
@@ -1187,6 +1061,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
_releaseMutex();
}
// eslint-disable-next-line @typescript-eslint/no-shadow
let stdout = "";
let exitCode: number | null = null;
let podTerminatedState: PodTerminatedState | null = null;
@@ -1269,12 +1144,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let keepaliveJobTerminal = false;
let consecutiveTerminalReadings = 0;
// Shared signal: when job completion resolves, tell the log streamer to
// stop reconnecting. Declared before keepaliveTimer so the cancel path
// inside the timer can set it without temporal dead zone issues.
// stop. Declared before keepaliveTimer so the cancel path inside the
// timer can set it without temporal dead zone issues.
const logStopSignal = { stopped: false };
// Shared dedup filter: created here so the one-shot fallback can
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
const logDedup = new LogLineDedupFilter();
// Set when the run is externally cancelled (cancel-poll path).
let cancelled = false;
@@ -1357,163 +1229,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return onLog(stream, chunk);
};
// Track stream liveness so the grace timer below only fires when output
// has actually stopped — not on a transient K8s log-API reconnect that
// streamPodLogs heals on its own (FAR-107).
const streamActivity: { lastActiveAt: number; streamHasExited: boolean } = {
lastActiveAt: Date.now(),
streamHasExited: false,
};
const trackedLogStream = streamPodLogs(
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
streamActivity,
);
// completionWithGrace races waitForJobCompletion against a grace timer
// that fires LOG_EXIT_COMPLETION_GRACE_MS after the log stream exits.
// This bounds the stale-UI window when K8s Job conditions lag container
// exit (FAR-23): without it, waitForJobCompletion polls indefinitely
// while streamPodLogs reconnects, holding execute() open for minutes.
// logStopSignal.stopped is set on every settled path (fulfilled, rejected,
// or grace) so streamPodLogs stops reconnecting promptly.
type CompletionResult = { succeeded: boolean; timedOut: boolean; jobGone?: boolean; gracePeriodFired?: boolean };
let gracePoller: ReturnType<typeof setInterval> | null = null;
const completionWithGrace = new Promise<CompletionResult>((resolve, reject) => {
let settled = false;
const settleOk = (r: CompletionResult) => {
if (settled) return;
settled = true;
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
logStopSignal.stopped = true;
resolve(r);
};
const settleErr = (err: unknown) => {
if (settled) return;
settled = true;
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
logStopSignal.stopped = true;
reject(err);
};
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(settleOk).catch(settleErr);
let graceCheckInFlight = false;
gracePoller = setInterval(() => {
// Only consider grace once the stream has exited at least once.
// Until then we are still in the warm-up window and
// waitForJobCompletion is the authoritative signal. Once the
// stream has exited, fire only after GRACE_MS of inactivity
// measured against the last received chunk — output that resumes
// through a reconnect resets the clock so transient drops do not
// truncate live runs (FAR-107).
if (graceCheckInFlight) return;
if (
streamActivity.streamHasExited &&
Date.now() - streamActivity.lastActiveAt >= LOG_EXIT_COMPLETION_GRACE_MS
) {
graceCheckInFlight = true;
void (async () => {
try {
// Pod-phase gate (FAR-107): if the pod is still Running/Pending
// the container is alive — Claude can be silent for >30s during
// long tool calls (web fetches, slow upstream APIs). Refresh
// the stream-activity timer, leave the poller armed, and let
// waitForJobCompletion remain the authoritative signal. Only
// proceed with the grace settlement when the pod has actually
// reached a terminal phase or is gone.
const podLookup = await lookupPodState(namespace, jobName, kubeconfigPath);
if (!podLookup.podMissing && (podLookup.phase === "Running" || podLookup.phase === "Pending")) {
streamActivity.lastActiveAt = Date.now();
graceCheckInFlight = false;
return;
}
} catch (err) {
await onLog("stderr", `[paperclip] grace gate: pod state lookup failed (${err instanceof Error ? err.message : String(err)}) — falling through to Job-presence check\n`).catch(() => {});
}
// Pod is no longer Running — proceed with Job-presence verification.
// Stop the grace poller immediately so we don't double-fire while the
// verification read below is in flight.
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
// The log stream exiting only means the container stopped producing
// output — it does NOT prove the Job was deleted. Verify Job
// presence with a one-shot read so we can distinguish:
// (a) Job 404 → truly gone (TTL or external deletion)
// (b) Job still present → K8s condition propagation lag (FAR-23)
// Without this check we mis-classify (b) as "deleted externally" and
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
try {
await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace });
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update; Job ${jobName} still present — proceeding with captured output (FAR-23)\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
} catch (err: unknown) {
if (isK8s404(err)) {
jobGoneDetectionPath = "grace-period-verify-404";
jobGoneAt = Date.now();
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago and Job ${jobName} is gone (TTL or external deletion) — proceeding with captured output (FAR-23)\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, jobGone: true });
} else {
// K8s API hiccup — bail out without claiming external deletion.
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago; Job state unverifiable (${err instanceof Error ? err.message : String(err)}) — proceeding with captured output (FAR-23)\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
}
}
})();
}
}, 1_000);
});
const [logResult, completionResult] = await Promise.allSettled([
trackedLogStream,
completionWithGrace,
const [tailResult, completionResult] = await Promise.allSettled([
tailPodLogFile(podLogPath, { onLog: wrappedOnLog, stopSignal: logStopSignal }),
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(r => { logStopSignal.stopped = true; return r; }),
]);
// Stop the keepalive immediately once the job has reached a terminal
// state — do not wait for the finally block.
if (keepaliveTimer) {
clearInterval(keepaliveTimer);
keepaliveTimer = null;
}
// If the run was externally cancelled, return a clean cancelled result
// without processing stdout (the finally block still runs for cleanup).
if (cancelled) {
return {
exitCode: null,
signal: null,
timedOut: false,
errorCode: "cancelled",
errorMessage: "Run cancelled",
};
}
if (logResult.status === "fulfilled") {
stdout = logResult.value;
}
// One-shot log fallback: handles two failure modes with a single read.
// Mode 1 — empty stream: the follow stream returned nothing (fast exit before connection).
// Mode 2 — partial stream: we have some output but no result event (follow stream raced
// with container exit and captured only the init line before the connection dropped).
// A one-shot readPodLogs is more reliable for already-terminated containers and reads
// from the beginning of the log, giving us the full output.
// We use a cheap string scan for the result-event guard (avoids a full JSON parse here;
// the authoritative parse happens once below after all fallbacks complete).
const hasResultEvent = stdout.split("\n").some((l) => { try { return JSON.parse(l).type === "result"; } catch { return false; } });
const needsOneShot = !stdout.trim() || (stdout.trim() && !hasResultEvent);
if (needsOneShot) {
if (!stdout.trim()) {
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
}
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
if (!stdout.trim() && oneShotLogs.trim()) {
stdout = oneShotLogs;
const deduped = logDedup.filter(stdout) + logDedup.flush();
if (deduped) await onLog("stdout", deduped);
} else if (oneShotLogs && oneShotLogs.length > stdout.length) {
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
const deduped = logDedup.filter(oneShotLogs) + logDedup.flush();
if (deduped) await onLog("stdout", deduped);
stdout = oneShotLogs;
}
}
const stdout = tailResult.status === "fulfilled" ? tailResult.value : "";
if (completionResult.status === "fulfilled") {
jobTimedOut = completionResult.value.timedOut;
@@ -1573,7 +1294,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
if (skipCleanup) {
await onLog("stdout", `[paperclip] Retaining job ${jobName} (state mismatch — UI is waiting on it)\n`);
} else if (!retainJobs) {
await cleanupJob(namespace, jobName, onLog, kubeconfigPath);
await cleanupJob(namespace, jobName, onLog, kubeconfigPath, podLogPath);
} else {
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
}
+55 -95
View File
@@ -1,6 +1,6 @@
import { describe, it, expect, beforeEach } from "vitest";
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
import { buildJobManifest, buildRtkSetupCommands, sanitizeLabelValue } from "./job-manifest.js";
import { buildJobManifest, buildPodLogPath, sanitizeLabelValue } from "./job-manifest.js";
import type { SelfPodInfo } from "./k8s-client.js";
function makeCtx(overrides: Partial<AdapterExecutionContext> = {}): AdapterExecutionContext {
@@ -221,11 +221,9 @@ describe("buildJobManifest", () => {
it("omits paperclip.io/run-id when sanitized value is null (all-invalid runId)", () => {
// inject an all-special-chars runId via context override — buildJobManifest
// uses ctx.runId directly
// uses ctx.runId directly. Use characters that are path-valid but label-invalid.
const badCtx = makeCtx({ runId: "@@@" });
const { job, skippedLabels } = buildJobManifest({ ctx: badCtx, selfPod });
expect(job.metadata?.labels?.["paperclip.io/run-id"]).toBeUndefined();
expect(skippedLabels).toContain("paperclip.io/run-id");
expect(() => buildJobManifest({ ctx: badCtx, selfPod })).toThrow("Invalid runId");
});
it("selector matches sanitized agent-id label", () => {
@@ -301,7 +299,10 @@ describe("buildJobManifest", () => {
it("write-prompt writes PROMPT_CONTENT to /tmp/prompt/prompt.txt", () => {
const { job } = buildJobManifest({ ctx, selfPod });
const init = job.spec?.template?.spec?.initContainers?.[0];
expect(init?.command).toEqual(["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"]);
expect(init?.command?.[0]).toBe("sh");
expect(init?.command?.[1]).toBe("-c");
expect(init?.command?.[2]).toContain("mkdir -p /paperclip/instances/default/run-logs/");
expect(init?.command?.[2]).toContain("printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt");
});
it("write-prompt mounts prompt volume", () => {
@@ -794,112 +795,71 @@ describe("buildJobManifest", () => {
});
});
describe("rtk output filtering", () => {
describe("pod log file tailing", () => {
it("does not modify main command when enableRtk is false (default)", () => {
const { job } = buildJobManifest({ ctx, selfPod });
const cmd = job.spec?.template?.spec?.containers[0]?.command;
// Command should be the plain `cat ... | claude ...` form with no rtk setup
expect(cmd?.[2]).toMatch(/^cat \/tmp\/prompt\/prompt\.txt \| claude /);
// Command should be the plain `cat ... | claude ... | tee ...` form with no rtk setup
expect(cmd?.[2]).toMatch(/^cat \/tmp\/prompt\/prompt\.txt \| claude .* \| tee /);
expect(cmd?.[2]).not.toContain("rtk-filter");
});
it("prepends RTK setup commands when enableRtk is true", () => {
ctx.config = { enableRtk: true };
const { job } = buildJobManifest({ ctx, selfPod });
const cmd = job.spec?.template?.spec?.containers[0]?.command;
expect(cmd?.[2]).toContain(".rtk-filter.js");
expect(cmd?.[2]).toContain("cat /tmp/prompt/prompt.txt | claude");
});
it("RTK setup runs before claude invocation", () => {
ctx.config = { enableRtk: true };
it("command includes tee to pod log path", () => {
const { job } = buildJobManifest({ ctx, selfPod });
const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? "";
const rtkIdx = cmd.indexOf(".rtk-filter.js");
const claudeIdx = cmd.indexOf("cat /tmp/prompt/prompt.txt | claude");
expect(rtkIdx).toBeGreaterThanOrEqual(0);
expect(claudeIdx).toBeGreaterThan(rtkIdx);
expect(cmd).toContain("| tee");
expect(cmd).toContain("/paperclip/instances/default/run-logs/");
});
it("RTK setup uses node (no external binaries)", () => {
ctx.config = { enableRtk: true };
it("podLogPath is returned from buildJobManifest", () => {
const result = buildJobManifest({ ctx, selfPod });
expect(result.podLogPath).toBe(
"/paperclip/instances/default/run-logs/co1/agent-abc/run-abc12345.pod.ndjson",
);
});
it("buildPodLogPath returns correctly formatted path", () => {
expect(buildPodLogPath("co1", "agent-abc", "run-abc12345")).toBe(
"/paperclip/instances/default/run-logs/co1/agent-abc/run-abc12345.pod.ndjson",
);
});
it("init container creates log directory", () => {
const { job } = buildJobManifest({ ctx, selfPod });
const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? "";
// Should only use `node` — no curl, wget, apt, pip, etc.
expect(cmd).not.toMatch(/\b(curl|wget|apt|yum|pip|gem|cargo|go\s+get)\b/);
expect(cmd).toContain("node ");
const initCmd = job.spec?.template?.spec?.initContainers?.[0]?.command;
expect(initCmd?.[0]).toBe("sh");
expect(initCmd?.[1]).toBe("-c");
expect(initCmd?.[2]).toContain("mkdir -p /paperclip/instances/default/run-logs/");
});
it("uses default 50000 byte threshold when rtkMaxOutputBytes not set", () => {
ctx.config = { enableRtk: true };
const setup = buildRtkSetupCommands(50000);
// The filter script base64 should decode to contain the MAX constant
const b64Match = setup.match(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/);
expect(b64Match).not.toBeNull();
const decoded = Buffer.from(b64Match![1], "base64").toString("utf-8");
expect(decoded).toContain("50000");
it("sanitizes companyId with / to valid path component for log path", () => {
const badCtx = {
...ctx,
agent: { ...ctx.agent, companyId: "co/1" },
};
const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod });
// / is stripped by sanitizeForK8sPath
expect(podLogPath).toContain("co1/");
});
it("respects custom rtkMaxOutputBytes", () => {
ctx.config = { enableRtk: true, rtkMaxOutputBytes: 100000 };
const { job } = buildJobManifest({ ctx, selfPod });
const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? "";
// The custom threshold should appear in the base64-encoded filter script
const b64Matches = [...cmd.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
const decoded = b64Matches.map((m) => Buffer.from(m[1], "base64").toString("utf-8")).join("\n");
expect(decoded).toContain("100000");
it("sanitizes agentId with @ to valid path component for log path", () => {
const badCtx = {
...ctx,
agent: { ...ctx.agent, id: "agent@123" },
};
const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod });
// @ is stripped by sanitizeForK8sPath
expect(podLogPath).toContain("/agent123/");
});
it("RTK setup installs a PostToolUse hook in claude settings", () => {
const setup = buildRtkSetupCommands(50000);
// The settings script (second base64 block) should reference PostToolUse
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
expect(b64Matches.length).toBeGreaterThanOrEqual(2);
const settingsScript = Buffer.from(b64Matches[1]![1], "base64").toString("utf-8");
expect(settingsScript).toContain("PostToolUse");
expect(settingsScript).toContain("settings.json");
});
it("filter script handles string content truncation", () => {
// Decode the filter script and verify it truncates string content
const setup = buildRtkSetupCommands(1000);
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
expect(filterScript).toContain("MAX=1000");
expect(filterScript).toContain("truncated by paperclip-rtk");
expect(filterScript).toContain("tool_response");
expect(filterScript).toContain("tool_result");
});
it("filter script truncates without corrupting multi-byte UTF-8", () => {
// "中" is U+4E2D, 3 bytes in UTF-8: E4 B8 AD
// With MAX=5, two "中" (6 bytes) should truncate to one (3 bytes), not
// produce a replacement character from slicing mid-codepoint.
const setup = buildRtkSetupCommands(5);
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
// Extract the trunc function from the filter script and evaluate it
const fnMatch = filterScript.match(/(function trunc\(s\)\{.*\})(?=const tr=)/);
expect(fnMatch).toBeTruthy();
// eslint-disable-next-line no-eval
const trunc = eval(`(()=>{const MAX=5;${fnMatch![1]};return trunc;})()`);
const result = trunc("中中");
expect(result).not.toContain("");
expect(result).toContain("中");
expect(result).toContain("truncated by paperclip-rtk");
// Should report bytes from the actual truncation point, not MAX
expect(result).toContain("3 bytes truncated");
});
it("filter script handles array content (block format)", () => {
const setup = buildRtkSetupCommands(50000);
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
// Should handle array content blocks (text field on each block)
expect(filterScript).toContain("Array.isArray");
expect(filterScript).toContain("b.text");
it("sanitizes runId with underscore to valid path component for log path", () => {
const badCtx = {
...ctx,
runId: "run_123",
};
const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod });
// _ is stripped by sanitizeForK8sPath
expect(podLogPath).toContain("/run123.pod.ndjson");
});
});
});
+24 -89
View File
@@ -12,85 +12,18 @@ import {
import { createHash } from "node:crypto";
import type { ClaudePromptBundle } from "./prompt-cache.js";
/**
* Build the shell command prefix that installs a native Node.js PostToolUse
* hook into Claude Code's settings. The hook truncates oversized tool outputs
* before they reach the model — replacing the RTK binary init-container
* approach with a self-contained Node.js implementation.
*
* Both scripts are base64-encoded so they can be embedded in a sh -c command
* string without any quoting or escaping issues.
*
* @param maxOutputBytes Byte threshold above which tool output is truncated.
* @returns A shell command string (suitable for "&&"-chaining
* before the claude invocation).
*/
export function buildRtkSetupCommands(maxOutputBytes: number): string {
// --- Filter script ----------------------------------------------------------
// This script runs as the PostToolUse hook inside every K8s Job pod.
// Claude Code writes the hook event as JSON to the script's stdin; the script
// truncates the tool_response/tool_result content when it exceeds the
// threshold and writes the (possibly modified) JSON to stdout.
//
// Field-name coverage:
// • tool_response — documented hook event format for PostToolUse
// • tool_result — alternative name seen in some Claude Code versions
// Content may be a plain string or an array of typed blocks (text/image/…).
const filterScript = [
`const c=[];`,
`process.stdin.on('data',d=>c.push(d));`,
`process.stdin.on('end',()=>{`,
`const raw=Buffer.concat(c).toString('utf-8');`,
`let o;try{o=JSON.parse(raw);}catch{process.stdout.write(raw);return;}`,
`const MAX=${maxOutputBytes};`,
`function trunc(s){`,
`if(typeof s!=='string')return s;`,
`const b=Buffer.from(s,'utf-8');`,
`if(b.length<=MAX)return s;`,
`let e=MAX;if(e>0){let p=e-1;while(p>0&&(b[p]&0xC0)===0x80)p--;const l=b[p];let n=1;if((l&0xE0)===0xC0)n=2;else if((l&0xF0)===0xE0)n=3;else if((l&0xF8)===0xF0)n=4;if(p+n>e)e=p;}`,
`return b.slice(0,e).toString('utf-8')+'\\n[...'+(b.length-e)+' bytes truncated by paperclip-rtk]';`,
`}`,
`const tr=o&&(o.tool_response||o.tool_result);`,
`if(tr){`,
`if(typeof tr.content==='string'){tr.content=trunc(tr.content);}`,
`else if(Array.isArray(tr.content)){`,
`tr.content=tr.content.map(function(b){`,
`if(b&&typeof b==='object'&&typeof b.text==='string'){`,
`return Object.assign({},b,{text:trunc(b.text)});`,
`}return b;`,
`});`,
`}`,
`}`,
`process.stdout.write(JSON.stringify(o));`,
`});`,
].join("");
function assertSafePathComponent(field: string, value: string): void {
if (!/^[a-zA-Z0-9-]+$/.test(value)) {
throw new Error(`Invalid ${field} for log path: ${value}`);
}
}
// --- Settings script --------------------------------------------------------
// Reads the existing ~/.claude/settings.json (if any), merges in the RTK
// PostToolUse hook, and writes the file back. All other settings sections
// are preserved; only PostToolUse is replaced so we own the full hook list
// for this run.
const settingsScript = [
`const fs=require('fs'),pt=require('path');`,
`const p=pt.join(process.env.HOME,'.claude','settings.json');`,
`let s={};try{s=JSON.parse(fs.readFileSync(p,'utf-8'));}catch(e){}`,
`s.hooks=s.hooks||{};`,
`s.hooks.PostToolUse=[{matcher:'.*',hooks:[{type:'command',command:'node /tmp/.rtk-filter.js'}]}];`,
`fs.mkdirSync(pt.dirname(p),{recursive:true});`,
`fs.writeFileSync(p,JSON.stringify(s));`,
].join("");
function sanitizeForK8sPath(value: string): string {
return value.replace(/[^a-zA-Z0-9-]/g, "");
}
// Encode as base64 so the strings can be embedded directly in a shell command
// without any quoting concerns (base64 alphabet: A-Za-z0-9+/=).
const filterB64 = Buffer.from(filterScript, "utf-8").toString("base64");
const settingsB64 = Buffer.from(settingsScript, "utf-8").toString("base64");
return [
// Write the filter script
`node -e "require('fs').writeFileSync('/tmp/.rtk-filter.js',Buffer.from('${filterB64}','base64').toString('utf-8'))"`,
// Install the Claude Code PostToolUse hook (merge into existing settings)
`node -e "eval(Buffer.from('${settingsB64}','base64').toString('utf-8'))"`,
].join(" && ");
export function buildPodLogPath(companyId: string, agentId: string, runId: string): string {
return `/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`;
}
/** Prompts above this size (bytes) are staged via a Secret instead of an
@@ -202,6 +135,8 @@ export interface JobBuildResult {
promptSecret: PromptSecret | null;
/** User-supplied extra labels that were dropped because they used a reserved prefix. */
skippedLabels: string[];
/** Path to the pod log file on the shared PVC. */
podLogPath: string;
}
function sanitizeForK8sName(value: string, maxLen = 16): string {
@@ -353,8 +288,6 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const nodeSelector = parseKeyValueConfig(config.nodeSelector);
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
const extraLabels = parseKeyValueConfig(config.labels);
const enableRtk = asBoolean(config.enableRtk, false);
const rtkMaxOutputBytes = asNumber(config.rtkMaxOutputBytes, 50000);
// Resolve working directory — use workspace cwd, fall back to /paperclip
const workspaceContext = parseObject(context.paperclipWorkspace);
@@ -535,13 +468,15 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
// Build the claude command string for the main container
const claudeArgsEscaped = claudeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
const claudeInvocation = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`;
// When RTK output filtering is enabled, prepend the Node.js hook setup.
// This writes a filter script and a Claude Code settings file that installs
// it as a PostToolUse hook — no external binary or init container required.
const mainCommand = enableRtk
? `${buildRtkSetupCommands(rtkMaxOutputBytes)} && ${claudeInvocation}`
: claudeInvocation;
const logPathCompanyId = sanitizeForK8sPath(agent.companyId);
const logPathAgentId = sanitizeForK8sPath(agent.id);
const logPathRunId = sanitizeForK8sPath(runId);
assertSafePathComponent("companyId", logPathCompanyId);
assertSafePathComponent("agentId", logPathAgentId);
assertSafePathComponent("runId", logPathRunId);
const podLogPath = buildPodLogPath(logPathCompanyId, logPathAgentId, logPathRunId);
const claudeInvocation = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped} | tee ${podLogPath}`;
const mainCommand = claudeInvocation;
// Decide prompt delivery strategy: env var (small) or Secret volume (large).
const promptBytes = Buffer.byteLength(prompt, "utf-8");
@@ -569,7 +504,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
name: "write-prompt",
image: "busybox:1.36",
imagePullPolicy: "IfNotPresent",
command: ["sh", "-c", "cp /tmp/prompt-secret/prompt.txt /tmp/prompt/prompt.txt"],
command: ["sh", "-c", `mkdir -p /paperclip/instances/default/run-logs/${agent.companyId}/${agent.id} && cp /tmp/prompt-secret/prompt.txt /tmp/prompt/prompt.txt`],
volumeMounts: [
{ name: "prompt", mountPath: "/tmp/prompt" },
{ name: "prompt-secret", mountPath: "/tmp/prompt-secret", readOnly: true },
@@ -584,7 +519,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
name: "write-prompt",
image: "busybox:1.36",
imagePullPolicy: "IfNotPresent",
command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
command: ["sh", "-c", `mkdir -p /paperclip/instances/default/run-logs/${agent.companyId}/${agent.id} && printf '%s' "$PROMPT_CONTENT" > /tmp/prompt/prompt.txt`],
env: [{ name: "PROMPT_CONTENT", value: prompt }],
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
securityContext,
@@ -641,5 +576,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
},
};
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret, skippedLabels };
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret, skippedLabels, podLogPath };
}
-173
View File
@@ -1,173 +0,0 @@
import { describe, it, expect } from "vitest";
import { LogLineDedupFilter, eventDedupKey } from "./log-dedup.js";
function assistantEvent(id: string, text: string): string {
return JSON.stringify({
type: "assistant",
session_id: "sess_1",
message: {
id,
content: [{ type: "text", text }],
},
});
}
function userToolResultEvent(toolUseId: string, content: string): string {
return JSON.stringify({
type: "user",
session_id: "sess_1",
message: {
content: [{ type: "tool_result", tool_use_id: toolUseId, content }],
},
});
}
function systemInitEvent(sessionId: string): string {
return JSON.stringify({
type: "system",
subtype: "init",
session_id: sessionId,
model: "claude-opus-4-7",
});
}
function resultEvent(sessionId: string): string {
return JSON.stringify({
type: "result",
subtype: "success",
session_id: sessionId,
result: "done",
total_cost_usd: 0.01,
usage: { input_tokens: 1, output_tokens: 1, cache_read_input_tokens: 0 },
});
}
describe("eventDedupKey", () => {
it("keys assistant events by message.id", () => {
const key = eventDedupKey(JSON.parse(assistantEvent("msg_abc", "hi")));
expect(key).toBe("assistant:msg_abc");
});
it("keys user tool_result events by tool_use_id", () => {
const key = eventDedupKey(JSON.parse(userToolResultEvent("toolu_1", "ok")));
expect(key).toBe("user:tool_result:toolu_1");
});
it("keys system init events by session_id", () => {
const key = eventDedupKey(JSON.parse(systemInitEvent("sess_xyz")));
expect(key).toBe("system:init:sess_xyz");
});
it("keys result events by session_id", () => {
const key = eventDedupKey(JSON.parse(resultEvent("sess_xyz")));
expect(key).toBe("result:sess_xyz");
});
it("returns null for assistant events missing message.id", () => {
const event = { type: "assistant", message: { content: [] } };
expect(eventDedupKey(event)).toBeNull();
});
it("returns null for unknown event types", () => {
expect(eventDedupKey({ type: "unknown" })).toBeNull();
expect(eventDedupKey({})).toBeNull();
});
});
describe("LogLineDedupFilter", () => {
it("passes unique lines through unchanged", () => {
const filter = new LogLineDedupFilter();
const a = assistantEvent("msg_1", "hello");
const b = assistantEvent("msg_2", "world");
expect(filter.filter(`${a}\n${b}\n`)).toBe(`${a}\n${b}\n`);
});
it("drops assistant events replayed with the same message.id", () => {
const filter = new LogLineDedupFilter();
const a = assistantEvent("msg_1", "Three nits to fix.");
filter.filter(`${a}\n`);
expect(filter.filter(`${a}\n`)).toBe("");
});
it("drops user tool_result events replayed with the same tool_use_id", () => {
const filter = new LogLineDedupFilter();
const a = userToolResultEvent("toolu_abc", "file contents");
filter.filter(`${a}\n`);
expect(filter.filter(`${a}\n`)).toBe("");
});
it("drops system init and result events on replay", () => {
const filter = new LogLineDedupFilter();
const init = systemInitEvent("sess_1");
const result = resultEvent("sess_1");
filter.filter(`${init}\n${result}\n`);
expect(filter.filter(`${init}\n${result}\n`)).toBe("");
});
it("buffers incomplete trailing lines across chunks", () => {
const filter = new LogLineDedupFilter();
const line = assistantEvent("msg_1", "hello");
const mid = Math.floor(line.length / 2);
const out1 = filter.filter(line.slice(0, mid));
const out2 = filter.filter(line.slice(mid) + "\n");
expect(out1).toBe("");
expect(out2).toBe(`${line}\n`);
});
it("flush() emits a final incomplete line that was not replayed", () => {
const filter = new LogLineDedupFilter();
const line = assistantEvent("msg_tail", "no newline");
filter.filter(line);
expect(filter.flush()).toBe(line);
});
it("flush() drops an incomplete line that was already seen with a newline", () => {
const filter = new LogLineDedupFilter();
const line = assistantEvent("msg_same", "x");
filter.filter(`${line}\n`);
filter.filter(line);
expect(filter.flush()).toBe("");
});
it("passes non-JSON lines through every time (does not dedup paperclip status)", () => {
const filter = new LogLineDedupFilter();
const status = "[paperclip] keepalive — job foo running\n";
expect(filter.filter(status)).toBe(status);
expect(filter.filter(status)).toBe(status);
});
it("dedups structurally identical JSON with identical content (raw fallback)", () => {
const filter = new LogLineDedupFilter();
// No recognized type → raw fallback key.
const line = JSON.stringify({ foo: "bar", baz: 1 });
filter.filter(`${line}\n`);
expect(filter.filter(`${line}\n`)).toBe("");
});
it("handles multiple complete lines in a single chunk with partial trailing", () => {
const filter = new LogLineDedupFilter();
const a = assistantEvent("msg_a", "a");
const b = assistantEvent("msg_b", "b");
const c = assistantEvent("msg_c", "c");
// a and b are complete, c is partial (no trailing newline).
const out = filter.filter(`${a}\n${b}\n${c}`);
expect(out).toBe(`${a}\n${b}\n`);
// Completing c later should emit exactly c.
expect(filter.filter("\n")).toBe(`${c}\n`);
});
it("drops the classic FAR-123 replay scenario across reconnects", () => {
const filter = new LogLineDedupFilter();
const assistantNits = assistantEvent("msg_nits", "Three nits to fix. Let me look at an existing test file...");
const assistantWrite = assistantEvent("msg_write", "Now I need to write unit tests");
// First stream attempt emits both events.
const out1 = filter.filter(`${assistantNits}\n${assistantWrite}\n`);
expect(out1).toBe(`${assistantNits}\n${assistantWrite}\n`);
// Reconnect replays both within the sinceSeconds overlap — filter should drop them.
const out2 = filter.filter(`${assistantNits}\n${assistantWrite}\n`);
expect(out2).toBe("");
// And a genuinely new event after the replay should still pass through.
const assistantFresh = assistantEvent("msg_fresh", "next turn");
expect(filter.filter(`${assistantFresh}\n`)).toBe(`${assistantFresh}\n`);
});
});
-146
View File
@@ -1,146 +0,0 @@
/**
* Line-level dedup filter for the K8s log stream.
*
* The K8s log follow stream can reconnect with an overlapping `sinceSeconds`
* window (integer-second granularity + a safety buffer), which replays a few
* seconds of recent output on every reconnect. Without dedup those replayed
* lines appear as duplicate events in the streaming UI — the same assistant
* text block shows up between every subsequent tool call (FAR-123).
*
* The filter operates at the chunk → line level: chunks are split on `\n`,
* incomplete trailing content is buffered until the next chunk, and each
* complete line is emitted at most once. JSON-shaped Claude stream-json
* events are keyed by their stable structural IDs; non-JSON lines pass
* through unchanged so genuinely-repeated status lines are not swallowed.
*/
type Parsed = Record<string, unknown>;
function asString(value: unknown): string {
return typeof value === "string" ? value : "";
}
function asRecord(value: unknown): Parsed | null {
if (typeof value !== "object" || value === null || Array.isArray(value)) return null;
return value as Parsed;
}
/**
* Build a stable dedup key for a Claude stream-json event. Returns `null`
* when the event is not a recognized Claude event — those lines fall back to
* raw-content hashing so non-JSON output (paperclip status lines, shell
* output) is never deduped by identity.
*/
export function eventDedupKey(event: Parsed): string | null {
const type = asString(event.type);
if (type === "system") {
const subtype = asString(event.subtype);
const sessionId = asString(event.session_id);
if (subtype === "init" && sessionId) return `system:init:${sessionId}`;
return null;
}
if (type === "assistant") {
const message = asRecord(event.message);
const id = message ? asString(message.id) : "";
if (id) return `assistant:${id}`;
return null;
}
if (type === "user") {
const message = asRecord(event.message);
const content = message && Array.isArray(message.content) ? message.content : [];
const toolUseIds: string[] = [];
for (const entry of content) {
const block = asRecord(entry);
if (!block) continue;
const toolUseId = asString(block.tool_use_id);
if (toolUseId) toolUseIds.push(toolUseId);
}
if (toolUseIds.length > 0) return `user:tool_result:${toolUseIds.join(",")}`;
return null;
}
if (type === "result") {
const sessionId = asString(event.session_id);
return sessionId ? `result:${sessionId}` : "result:unknown";
}
return null;
}
/**
* Stateful line-level dedup filter. Emits `filter(chunk)` output through
* the caller — preserves original chunk formatting (including trailing
* newlines) for lines that pass the dedup check.
*/
export class LogLineDedupFilter {
private buffer = "";
private readonly seenKeys = new Set<string>();
/**
* Process a chunk and return the subset that should be forwarded.
* Incomplete trailing content (no terminating newline) is buffered and
* emitted on the next chunk that completes the line (or on flush()).
*/
filter(chunk: string): string {
if (!chunk) return "";
const combined = this.buffer + chunk;
const endsWithNewline = combined.endsWith("\n");
const parts = combined.split("\n");
if (endsWithNewline) {
// Discard the final empty element — last line was complete.
parts.pop();
this.buffer = "";
} else {
// Last element is an incomplete line — hold it for the next chunk.
this.buffer = parts.pop() ?? "";
}
const out: string[] = [];
for (const line of parts) {
if (this.shouldEmit(line)) out.push(line);
}
if (out.length === 0) return "";
return out.join("\n") + "\n";
}
/**
* Flush any incomplete trailing content. Called when the stream ends
* without a terminating newline so the final partial line isn't lost.
*/
flush(): string {
const pending = this.buffer;
this.buffer = "";
if (!pending) return "";
return this.shouldEmit(pending) ? pending : "";
}
private shouldEmit(line: string): boolean {
const trimmed = line.trim();
if (!trimmed) return true;
// Only attempt dedup on JSON-shaped lines; pass shell/text output through.
if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true;
let parsed: unknown;
try {
parsed = JSON.parse(trimmed);
} catch {
return true;
}
const event = asRecord(parsed);
if (!event) return true;
// Recognized Claude stream-json event → structural key.
const structuralKey = eventDedupKey(event);
const key = structuralKey ?? `raw:${trimmed}`;
if (this.seenKeys.has(key)) return false;
this.seenKeys.add(key);
return true;
}
}