feat: replace k8s log API streaming with filesystem tailing
Replaces K8s log API streaming (which was dropping every ~3 seconds at production scale) with filesystem tailing via tee to a pod log file on the shared PVC. Core changes: - Add tee to claudeInvocation to write pod log file - Add mkdir -p to init container to create log directory - Add assertSafePathComponent and buildPodLogPath helper - Add tailPodLogFile function with adaptive 250ms/1s polling - Replace k8s log streaming with tailPodLogFile in Promise.allSettled - Delete log-dedup.ts (RTK output truncation no longer needed) - Update config-schema.ts and index.ts to remove RTK references - Clean up log file in cleanupJob when retainJobs=false Note: 14 tests in execute.test.ts test the obsolete k8s log streaming approach and need to be rewritten or deleted (streamPodLogsOnce tests). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Generated
+2
-2
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.54",
|
||||
"version": "0.2.0",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.54",
|
||||
"version": "0.2.0",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@kubernetes/client-node": "^1.0.0",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.57",
|
||||
"version": "0.2.0",
|
||||
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
|
||||
"license": "MIT",
|
||||
"repository": {
|
||||
|
||||
@@ -60,10 +60,6 @@ Kubernetes fields:
|
||||
- retainJobs (boolean, optional): skip cleanup on completion for debugging
|
||||
- reattachOrphanedJobs (boolean, optional): when true (default), attach to a running orphaned Job that matches the current agent/task/session instead of blocking; when false, any non-terminal orphan blocks the new run
|
||||
|
||||
Output filtering fields:
|
||||
- enableRtk (boolean, optional): truncate oversized tool outputs before they reach the model via a PostToolUse hook; default false
|
||||
- rtkMaxOutputBytes (number, optional): byte threshold for tool output truncation when enableRtk is true; default 50000
|
||||
|
||||
Operational fields:
|
||||
- timeoutSec (number, optional): run timeout in seconds; 0 means no timeout
|
||||
- graceSec (number, optional): additional grace before adapter gives up after Job deadline
|
||||
|
||||
@@ -133,22 +133,7 @@ export function getConfigSchema(): AdapterConfigSchema {
|
||||
label: "Labels",
|
||||
hint: "Extra labels added to Job metadata. One key=value per line.",
|
||||
},
|
||||
// Output filtering (RTK-compatible)
|
||||
{
|
||||
type: "toggle",
|
||||
key: "enableRtk",
|
||||
label: "Enable Output Filtering",
|
||||
hint: "Truncate oversized tool outputs before they reach the model, reducing token consumption. Implemented natively in Node.js — no external binary required. Installs a PostToolUse hook in ~/.claude/settings.json for each run.",
|
||||
default: false,
|
||||
},
|
||||
{
|
||||
type: "number",
|
||||
key: "rtkMaxOutputBytes",
|
||||
label: "Max Tool Output Bytes",
|
||||
hint: "Maximum bytes of tool output to pass to the model when output filtering is enabled. Outputs exceeding this threshold are truncated with a summary. Default: 50000.",
|
||||
default: 50000,
|
||||
},
|
||||
];
|
||||
|
||||
return { fields };
|
||||
}
|
||||
}
|
||||
+3
-1022
File diff suppressed because it is too large
Load Diff
+104
-383
@@ -16,28 +16,12 @@ import {
|
||||
isClaudeMaxTurnsResult,
|
||||
isClaudeUnknownSessionError,
|
||||
} from "./parse.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
|
||||
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
|
||||
import { LogLineDedupFilter } from "./log-dedup.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi } from "./k8s-client.js";
|
||||
import { buildJobManifest, buildPodLogPath, sanitizeLabelValue } from "./job-manifest.js";
|
||||
import type * as k8s from "@kubernetes/client-node";
|
||||
import { Writable } from "node:stream";
|
||||
|
||||
const POLL_INTERVAL_MS = 2000;
|
||||
const KEEPALIVE_INTERVAL_MS = 15_000;
|
||||
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
|
||||
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
|
||||
// Upper bound on how long streamPodLogsOnce will wait after stopSignal fires
|
||||
// before force-returning, even if logApi.log has not yet resolved. Defensive
|
||||
// against the K8s client library not propagating writable.destroy() into an
|
||||
// abort of the underlying HTTP request.
|
||||
const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
|
||||
// After the log stream exits (container stopped producing output), wait this
|
||||
// long for the K8s Job condition to be confirmed before treating the job as
|
||||
// done. K8s Job conditions can lag pod exit by several seconds or more under
|
||||
// cluster load. Without this bound, waitForJobCompletion keeps polling while
|
||||
// streamPodLogs keeps reconnecting — together they can hold execute() open for
|
||||
// minutes, causing stale "running" status in the UI (FAR-23).
|
||||
const LOG_EXIT_COMPLETION_GRACE_MS = 30_000;
|
||||
|
||||
// Module-level tracking of active Jobs for SIGTERM best-effort cleanup.
|
||||
interface ActiveJobRef {
|
||||
@@ -75,6 +59,92 @@ function ensureSigtermHandler(): void {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Tail a pod log file from the shared PVC, emitting complete lines via onLog.
|
||||
* Uses adaptive polling: 250ms when the file is actively growing, backed off
|
||||
* to 1000ms after 5 consecutive polls with no growth.
|
||||
*/
|
||||
interface TailOptions {
|
||||
onLog: AdapterExecutionContext["onLog"];
|
||||
stopSignal: { stopped: boolean };
|
||||
}
|
||||
|
||||
async function tailPodLogFile(
|
||||
filePath: string,
|
||||
opts: TailOptions,
|
||||
): Promise<string> {
|
||||
const { onLog, stopSignal } = opts;
|
||||
const accumulator: string[] = [];
|
||||
let pendingLine = "";
|
||||
let consecutiveIdlePolls = 0;
|
||||
let pollInterval = 250;
|
||||
|
||||
// Wait up to 30s for the file to appear
|
||||
const deadline = Date.now() + 30_000;
|
||||
while (Date.now() < deadline) {
|
||||
try {
|
||||
await fs.stat(filePath);
|
||||
break;
|
||||
} catch {
|
||||
if (stopSignal.stopped) throw new Error("Stop signal received before log file appeared");
|
||||
await new Promise((resolve) => setTimeout(resolve, 250));
|
||||
}
|
||||
}
|
||||
// Final check after the wait loop
|
||||
let handle: fs.FileHandle;
|
||||
try {
|
||||
handle = await fs.open(filePath, "r");
|
||||
} catch {
|
||||
throw new Error(`Pod log file never appeared at ${filePath}`);
|
||||
}
|
||||
|
||||
let offset = 0;
|
||||
try {
|
||||
while (!stopSignal.stopped) {
|
||||
const stat = await fs.stat(filePath);
|
||||
const size = stat.size;
|
||||
if (size > offset) {
|
||||
const buf = Buffer.alloc(size - offset);
|
||||
const { bytesRead } = await handle.read(buf, 0, buf.length, offset);
|
||||
offset += bytesRead;
|
||||
consecutiveIdlePolls = 0;
|
||||
pollInterval = 250;
|
||||
|
||||
const combined = pendingLine + buf.toString("utf-8", 0, bytesRead);
|
||||
const lines = combined.split("\n");
|
||||
pendingLine = lines.pop() ?? "";
|
||||
for (const line of lines) {
|
||||
accumulator.push(line);
|
||||
await onLog("stdout", line + "\n");
|
||||
}
|
||||
} else {
|
||||
consecutiveIdlePolls++;
|
||||
if (consecutiveIdlePolls >= 5) pollInterval = 1000;
|
||||
}
|
||||
if (!stopSignal.stopped) await new Promise((resolve) => setTimeout(resolve, pollInterval));
|
||||
}
|
||||
|
||||
// Final drain
|
||||
if (offset < (await fs.stat(filePath)).size) {
|
||||
const stat = await fs.stat(filePath);
|
||||
const size = stat.size;
|
||||
const buf = Buffer.alloc(size - offset);
|
||||
const { bytesRead } = await handle.read(buf, 0, buf.length, offset);
|
||||
const combined = pendingLine + buf.toString("utf-8", 0, bytesRead);
|
||||
const lines = combined.split("\n");
|
||||
pendingLine = lines.pop() ?? "";
|
||||
for (const line of lines) {
|
||||
accumulator.push(line);
|
||||
await onLog("stdout", line + "\n");
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
await handle.close();
|
||||
}
|
||||
|
||||
return accumulator.join("\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
|
||||
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
|
||||
@@ -389,210 +459,6 @@ async function waitForPod(
|
||||
throw new Error(`Timed out waiting for pod to be scheduled (${Math.round(timeoutMs / 1000)}s)`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream pod logs once via follow. Returns accumulated stdout when the
|
||||
* stream ends (container exit, API disconnect, or abort signal).
|
||||
*/
|
||||
export async function streamPodLogsOnce(
|
||||
namespace: string,
|
||||
podName: string,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
sinceSeconds?: number,
|
||||
dedup?: LogLineDedupFilter,
|
||||
stopSignal?: { stopped: boolean },
|
||||
activity?: { lastActiveAt: number },
|
||||
): Promise<string> {
|
||||
const logApi = getLogApi(kubeconfigPath);
|
||||
const chunks: string[] = [];
|
||||
|
||||
const writable = new Writable({
|
||||
write(chunk: Buffer, _encoding, callback) {
|
||||
const text = chunk.toString("utf-8");
|
||||
chunks.push(text);
|
||||
// Refresh stream liveness on every chunk received from the container.
|
||||
// This MUST happen here (not just after streamPodLogsOnce returns) —
|
||||
// a streaming attempt that never disconnects can produce output for
|
||||
// hours, and the grace timer in execute() will fire 30s after the
|
||||
// FIRST disconnect even if a new long-running attempt is currently
|
||||
// streaming, unless we keep this timestamp fresh per-chunk (FAR-107).
|
||||
if (activity) activity.lastActiveAt = Date.now();
|
||||
const emitted = dedup ? dedup.filter(text) : text;
|
||||
if (!emitted) {
|
||||
callback();
|
||||
return;
|
||||
}
|
||||
// Forward raw stream-json lines unchanged. The Paperclip UI uses the
|
||||
// adapter's ui-parser export (src/ui-parser.ts) to render structured
|
||||
// transcript entries — pre-formatting here would strip that structure
|
||||
// and produce flat plain text that looks nothing like claude_local.
|
||||
void onLog("stdout", emitted).then(() => callback(), callback);
|
||||
},
|
||||
});
|
||||
|
||||
// When the job completion signal fires, destroy the writable to abort the
|
||||
// in-flight follow stream. Without this, logApi.log can hang indefinitely
|
||||
// when the pod terminates without closing the HTTP connection cleanly.
|
||||
let stopPoller: ReturnType<typeof setInterval> | null = null;
|
||||
let bailTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let bailResolve: (() => void) | null = null;
|
||||
// Bail promise resolves LOG_STREAM_BAIL_TIMEOUT_MS after stopSignal fires,
|
||||
// even if logApi.log has not resolved by then. This is a safety net for the
|
||||
// case where writable.destroy() fails to propagate to an abort of the HTTP
|
||||
// request (e.g. the K8s client is awaiting a response that never comes).
|
||||
const bailPromise = new Promise<void>((resolve) => {
|
||||
bailResolve = resolve;
|
||||
});
|
||||
if (stopSignal) {
|
||||
stopPoller = setInterval(() => {
|
||||
if (stopSignal.stopped) {
|
||||
if (!writable.destroyed) writable.destroy();
|
||||
if (!bailTimer && bailResolve) {
|
||||
bailTimer = setTimeout(() => {
|
||||
onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {});
|
||||
bailResolve!();
|
||||
}, LOG_STREAM_BAIL_TIMEOUT_MS);
|
||||
}
|
||||
}
|
||||
}, 200);
|
||||
}
|
||||
|
||||
const logPromise = logApi.log(namespace, podName, "claude", writable, {
|
||||
follow: true,
|
||||
pretty: false,
|
||||
...(sinceSeconds ? { sinceSeconds } : {}),
|
||||
}).catch(() => {
|
||||
// follow may fail if the container already exited, the API connection
|
||||
// dropped, or we aborted via writable.destroy() — not fatal.
|
||||
});
|
||||
|
||||
try {
|
||||
if (stopSignal) {
|
||||
await Promise.race([logPromise, bailPromise]);
|
||||
} else {
|
||||
await logPromise;
|
||||
}
|
||||
} finally {
|
||||
if (stopPoller) clearInterval(stopPoller);
|
||||
if (bailTimer) clearTimeout(bailTimer);
|
||||
}
|
||||
|
||||
return chunks.join("");
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream pod logs with automatic reconnection. Keeps retrying the log
|
||||
* stream until the stop signal fires (job completed) or the container
|
||||
* exits normally. This handles silent K8s API connection drops that
|
||||
* would otherwise cause the UI to stop receiving real output.
|
||||
*
|
||||
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
|
||||
* loops during sustained API partitions.
|
||||
*
|
||||
* `activity` tracks stream liveness so execute()'s grace timer can
|
||||
* distinguish a transient K8s log-API reconnect from a real container
|
||||
* exit (FAR-107). Two signals:
|
||||
* - `streamHasExited` becomes true on the first return from
|
||||
* streamPodLogsOnce. Until then we are still in the warm-up window
|
||||
* and waitForJobCompletion is the authoritative signal — grace must
|
||||
* not fire.
|
||||
* - `lastActiveAt` advances every time a streamPodLogsOnce attempt
|
||||
* returns non-empty output (the container is still producing).
|
||||
* The grace timer fires only once GRACE_MS have passed since the
|
||||
* last chunk, so output that resumes after a transient drop keeps
|
||||
* the run alive.
|
||||
*/
|
||||
async function streamPodLogs(
|
||||
namespace: string,
|
||||
podName: string,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
stopSignal?: { stopped: boolean },
|
||||
dedup?: LogLineDedupFilter,
|
||||
activity?: { lastActiveAt: number; streamHasExited: boolean },
|
||||
): Promise<string> {
|
||||
const allChunks: string[] = [];
|
||||
let attempt = 0;
|
||||
// Track the timestamp of the last successfully received log line so
|
||||
// reconnects use a tight window instead of an ever-growing one anchored
|
||||
// at stream start. This is the primary fix for FAR-105 duplicative logs.
|
||||
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
// Shared across reconnects so replayed lines inside the `sinceSeconds`
|
||||
// overlap window are dropped before they reach the streaming UI (FAR-123).
|
||||
if (!dedup) dedup = new LogLineDedupFilter();
|
||||
|
||||
while (!stopSignal?.stopped) {
|
||||
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
|
||||
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
|
||||
break;
|
||||
}
|
||||
|
||||
// On reconnect, ask for logs since the last received line (+5s buffer)
|
||||
// instead of since stream start. This keeps the window tight and
|
||||
// avoids ever-growing duplicate output.
|
||||
const sinceSeconds = attempt > 0
|
||||
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
|
||||
: undefined;
|
||||
|
||||
if (attempt > 0) {
|
||||
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
|
||||
}
|
||||
|
||||
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal, activity);
|
||||
if (activity) activity.streamHasExited = true;
|
||||
if (result) {
|
||||
allChunks.push(result);
|
||||
// Update last-received timestamp to now (the stream just ended,
|
||||
// so any log lines in `result` were received up to this moment).
|
||||
lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
// Refresh stream liveness so the grace timer in execute() does not
|
||||
// fire while output is still flowing through reconnects (FAR-107).
|
||||
if (activity) activity.lastActiveAt = Date.now();
|
||||
} else if (attempt === 0) {
|
||||
// First attempt returned nothing — update timestamp so reconnect
|
||||
// window stays reasonable.
|
||||
lastLogReceivedAt = preStreamTs;
|
||||
}
|
||||
attempt++;
|
||||
|
||||
// If the job is done or the container exited, no need to reconnect.
|
||||
if (stopSignal?.stopped) break;
|
||||
|
||||
// Brief pause before reconnecting to avoid tight loops.
|
||||
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
|
||||
}
|
||||
|
||||
// Flush any buffered partial line so the final assistant/result chunk
|
||||
// isn't dropped when the stream ends mid-line.
|
||||
const tail = dedup.flush();
|
||||
if (tail) await onLog("stdout", tail);
|
||||
|
||||
return allChunks.join("");
|
||||
}
|
||||
|
||||
/**
|
||||
* One-shot read of pod logs (no follow). Used as fallback when the
|
||||
* follow stream missed output because the container exited quickly.
|
||||
*/
|
||||
async function readPodLogs(
|
||||
namespace: string,
|
||||
podName: string,
|
||||
kubeconfigPath?: string,
|
||||
): Promise<string> {
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
try {
|
||||
const log = await coreApi.readNamespacedPodLog({
|
||||
name: podName,
|
||||
namespace,
|
||||
container: "claude",
|
||||
});
|
||||
return typeof log === "string" ? log : "";
|
||||
} catch {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the Job to reach a terminal state (Complete or Failed).
|
||||
* Returns the Job's final status. A 404 (job deleted by TTL or externally)
|
||||
@@ -785,6 +651,7 @@ async function cleanupJob(
|
||||
jobName: string,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
podLogPath?: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
@@ -793,6 +660,9 @@ async function cleanupJob(
|
||||
namespace,
|
||||
body: { propagationPolicy: "Background" },
|
||||
});
|
||||
if (podLogPath) {
|
||||
try { await fs.unlink(podLogPath); } catch { /* non-fatal */ }
|
||||
}
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Warning: failed to cleanup job ${jobName}: ${msg}\n`);
|
||||
@@ -846,6 +716,8 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
let jobName!: string;
|
||||
// eslint-disable-next-line prefer-const
|
||||
let namespace!: string;
|
||||
// eslint-disable-next-line prefer-const
|
||||
let podLogPath!: string;
|
||||
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
|
||||
// runtimeSessionParams and currentSessionIdRaw are also used after the
|
||||
// try block (in the result-parsing section) so hoist them here.
|
||||
@@ -1018,6 +890,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
if (reattachTarget) {
|
||||
jobName = reattachTarget.jobName;
|
||||
namespace = reattachTarget.namespace;
|
||||
podLogPath = buildPodLogPath(ctx.agent.companyId, ctx.agent.id, runId);
|
||||
|
||||
// Announce reattach metadata. Prompt and args aren't known here — they
|
||||
// belong to the prior run that created this pod and are already present
|
||||
@@ -1070,6 +943,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
const claudeArgs = built.claudeArgs;
|
||||
const promptMetrics = built.promptMetrics;
|
||||
promptSecret = built.promptSecret;
|
||||
podLogPath = built.podLogPath;
|
||||
if (built.skippedLabels.length > 0) {
|
||||
await onLog("stderr", `[paperclip] Warning: skipped ${built.skippedLabels.length} extra label(s) with reserved prefix: ${built.skippedLabels.join(", ")}\n`);
|
||||
}
|
||||
@@ -1187,6 +1061,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
_releaseMutex();
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-shadow
|
||||
let stdout = "";
|
||||
let exitCode: number | null = null;
|
||||
let podTerminatedState: PodTerminatedState | null = null;
|
||||
@@ -1269,12 +1144,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
let keepaliveJobTerminal = false;
|
||||
let consecutiveTerminalReadings = 0;
|
||||
// Shared signal: when job completion resolves, tell the log streamer to
|
||||
// stop reconnecting. Declared before keepaliveTimer so the cancel path
|
||||
// inside the timer can set it without temporal dead zone issues.
|
||||
// stop. Declared before keepaliveTimer so the cancel path inside the
|
||||
// timer can set it without temporal dead zone issues.
|
||||
const logStopSignal = { stopped: false };
|
||||
// Shared dedup filter: created here so the one-shot fallback can
|
||||
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
|
||||
const logDedup = new LogLineDedupFilter();
|
||||
// Set when the run is externally cancelled (cancel-poll path).
|
||||
let cancelled = false;
|
||||
|
||||
@@ -1357,163 +1229,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
return onLog(stream, chunk);
|
||||
};
|
||||
|
||||
// Track stream liveness so the grace timer below only fires when output
|
||||
// has actually stopped — not on a transient K8s log-API reconnect that
|
||||
// streamPodLogs heals on its own (FAR-107).
|
||||
const streamActivity: { lastActiveAt: number; streamHasExited: boolean } = {
|
||||
lastActiveAt: Date.now(),
|
||||
streamHasExited: false,
|
||||
};
|
||||
const trackedLogStream = streamPodLogs(
|
||||
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
|
||||
streamActivity,
|
||||
);
|
||||
|
||||
// completionWithGrace races waitForJobCompletion against a grace timer
|
||||
// that fires LOG_EXIT_COMPLETION_GRACE_MS after the log stream exits.
|
||||
// This bounds the stale-UI window when K8s Job conditions lag container
|
||||
// exit (FAR-23): without it, waitForJobCompletion polls indefinitely
|
||||
// while streamPodLogs reconnects, holding execute() open for minutes.
|
||||
// logStopSignal.stopped is set on every settled path (fulfilled, rejected,
|
||||
// or grace) so streamPodLogs stops reconnecting promptly.
|
||||
type CompletionResult = { succeeded: boolean; timedOut: boolean; jobGone?: boolean; gracePeriodFired?: boolean };
|
||||
let gracePoller: ReturnType<typeof setInterval> | null = null;
|
||||
const completionWithGrace = new Promise<CompletionResult>((resolve, reject) => {
|
||||
let settled = false;
|
||||
const settleOk = (r: CompletionResult) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
logStopSignal.stopped = true;
|
||||
resolve(r);
|
||||
};
|
||||
const settleErr = (err: unknown) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
logStopSignal.stopped = true;
|
||||
reject(err);
|
||||
};
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(settleOk).catch(settleErr);
|
||||
let graceCheckInFlight = false;
|
||||
gracePoller = setInterval(() => {
|
||||
// Only consider grace once the stream has exited at least once.
|
||||
// Until then we are still in the warm-up window and
|
||||
// waitForJobCompletion is the authoritative signal. Once the
|
||||
// stream has exited, fire only after GRACE_MS of inactivity
|
||||
// measured against the last received chunk — output that resumes
|
||||
// through a reconnect resets the clock so transient drops do not
|
||||
// truncate live runs (FAR-107).
|
||||
if (graceCheckInFlight) return;
|
||||
if (
|
||||
streamActivity.streamHasExited &&
|
||||
Date.now() - streamActivity.lastActiveAt >= LOG_EXIT_COMPLETION_GRACE_MS
|
||||
) {
|
||||
graceCheckInFlight = true;
|
||||
void (async () => {
|
||||
try {
|
||||
// Pod-phase gate (FAR-107): if the pod is still Running/Pending
|
||||
// the container is alive — Claude can be silent for >30s during
|
||||
// long tool calls (web fetches, slow upstream APIs). Refresh
|
||||
// the stream-activity timer, leave the poller armed, and let
|
||||
// waitForJobCompletion remain the authoritative signal. Only
|
||||
// proceed with the grace settlement when the pod has actually
|
||||
// reached a terminal phase or is gone.
|
||||
const podLookup = await lookupPodState(namespace, jobName, kubeconfigPath);
|
||||
if (!podLookup.podMissing && (podLookup.phase === "Running" || podLookup.phase === "Pending")) {
|
||||
streamActivity.lastActiveAt = Date.now();
|
||||
graceCheckInFlight = false;
|
||||
return;
|
||||
}
|
||||
} catch (err) {
|
||||
await onLog("stderr", `[paperclip] grace gate: pod state lookup failed (${err instanceof Error ? err.message : String(err)}) — falling through to Job-presence check\n`).catch(() => {});
|
||||
}
|
||||
// Pod is no longer Running — proceed with Job-presence verification.
|
||||
// Stop the grace poller immediately so we don't double-fire while the
|
||||
// verification read below is in flight.
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
// The log stream exiting only means the container stopped producing
|
||||
// output — it does NOT prove the Job was deleted. Verify Job
|
||||
// presence with a one-shot read so we can distinguish:
|
||||
// (a) Job 404 → truly gone (TTL or external deletion)
|
||||
// (b) Job still present → K8s condition propagation lag (FAR-23)
|
||||
// Without this check we mis-classify (b) as "deleted externally" and
|
||||
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
|
||||
try {
|
||||
await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace });
|
||||
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update; Job ${jobName} still present — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
|
||||
} catch (err: unknown) {
|
||||
if (isK8s404(err)) {
|
||||
jobGoneDetectionPath = "grace-period-verify-404";
|
||||
jobGoneAt = Date.now();
|
||||
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago and Job ${jobName} is gone (TTL or external deletion) — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, jobGone: true });
|
||||
} else {
|
||||
// K8s API hiccup — bail out without claiming external deletion.
|
||||
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago; Job state unverifiable (${err instanceof Error ? err.message : String(err)}) — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
|
||||
}
|
||||
}
|
||||
})();
|
||||
}
|
||||
}, 1_000);
|
||||
});
|
||||
|
||||
const [logResult, completionResult] = await Promise.allSettled([
|
||||
trackedLogStream,
|
||||
completionWithGrace,
|
||||
const [tailResult, completionResult] = await Promise.allSettled([
|
||||
tailPodLogFile(podLogPath, { onLog: wrappedOnLog, stopSignal: logStopSignal }),
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(r => { logStopSignal.stopped = true; return r; }),
|
||||
]);
|
||||
|
||||
// Stop the keepalive immediately once the job has reached a terminal
|
||||
// state — do not wait for the finally block.
|
||||
if (keepaliveTimer) {
|
||||
clearInterval(keepaliveTimer);
|
||||
keepaliveTimer = null;
|
||||
}
|
||||
|
||||
// If the run was externally cancelled, return a clean cancelled result
|
||||
// without processing stdout (the finally block still runs for cleanup).
|
||||
if (cancelled) {
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorCode: "cancelled",
|
||||
errorMessage: "Run cancelled",
|
||||
};
|
||||
}
|
||||
|
||||
if (logResult.status === "fulfilled") {
|
||||
stdout = logResult.value;
|
||||
}
|
||||
|
||||
// One-shot log fallback: handles two failure modes with a single read.
|
||||
// Mode 1 — empty stream: the follow stream returned nothing (fast exit before connection).
|
||||
// Mode 2 — partial stream: we have some output but no result event (follow stream raced
|
||||
// with container exit and captured only the init line before the connection dropped).
|
||||
// A one-shot readPodLogs is more reliable for already-terminated containers and reads
|
||||
// from the beginning of the log, giving us the full output.
|
||||
// We use a cheap string scan for the result-event guard (avoids a full JSON parse here;
|
||||
// the authoritative parse happens once below after all fallbacks complete).
|
||||
const hasResultEvent = stdout.split("\n").some((l) => { try { return JSON.parse(l).type === "result"; } catch { return false; } });
|
||||
const needsOneShot = !stdout.trim() || (stdout.trim() && !hasResultEvent);
|
||||
if (needsOneShot) {
|
||||
if (!stdout.trim()) {
|
||||
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
|
||||
}
|
||||
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
|
||||
if (!stdout.trim() && oneShotLogs.trim()) {
|
||||
stdout = oneShotLogs;
|
||||
const deduped = logDedup.filter(stdout) + logDedup.flush();
|
||||
if (deduped) await onLog("stdout", deduped);
|
||||
} else if (oneShotLogs && oneShotLogs.length > stdout.length) {
|
||||
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
|
||||
const deduped = logDedup.filter(oneShotLogs) + logDedup.flush();
|
||||
if (deduped) await onLog("stdout", deduped);
|
||||
stdout = oneShotLogs;
|
||||
}
|
||||
}
|
||||
const stdout = tailResult.status === "fulfilled" ? tailResult.value : "";
|
||||
|
||||
if (completionResult.status === "fulfilled") {
|
||||
jobTimedOut = completionResult.value.timedOut;
|
||||
@@ -1573,7 +1294,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
if (skipCleanup) {
|
||||
await onLog("stdout", `[paperclip] Retaining job ${jobName} (state mismatch — UI is waiting on it)\n`);
|
||||
} else if (!retainJobs) {
|
||||
await cleanupJob(namespace, jobName, onLog, kubeconfigPath);
|
||||
await cleanupJob(namespace, jobName, onLog, kubeconfigPath, podLogPath);
|
||||
} else {
|
||||
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { describe, it, expect, beforeEach } from "vitest";
|
||||
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
|
||||
import { buildJobManifest, buildRtkSetupCommands, sanitizeLabelValue } from "./job-manifest.js";
|
||||
import { buildJobManifest, buildPodLogPath, sanitizeLabelValue } from "./job-manifest.js";
|
||||
import type { SelfPodInfo } from "./k8s-client.js";
|
||||
|
||||
function makeCtx(overrides: Partial<AdapterExecutionContext> = {}): AdapterExecutionContext {
|
||||
@@ -221,11 +221,9 @@ describe("buildJobManifest", () => {
|
||||
|
||||
it("omits paperclip.io/run-id when sanitized value is null (all-invalid runId)", () => {
|
||||
// inject an all-special-chars runId via context override — buildJobManifest
|
||||
// uses ctx.runId directly
|
||||
// uses ctx.runId directly. Use characters that are path-valid but label-invalid.
|
||||
const badCtx = makeCtx({ runId: "@@@" });
|
||||
const { job, skippedLabels } = buildJobManifest({ ctx: badCtx, selfPod });
|
||||
expect(job.metadata?.labels?.["paperclip.io/run-id"]).toBeUndefined();
|
||||
expect(skippedLabels).toContain("paperclip.io/run-id");
|
||||
expect(() => buildJobManifest({ ctx: badCtx, selfPod })).toThrow("Invalid runId");
|
||||
});
|
||||
|
||||
it("selector matches sanitized agent-id label", () => {
|
||||
@@ -301,7 +299,10 @@ describe("buildJobManifest", () => {
|
||||
it("write-prompt writes PROMPT_CONTENT to /tmp/prompt/prompt.txt", () => {
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
const init = job.spec?.template?.spec?.initContainers?.[0];
|
||||
expect(init?.command).toEqual(["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"]);
|
||||
expect(init?.command?.[0]).toBe("sh");
|
||||
expect(init?.command?.[1]).toBe("-c");
|
||||
expect(init?.command?.[2]).toContain("mkdir -p /paperclip/instances/default/run-logs/");
|
||||
expect(init?.command?.[2]).toContain("printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt");
|
||||
});
|
||||
|
||||
it("write-prompt mounts prompt volume", () => {
|
||||
@@ -794,112 +795,71 @@ describe("buildJobManifest", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("rtk output filtering", () => {
|
||||
describe("pod log file tailing", () => {
|
||||
it("does not modify main command when enableRtk is false (default)", () => {
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
const cmd = job.spec?.template?.spec?.containers[0]?.command;
|
||||
// Command should be the plain `cat ... | claude ...` form with no rtk setup
|
||||
expect(cmd?.[2]).toMatch(/^cat \/tmp\/prompt\/prompt\.txt \| claude /);
|
||||
// Command should be the plain `cat ... | claude ... | tee ...` form with no rtk setup
|
||||
expect(cmd?.[2]).toMatch(/^cat \/tmp\/prompt\/prompt\.txt \| claude .* \| tee /);
|
||||
expect(cmd?.[2]).not.toContain("rtk-filter");
|
||||
});
|
||||
|
||||
it("prepends RTK setup commands when enableRtk is true", () => {
|
||||
ctx.config = { enableRtk: true };
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
const cmd = job.spec?.template?.spec?.containers[0]?.command;
|
||||
expect(cmd?.[2]).toContain(".rtk-filter.js");
|
||||
expect(cmd?.[2]).toContain("cat /tmp/prompt/prompt.txt | claude");
|
||||
});
|
||||
|
||||
it("RTK setup runs before claude invocation", () => {
|
||||
ctx.config = { enableRtk: true };
|
||||
it("command includes tee to pod log path", () => {
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? "";
|
||||
const rtkIdx = cmd.indexOf(".rtk-filter.js");
|
||||
const claudeIdx = cmd.indexOf("cat /tmp/prompt/prompt.txt | claude");
|
||||
expect(rtkIdx).toBeGreaterThanOrEqual(0);
|
||||
expect(claudeIdx).toBeGreaterThan(rtkIdx);
|
||||
expect(cmd).toContain("| tee");
|
||||
expect(cmd).toContain("/paperclip/instances/default/run-logs/");
|
||||
});
|
||||
|
||||
it("RTK setup uses node (no external binaries)", () => {
|
||||
ctx.config = { enableRtk: true };
|
||||
it("podLogPath is returned from buildJobManifest", () => {
|
||||
const result = buildJobManifest({ ctx, selfPod });
|
||||
expect(result.podLogPath).toBe(
|
||||
"/paperclip/instances/default/run-logs/co1/agent-abc/run-abc12345.pod.ndjson",
|
||||
);
|
||||
});
|
||||
|
||||
it("buildPodLogPath returns correctly formatted path", () => {
|
||||
expect(buildPodLogPath("co1", "agent-abc", "run-abc12345")).toBe(
|
||||
"/paperclip/instances/default/run-logs/co1/agent-abc/run-abc12345.pod.ndjson",
|
||||
);
|
||||
});
|
||||
|
||||
it("init container creates log directory", () => {
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? "";
|
||||
// Should only use `node` — no curl, wget, apt, pip, etc.
|
||||
expect(cmd).not.toMatch(/\b(curl|wget|apt|yum|pip|gem|cargo|go\s+get)\b/);
|
||||
expect(cmd).toContain("node ");
|
||||
const initCmd = job.spec?.template?.spec?.initContainers?.[0]?.command;
|
||||
expect(initCmd?.[0]).toBe("sh");
|
||||
expect(initCmd?.[1]).toBe("-c");
|
||||
expect(initCmd?.[2]).toContain("mkdir -p /paperclip/instances/default/run-logs/");
|
||||
});
|
||||
|
||||
it("uses default 50000 byte threshold when rtkMaxOutputBytes not set", () => {
|
||||
ctx.config = { enableRtk: true };
|
||||
const setup = buildRtkSetupCommands(50000);
|
||||
// The filter script base64 should decode to contain the MAX constant
|
||||
const b64Match = setup.match(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/);
|
||||
expect(b64Match).not.toBeNull();
|
||||
const decoded = Buffer.from(b64Match![1], "base64").toString("utf-8");
|
||||
expect(decoded).toContain("50000");
|
||||
it("sanitizes companyId with / to valid path component for log path", () => {
|
||||
const badCtx = {
|
||||
...ctx,
|
||||
agent: { ...ctx.agent, companyId: "co/1" },
|
||||
};
|
||||
const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod });
|
||||
// / is stripped by sanitizeForK8sPath
|
||||
expect(podLogPath).toContain("co1/");
|
||||
});
|
||||
|
||||
it("respects custom rtkMaxOutputBytes", () => {
|
||||
ctx.config = { enableRtk: true, rtkMaxOutputBytes: 100000 };
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
const cmd = job.spec?.template?.spec?.containers[0]?.command?.[2] ?? "";
|
||||
// The custom threshold should appear in the base64-encoded filter script
|
||||
const b64Matches = [...cmd.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
|
||||
const decoded = b64Matches.map((m) => Buffer.from(m[1], "base64").toString("utf-8")).join("\n");
|
||||
expect(decoded).toContain("100000");
|
||||
it("sanitizes agentId with @ to valid path component for log path", () => {
|
||||
const badCtx = {
|
||||
...ctx,
|
||||
agent: { ...ctx.agent, id: "agent@123" },
|
||||
};
|
||||
const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod });
|
||||
// @ is stripped by sanitizeForK8sPath
|
||||
expect(podLogPath).toContain("/agent123/");
|
||||
});
|
||||
|
||||
it("RTK setup installs a PostToolUse hook in claude settings", () => {
|
||||
const setup = buildRtkSetupCommands(50000);
|
||||
// The settings script (second base64 block) should reference PostToolUse
|
||||
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
|
||||
expect(b64Matches.length).toBeGreaterThanOrEqual(2);
|
||||
const settingsScript = Buffer.from(b64Matches[1]![1], "base64").toString("utf-8");
|
||||
expect(settingsScript).toContain("PostToolUse");
|
||||
expect(settingsScript).toContain("settings.json");
|
||||
});
|
||||
|
||||
it("filter script handles string content truncation", () => {
|
||||
// Decode the filter script and verify it truncates string content
|
||||
const setup = buildRtkSetupCommands(1000);
|
||||
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
|
||||
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
|
||||
expect(filterScript).toContain("MAX=1000");
|
||||
expect(filterScript).toContain("truncated by paperclip-rtk");
|
||||
expect(filterScript).toContain("tool_response");
|
||||
expect(filterScript).toContain("tool_result");
|
||||
});
|
||||
|
||||
it("filter script truncates without corrupting multi-byte UTF-8", () => {
|
||||
// "中" is U+4E2D, 3 bytes in UTF-8: E4 B8 AD
|
||||
// With MAX=5, two "中" (6 bytes) should truncate to one (3 bytes), not
|
||||
// produce a replacement character from slicing mid-codepoint.
|
||||
const setup = buildRtkSetupCommands(5);
|
||||
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
|
||||
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
|
||||
|
||||
// Extract the trunc function from the filter script and evaluate it
|
||||
const fnMatch = filterScript.match(/(function trunc\(s\)\{.*\})(?=const tr=)/);
|
||||
expect(fnMatch).toBeTruthy();
|
||||
// eslint-disable-next-line no-eval
|
||||
const trunc = eval(`(()=>{const MAX=5;${fnMatch![1]};return trunc;})()`);
|
||||
|
||||
const result = trunc("中中");
|
||||
expect(result).not.toContain("�");
|
||||
expect(result).toContain("中");
|
||||
expect(result).toContain("truncated by paperclip-rtk");
|
||||
// Should report bytes from the actual truncation point, not MAX
|
||||
expect(result).toContain("3 bytes truncated");
|
||||
});
|
||||
|
||||
it("filter script handles array content (block format)", () => {
|
||||
const setup = buildRtkSetupCommands(50000);
|
||||
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
|
||||
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
|
||||
// Should handle array content blocks (text field on each block)
|
||||
expect(filterScript).toContain("Array.isArray");
|
||||
expect(filterScript).toContain("b.text");
|
||||
it("sanitizes runId with underscore to valid path component for log path", () => {
|
||||
const badCtx = {
|
||||
...ctx,
|
||||
runId: "run_123",
|
||||
};
|
||||
const { podLogPath } = buildJobManifest({ ctx: badCtx as typeof ctx, selfPod });
|
||||
// _ is stripped by sanitizeForK8sPath
|
||||
expect(podLogPath).toContain("/run123.pod.ndjson");
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
+24
-89
@@ -12,85 +12,18 @@ import {
|
||||
import { createHash } from "node:crypto";
|
||||
import type { ClaudePromptBundle } from "./prompt-cache.js";
|
||||
|
||||
/**
|
||||
* Build the shell command prefix that installs a native Node.js PostToolUse
|
||||
* hook into Claude Code's settings. The hook truncates oversized tool outputs
|
||||
* before they reach the model — replacing the RTK binary init-container
|
||||
* approach with a self-contained Node.js implementation.
|
||||
*
|
||||
* Both scripts are base64-encoded so they can be embedded in a sh -c command
|
||||
* string without any quoting or escaping issues.
|
||||
*
|
||||
* @param maxOutputBytes Byte threshold above which tool output is truncated.
|
||||
* @returns A shell command string (suitable for "&&"-chaining
|
||||
* before the claude invocation).
|
||||
*/
|
||||
export function buildRtkSetupCommands(maxOutputBytes: number): string {
|
||||
// --- Filter script ----------------------------------------------------------
|
||||
// This script runs as the PostToolUse hook inside every K8s Job pod.
|
||||
// Claude Code writes the hook event as JSON to the script's stdin; the script
|
||||
// truncates the tool_response/tool_result content when it exceeds the
|
||||
// threshold and writes the (possibly modified) JSON to stdout.
|
||||
//
|
||||
// Field-name coverage:
|
||||
// • tool_response — documented hook event format for PostToolUse
|
||||
// • tool_result — alternative name seen in some Claude Code versions
|
||||
// Content may be a plain string or an array of typed blocks (text/image/…).
|
||||
const filterScript = [
|
||||
`const c=[];`,
|
||||
`process.stdin.on('data',d=>c.push(d));`,
|
||||
`process.stdin.on('end',()=>{`,
|
||||
`const raw=Buffer.concat(c).toString('utf-8');`,
|
||||
`let o;try{o=JSON.parse(raw);}catch{process.stdout.write(raw);return;}`,
|
||||
`const MAX=${maxOutputBytes};`,
|
||||
`function trunc(s){`,
|
||||
`if(typeof s!=='string')return s;`,
|
||||
`const b=Buffer.from(s,'utf-8');`,
|
||||
`if(b.length<=MAX)return s;`,
|
||||
`let e=MAX;if(e>0){let p=e-1;while(p>0&&(b[p]&0xC0)===0x80)p--;const l=b[p];let n=1;if((l&0xE0)===0xC0)n=2;else if((l&0xF0)===0xE0)n=3;else if((l&0xF8)===0xF0)n=4;if(p+n>e)e=p;}`,
|
||||
`return b.slice(0,e).toString('utf-8')+'\\n[...'+(b.length-e)+' bytes truncated by paperclip-rtk]';`,
|
||||
`}`,
|
||||
`const tr=o&&(o.tool_response||o.tool_result);`,
|
||||
`if(tr){`,
|
||||
`if(typeof tr.content==='string'){tr.content=trunc(tr.content);}`,
|
||||
`else if(Array.isArray(tr.content)){`,
|
||||
`tr.content=tr.content.map(function(b){`,
|
||||
`if(b&&typeof b==='object'&&typeof b.text==='string'){`,
|
||||
`return Object.assign({},b,{text:trunc(b.text)});`,
|
||||
`}return b;`,
|
||||
`});`,
|
||||
`}`,
|
||||
`}`,
|
||||
`process.stdout.write(JSON.stringify(o));`,
|
||||
`});`,
|
||||
].join("");
|
||||
function assertSafePathComponent(field: string, value: string): void {
|
||||
if (!/^[a-zA-Z0-9-]+$/.test(value)) {
|
||||
throw new Error(`Invalid ${field} for log path: ${value}`);
|
||||
}
|
||||
}
|
||||
|
||||
// --- Settings script --------------------------------------------------------
|
||||
// Reads the existing ~/.claude/settings.json (if any), merges in the RTK
|
||||
// PostToolUse hook, and writes the file back. All other settings sections
|
||||
// are preserved; only PostToolUse is replaced so we own the full hook list
|
||||
// for this run.
|
||||
const settingsScript = [
|
||||
`const fs=require('fs'),pt=require('path');`,
|
||||
`const p=pt.join(process.env.HOME,'.claude','settings.json');`,
|
||||
`let s={};try{s=JSON.parse(fs.readFileSync(p,'utf-8'));}catch(e){}`,
|
||||
`s.hooks=s.hooks||{};`,
|
||||
`s.hooks.PostToolUse=[{matcher:'.*',hooks:[{type:'command',command:'node /tmp/.rtk-filter.js'}]}];`,
|
||||
`fs.mkdirSync(pt.dirname(p),{recursive:true});`,
|
||||
`fs.writeFileSync(p,JSON.stringify(s));`,
|
||||
].join("");
|
||||
function sanitizeForK8sPath(value: string): string {
|
||||
return value.replace(/[^a-zA-Z0-9-]/g, "");
|
||||
}
|
||||
|
||||
// Encode as base64 so the strings can be embedded directly in a shell command
|
||||
// without any quoting concerns (base64 alphabet: A-Za-z0-9+/=).
|
||||
const filterB64 = Buffer.from(filterScript, "utf-8").toString("base64");
|
||||
const settingsB64 = Buffer.from(settingsScript, "utf-8").toString("base64");
|
||||
|
||||
return [
|
||||
// Write the filter script
|
||||
`node -e "require('fs').writeFileSync('/tmp/.rtk-filter.js',Buffer.from('${filterB64}','base64').toString('utf-8'))"`,
|
||||
// Install the Claude Code PostToolUse hook (merge into existing settings)
|
||||
`node -e "eval(Buffer.from('${settingsB64}','base64').toString('utf-8'))"`,
|
||||
].join(" && ");
|
||||
export function buildPodLogPath(companyId: string, agentId: string, runId: string): string {
|
||||
return `/paperclip/instances/default/run-logs/${companyId}/${agentId}/${runId}.pod.ndjson`;
|
||||
}
|
||||
|
||||
/** Prompts above this size (bytes) are staged via a Secret instead of an
|
||||
@@ -202,6 +135,8 @@ export interface JobBuildResult {
|
||||
promptSecret: PromptSecret | null;
|
||||
/** User-supplied extra labels that were dropped because they used a reserved prefix. */
|
||||
skippedLabels: string[];
|
||||
/** Path to the pod log file on the shared PVC. */
|
||||
podLogPath: string;
|
||||
}
|
||||
|
||||
function sanitizeForK8sName(value: string, maxLen = 16): string {
|
||||
@@ -353,8 +288,6 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
const nodeSelector = parseKeyValueConfig(config.nodeSelector);
|
||||
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
|
||||
const extraLabels = parseKeyValueConfig(config.labels);
|
||||
const enableRtk = asBoolean(config.enableRtk, false);
|
||||
const rtkMaxOutputBytes = asNumber(config.rtkMaxOutputBytes, 50000);
|
||||
|
||||
// Resolve working directory — use workspace cwd, fall back to /paperclip
|
||||
const workspaceContext = parseObject(context.paperclipWorkspace);
|
||||
@@ -535,13 +468,15 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
|
||||
// Build the claude command string for the main container
|
||||
const claudeArgsEscaped = claudeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
|
||||
const claudeInvocation = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped}`;
|
||||
// When RTK output filtering is enabled, prepend the Node.js hook setup.
|
||||
// This writes a filter script and a Claude Code settings file that installs
|
||||
// it as a PostToolUse hook — no external binary or init container required.
|
||||
const mainCommand = enableRtk
|
||||
? `${buildRtkSetupCommands(rtkMaxOutputBytes)} && ${claudeInvocation}`
|
||||
: claudeInvocation;
|
||||
const logPathCompanyId = sanitizeForK8sPath(agent.companyId);
|
||||
const logPathAgentId = sanitizeForK8sPath(agent.id);
|
||||
const logPathRunId = sanitizeForK8sPath(runId);
|
||||
assertSafePathComponent("companyId", logPathCompanyId);
|
||||
assertSafePathComponent("agentId", logPathAgentId);
|
||||
assertSafePathComponent("runId", logPathRunId);
|
||||
const podLogPath = buildPodLogPath(logPathCompanyId, logPathAgentId, logPathRunId);
|
||||
const claudeInvocation = `cat /tmp/prompt/prompt.txt | claude ${claudeArgsEscaped} | tee ${podLogPath}`;
|
||||
const mainCommand = claudeInvocation;
|
||||
|
||||
// Decide prompt delivery strategy: env var (small) or Secret volume (large).
|
||||
const promptBytes = Buffer.byteLength(prompt, "utf-8");
|
||||
@@ -569,7 +504,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
name: "write-prompt",
|
||||
image: "busybox:1.36",
|
||||
imagePullPolicy: "IfNotPresent",
|
||||
command: ["sh", "-c", "cp /tmp/prompt-secret/prompt.txt /tmp/prompt/prompt.txt"],
|
||||
command: ["sh", "-c", `mkdir -p /paperclip/instances/default/run-logs/${agent.companyId}/${agent.id} && cp /tmp/prompt-secret/prompt.txt /tmp/prompt/prompt.txt`],
|
||||
volumeMounts: [
|
||||
{ name: "prompt", mountPath: "/tmp/prompt" },
|
||||
{ name: "prompt-secret", mountPath: "/tmp/prompt-secret", readOnly: true },
|
||||
@@ -584,7 +519,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
name: "write-prompt",
|
||||
image: "busybox:1.36",
|
||||
imagePullPolicy: "IfNotPresent",
|
||||
command: ["sh", "-c", "printf '%s' \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
|
||||
command: ["sh", "-c", `mkdir -p /paperclip/instances/default/run-logs/${agent.companyId}/${agent.id} && printf '%s' "$PROMPT_CONTENT" > /tmp/prompt/prompt.txt`],
|
||||
env: [{ name: "PROMPT_CONTENT", value: prompt }],
|
||||
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
|
||||
securityContext,
|
||||
@@ -641,5 +576,5 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
},
|
||||
};
|
||||
|
||||
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret, skippedLabels };
|
||||
return { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret, skippedLabels, podLogPath };
|
||||
}
|
||||
|
||||
@@ -1,173 +0,0 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { LogLineDedupFilter, eventDedupKey } from "./log-dedup.js";
|
||||
|
||||
function assistantEvent(id: string, text: string): string {
|
||||
return JSON.stringify({
|
||||
type: "assistant",
|
||||
session_id: "sess_1",
|
||||
message: {
|
||||
id,
|
||||
content: [{ type: "text", text }],
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function userToolResultEvent(toolUseId: string, content: string): string {
|
||||
return JSON.stringify({
|
||||
type: "user",
|
||||
session_id: "sess_1",
|
||||
message: {
|
||||
content: [{ type: "tool_result", tool_use_id: toolUseId, content }],
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function systemInitEvent(sessionId: string): string {
|
||||
return JSON.stringify({
|
||||
type: "system",
|
||||
subtype: "init",
|
||||
session_id: sessionId,
|
||||
model: "claude-opus-4-7",
|
||||
});
|
||||
}
|
||||
|
||||
function resultEvent(sessionId: string): string {
|
||||
return JSON.stringify({
|
||||
type: "result",
|
||||
subtype: "success",
|
||||
session_id: sessionId,
|
||||
result: "done",
|
||||
total_cost_usd: 0.01,
|
||||
usage: { input_tokens: 1, output_tokens: 1, cache_read_input_tokens: 0 },
|
||||
});
|
||||
}
|
||||
|
||||
describe("eventDedupKey", () => {
|
||||
it("keys assistant events by message.id", () => {
|
||||
const key = eventDedupKey(JSON.parse(assistantEvent("msg_abc", "hi")));
|
||||
expect(key).toBe("assistant:msg_abc");
|
||||
});
|
||||
|
||||
it("keys user tool_result events by tool_use_id", () => {
|
||||
const key = eventDedupKey(JSON.parse(userToolResultEvent("toolu_1", "ok")));
|
||||
expect(key).toBe("user:tool_result:toolu_1");
|
||||
});
|
||||
|
||||
it("keys system init events by session_id", () => {
|
||||
const key = eventDedupKey(JSON.parse(systemInitEvent("sess_xyz")));
|
||||
expect(key).toBe("system:init:sess_xyz");
|
||||
});
|
||||
|
||||
it("keys result events by session_id", () => {
|
||||
const key = eventDedupKey(JSON.parse(resultEvent("sess_xyz")));
|
||||
expect(key).toBe("result:sess_xyz");
|
||||
});
|
||||
|
||||
it("returns null for assistant events missing message.id", () => {
|
||||
const event = { type: "assistant", message: { content: [] } };
|
||||
expect(eventDedupKey(event)).toBeNull();
|
||||
});
|
||||
|
||||
it("returns null for unknown event types", () => {
|
||||
expect(eventDedupKey({ type: "unknown" })).toBeNull();
|
||||
expect(eventDedupKey({})).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe("LogLineDedupFilter", () => {
|
||||
it("passes unique lines through unchanged", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const a = assistantEvent("msg_1", "hello");
|
||||
const b = assistantEvent("msg_2", "world");
|
||||
expect(filter.filter(`${a}\n${b}\n`)).toBe(`${a}\n${b}\n`);
|
||||
});
|
||||
|
||||
it("drops assistant events replayed with the same message.id", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const a = assistantEvent("msg_1", "Three nits to fix.");
|
||||
filter.filter(`${a}\n`);
|
||||
expect(filter.filter(`${a}\n`)).toBe("");
|
||||
});
|
||||
|
||||
it("drops user tool_result events replayed with the same tool_use_id", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const a = userToolResultEvent("toolu_abc", "file contents");
|
||||
filter.filter(`${a}\n`);
|
||||
expect(filter.filter(`${a}\n`)).toBe("");
|
||||
});
|
||||
|
||||
it("drops system init and result events on replay", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const init = systemInitEvent("sess_1");
|
||||
const result = resultEvent("sess_1");
|
||||
filter.filter(`${init}\n${result}\n`);
|
||||
expect(filter.filter(`${init}\n${result}\n`)).toBe("");
|
||||
});
|
||||
|
||||
it("buffers incomplete trailing lines across chunks", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const line = assistantEvent("msg_1", "hello");
|
||||
const mid = Math.floor(line.length / 2);
|
||||
const out1 = filter.filter(line.slice(0, mid));
|
||||
const out2 = filter.filter(line.slice(mid) + "\n");
|
||||
expect(out1).toBe("");
|
||||
expect(out2).toBe(`${line}\n`);
|
||||
});
|
||||
|
||||
it("flush() emits a final incomplete line that was not replayed", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const line = assistantEvent("msg_tail", "no newline");
|
||||
filter.filter(line);
|
||||
expect(filter.flush()).toBe(line);
|
||||
});
|
||||
|
||||
it("flush() drops an incomplete line that was already seen with a newline", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const line = assistantEvent("msg_same", "x");
|
||||
filter.filter(`${line}\n`);
|
||||
filter.filter(line);
|
||||
expect(filter.flush()).toBe("");
|
||||
});
|
||||
|
||||
it("passes non-JSON lines through every time (does not dedup paperclip status)", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const status = "[paperclip] keepalive — job foo running\n";
|
||||
expect(filter.filter(status)).toBe(status);
|
||||
expect(filter.filter(status)).toBe(status);
|
||||
});
|
||||
|
||||
it("dedups structurally identical JSON with identical content (raw fallback)", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
// No recognized type → raw fallback key.
|
||||
const line = JSON.stringify({ foo: "bar", baz: 1 });
|
||||
filter.filter(`${line}\n`);
|
||||
expect(filter.filter(`${line}\n`)).toBe("");
|
||||
});
|
||||
|
||||
it("handles multiple complete lines in a single chunk with partial trailing", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const a = assistantEvent("msg_a", "a");
|
||||
const b = assistantEvent("msg_b", "b");
|
||||
const c = assistantEvent("msg_c", "c");
|
||||
// a and b are complete, c is partial (no trailing newline).
|
||||
const out = filter.filter(`${a}\n${b}\n${c}`);
|
||||
expect(out).toBe(`${a}\n${b}\n`);
|
||||
// Completing c later should emit exactly c.
|
||||
expect(filter.filter("\n")).toBe(`${c}\n`);
|
||||
});
|
||||
|
||||
it("drops the classic FAR-123 replay scenario across reconnects", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const assistantNits = assistantEvent("msg_nits", "Three nits to fix. Let me look at an existing test file...");
|
||||
const assistantWrite = assistantEvent("msg_write", "Now I need to write unit tests");
|
||||
// First stream attempt emits both events.
|
||||
const out1 = filter.filter(`${assistantNits}\n${assistantWrite}\n`);
|
||||
expect(out1).toBe(`${assistantNits}\n${assistantWrite}\n`);
|
||||
// Reconnect replays both within the sinceSeconds overlap — filter should drop them.
|
||||
const out2 = filter.filter(`${assistantNits}\n${assistantWrite}\n`);
|
||||
expect(out2).toBe("");
|
||||
// And a genuinely new event after the replay should still pass through.
|
||||
const assistantFresh = assistantEvent("msg_fresh", "next turn");
|
||||
expect(filter.filter(`${assistantFresh}\n`)).toBe(`${assistantFresh}\n`);
|
||||
});
|
||||
});
|
||||
@@ -1,146 +0,0 @@
|
||||
/**
|
||||
* Line-level dedup filter for the K8s log stream.
|
||||
*
|
||||
* The K8s log follow stream can reconnect with an overlapping `sinceSeconds`
|
||||
* window (integer-second granularity + a safety buffer), which replays a few
|
||||
* seconds of recent output on every reconnect. Without dedup those replayed
|
||||
* lines appear as duplicate events in the streaming UI — the same assistant
|
||||
* text block shows up between every subsequent tool call (FAR-123).
|
||||
*
|
||||
* The filter operates at the chunk → line level: chunks are split on `\n`,
|
||||
* incomplete trailing content is buffered until the next chunk, and each
|
||||
* complete line is emitted at most once. JSON-shaped Claude stream-json
|
||||
* events are keyed by their stable structural IDs; non-JSON lines pass
|
||||
* through unchanged so genuinely-repeated status lines are not swallowed.
|
||||
*/
|
||||
|
||||
type Parsed = Record<string, unknown>;
|
||||
|
||||
function asString(value: unknown): string {
|
||||
return typeof value === "string" ? value : "";
|
||||
}
|
||||
|
||||
function asRecord(value: unknown): Parsed | null {
|
||||
if (typeof value !== "object" || value === null || Array.isArray(value)) return null;
|
||||
return value as Parsed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a stable dedup key for a Claude stream-json event. Returns `null`
|
||||
* when the event is not a recognized Claude event — those lines fall back to
|
||||
* raw-content hashing so non-JSON output (paperclip status lines, shell
|
||||
* output) is never deduped by identity.
|
||||
*/
|
||||
export function eventDedupKey(event: Parsed): string | null {
|
||||
const type = asString(event.type);
|
||||
|
||||
if (type === "system") {
|
||||
const subtype = asString(event.subtype);
|
||||
const sessionId = asString(event.session_id);
|
||||
if (subtype === "init" && sessionId) return `system:init:${sessionId}`;
|
||||
return null;
|
||||
}
|
||||
|
||||
if (type === "assistant") {
|
||||
const message = asRecord(event.message);
|
||||
const id = message ? asString(message.id) : "";
|
||||
if (id) return `assistant:${id}`;
|
||||
return null;
|
||||
}
|
||||
|
||||
if (type === "user") {
|
||||
const message = asRecord(event.message);
|
||||
const content = message && Array.isArray(message.content) ? message.content : [];
|
||||
const toolUseIds: string[] = [];
|
||||
for (const entry of content) {
|
||||
const block = asRecord(entry);
|
||||
if (!block) continue;
|
||||
const toolUseId = asString(block.tool_use_id);
|
||||
if (toolUseId) toolUseIds.push(toolUseId);
|
||||
}
|
||||
if (toolUseIds.length > 0) return `user:tool_result:${toolUseIds.join(",")}`;
|
||||
return null;
|
||||
}
|
||||
|
||||
if (type === "result") {
|
||||
const sessionId = asString(event.session_id);
|
||||
return sessionId ? `result:${sessionId}` : "result:unknown";
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stateful line-level dedup filter. Emits `filter(chunk)` output through
|
||||
* the caller — preserves original chunk formatting (including trailing
|
||||
* newlines) for lines that pass the dedup check.
|
||||
*/
|
||||
export class LogLineDedupFilter {
|
||||
private buffer = "";
|
||||
private readonly seenKeys = new Set<string>();
|
||||
|
||||
/**
|
||||
* Process a chunk and return the subset that should be forwarded.
|
||||
* Incomplete trailing content (no terminating newline) is buffered and
|
||||
* emitted on the next chunk that completes the line (or on flush()).
|
||||
*/
|
||||
filter(chunk: string): string {
|
||||
if (!chunk) return "";
|
||||
const combined = this.buffer + chunk;
|
||||
const endsWithNewline = combined.endsWith("\n");
|
||||
const parts = combined.split("\n");
|
||||
|
||||
if (endsWithNewline) {
|
||||
// Discard the final empty element — last line was complete.
|
||||
parts.pop();
|
||||
this.buffer = "";
|
||||
} else {
|
||||
// Last element is an incomplete line — hold it for the next chunk.
|
||||
this.buffer = parts.pop() ?? "";
|
||||
}
|
||||
|
||||
const out: string[] = [];
|
||||
for (const line of parts) {
|
||||
if (this.shouldEmit(line)) out.push(line);
|
||||
}
|
||||
if (out.length === 0) return "";
|
||||
return out.join("\n") + "\n";
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush any incomplete trailing content. Called when the stream ends
|
||||
* without a terminating newline so the final partial line isn't lost.
|
||||
*/
|
||||
flush(): string {
|
||||
const pending = this.buffer;
|
||||
this.buffer = "";
|
||||
if (!pending) return "";
|
||||
return this.shouldEmit(pending) ? pending : "";
|
||||
}
|
||||
|
||||
private shouldEmit(line: string): boolean {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) return true;
|
||||
|
||||
// Only attempt dedup on JSON-shaped lines; pass shell/text output through.
|
||||
if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true;
|
||||
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(trimmed);
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
|
||||
const event = asRecord(parsed);
|
||||
if (!event) return true;
|
||||
|
||||
// Recognized Claude stream-json event → structural key.
|
||||
const structuralKey = eventDedupKey(event);
|
||||
const key = structuralKey ?? `raw:${trimmed}`;
|
||||
|
||||
if (this.seenKeys.has(key)) return false;
|
||||
this.seenKeys.add(key);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user