Compare commits

...

4 Commits

Author SHA1 Message Date
Chris Farhood fc3866924a 0.1.54 2026-04-27 00:38:06 +00:00
Chris Farhood 368254d75d fix: per-chunk activity tracking + pod-phase gate on grace timer (FAR-107)
The 0.1.53 fix tracked stream liveness by updating lastActiveAt only
after streamPodLogsOnce returned.  That worked for the
disconnect-then-reconnect-then-disconnect case, but missed the
disconnect-then-long-running-reconnect case: a streaming attempt that
runs for minutes without disconnecting never refreshes lastActiveAt,
so the grace timer fires 30s after the prior disconnect even though
the new attempt is currently producing output.  Nancy reproduced
exactly this on 0.1.53 — claude_truncated with pod phase=Running.

Two changes:

1. streamPodLogsOnce now accepts the activity ref and updates
   lastActiveAt inside its writable's write handler — every chunk
   delivered from the container refreshes the timer in real time,
   not just on stream return.

2. Before the grace timer settles, gate on pod phase: if the pod is
   still Running or Pending, the container is alive (Claude's
   long tool-use silences exceed 30s for slow upstream APIs).
   Refresh lastActiveAt, leave the poller armed, and let
   waitForJobCompletion remain the authoritative termination
   signal.  Only proceed with the grace settlement when the pod
   has actually reached a terminal phase or is gone.

The original FAR-23 fast-path (container exits, Job condition lags)
still works: when the container terminates, pod phase moves to
Succeeded/Failed and the gate falls through to the existing
Job-presence check.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-27 00:38:06 +00:00
Chris Farhood 34756f8215 0.1.53 2026-04-27 00:28:45 +00:00
Chris Farhood 07ef106c66 fix: gate grace timer on stream-output silence, not first disconnect (FAR-107)
The 30s grace timer that bounds K8s Job condition propagation lag was
armed by streamPodLogs's onFirstStreamExit callback the moment
streamPodLogsOnce returned for the first time.  A transient K8s log-API
disconnect mid-run also returns from streamPodLogsOnce — so the grace
timer fired 30s later regardless of whether streamPodLogs had already
reconnected and the container was still producing output.

Nancy / Privileged Escalation reproduced this on long Opus-4-6 runs:
the prod paperclip pod was stable, the cancel-poll guard was already
narrowed in 0.1.51, but every long run truncated with claude_truncated
+ "container terminated state not yet observable (pod phase=Running)"
because the run was being abandoned mid-output.

Replace the boolean onFirstStreamExit signal with a streamActivity ref
carrying lastActiveAt + streamHasExited.  streamPodLogs refreshes
lastActiveAt every time a streamPodLogsOnce attempt returns non-empty
output, so reconnects that resume real output keep the grace clock
reset.  The grace timer fires only once the stream has exited at least
once AND no chunk has arrived for the full grace window — which
preserves the original FAR-23 behaviour (container truly exited but
Job condition lags) while ending the false-truncation of healthy
streams.  Adds a regression test that asserts a stream drop + reconnect
+ deferred Job completion does not surface as truncated.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-27 00:28:44 +00:00
4 changed files with 127 additions and 32 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.52",
"version": "0.1.54",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.52",
"version": "0.1.54",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.52",
"version": "0.1.54",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+48
View File
@@ -1512,6 +1512,54 @@ describe("execute: log-stream-exit grace period (FAR-23)", () => {
// (grace did not fire, real completion arrived)
expect(result.errorMessage).toBeNull();
});
it("does NOT fire grace when stream drops mid-output and reconnects with more output (FAR-107)", async () => {
// Reproduces Nancy / Privileged Escalation symptom: the K8s log API drops
// the streaming connection mid-run; streamPodLogs reconnects and the
// container is still producing. Before the fix, the grace timer was
// armed on first stream exit and fired 30s later regardless of whether
// output had resumed, surfacing claude_truncated even though the pod was
// still phase=Running.
let attemptIndex = 0;
mockLogFn.mockImplementation(
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
if (attemptIndex === 0) {
// Stream a partial init line then "drop" the connection without a
// result event — this is the transient API disconnect.
writable.write(JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_test123" }) + "\n");
attemptIndex++;
return;
}
// Reconnect produces the rest of the stream including the result event.
writable.write(CLAUDE_HAPPY_OUTPUT);
},
);
// Job condition arrives only after the reconnect produces output, well
// beyond the 30s grace window; the old code would have grace-fired at
// ~30s and treated the run as truncated.
let readJobCalls = 0;
mockBatchReadJob.mockImplementation(async () => {
readJobCalls++;
// Stay non-terminal until the reconnect has had time to run and the
// grace window has fully elapsed since the FIRST disconnect.
if (readJobCalls < 25) return { status: { conditions: [] } };
return { status: { conditions: [{ type: "Complete", status: "True" }] } };
});
const executePromise = execute(makeCtx());
// t=3000: first reconnect sleep fires → second streamPodLogsOnce attempt
await vi.advanceTimersByTimeAsync(3_100);
// Drive past the old (buggy) 30s grace boundary without firing real completion
await vi.advanceTimersByTimeAsync(35_000);
// Then let the Job's Complete condition land
await vi.advanceTimersByTimeAsync(20_000);
const result = await executePromise;
// Run completed normally — grace must not have falsely truncated it.
expect(result.exitCode).toBe(0);
expect(result.errorCode).toBeUndefined();
expect(result.sessionId).toBe("sess_test123");
}, 80_000);
});
// ─── execute: concurrency guard — multiple orphan sorting ────────────────────
+76 -29
View File
@@ -401,6 +401,7 @@ export async function streamPodLogsOnce(
sinceSeconds?: number,
dedup?: LogLineDedupFilter,
stopSignal?: { stopped: boolean },
activity?: { lastActiveAt: number },
): Promise<string> {
const logApi = getLogApi(kubeconfigPath);
const chunks: string[] = [];
@@ -409,6 +410,13 @@ export async function streamPodLogsOnce(
write(chunk: Buffer, _encoding, callback) {
const text = chunk.toString("utf-8");
chunks.push(text);
// Refresh stream liveness on every chunk received from the container.
// This MUST happen here (not just after streamPodLogsOnce returns) —
// a streaming attempt that never disconnects can produce output for
// hours, and the grace timer in execute() will fire 30s after the
// FIRST disconnect even if a new long-running attempt is currently
// streaming, unless we keep this timestamp fresh per-chunk (FAR-107).
if (activity) activity.lastActiveAt = Date.now();
const emitted = dedup ? dedup.filter(text) : text;
if (!emitted) {
callback();
@@ -481,10 +489,18 @@ export async function streamPodLogsOnce(
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
* loops during sustained API partitions.
*
* onFirstStreamExit is called the first time streamPodLogsOnce returns
* (container has exited or stream disconnected). Used by execute() to
* start the LOG_EXIT_COMPLETION_GRACE_MS grace timer (FAR-23) without
* waiting for all reconnects to exhaust.
* `activity` tracks stream liveness so execute()'s grace timer can
* distinguish a transient K8s log-API reconnect from a real container
* exit (FAR-107). Two signals:
* - `streamHasExited` becomes true on the first return from
* streamPodLogsOnce. Until then we are still in the warm-up window
* and waitForJobCompletion is the authoritative signal — grace must
* not fire.
* - `lastActiveAt` advances every time a streamPodLogsOnce attempt
* returns non-empty output (the container is still producing).
* The grace timer fires only once GRACE_MS have passed since the
* last chunk, so output that resumes after a transient drop keeps
* the run alive.
*/
async function streamPodLogs(
namespace: string,
@@ -493,7 +509,7 @@ async function streamPodLogs(
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
dedup?: LogLineDedupFilter,
onFirstStreamExit?: () => void,
activity?: { lastActiveAt: number; streamHasExited: boolean },
): Promise<string> {
const allChunks: string[] = [];
let attempt = 0;
@@ -523,15 +539,16 @@ async function streamPodLogs(
}
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
// Signal first stream exit immediately so the grace-period timer in
// execute() can start without waiting for all reconnects to complete.
if (attempt === 0) onFirstStreamExit?.();
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal, activity);
if (activity) activity.streamHasExited = true;
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
// so any log lines in `result` were received up to this moment).
lastLogReceivedAt = Math.floor(Date.now() / 1000);
// Refresh stream liveness so the grace timer in execute() does not
// fire while output is still flowing through reconnects (FAR-107).
if (activity) activity.lastActiveAt = Date.now();
} else if (attempt === 0) {
// First attempt returned nothing — update timestamp so reconnect
// window stays reasonable.
@@ -1340,17 +1357,16 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return onLog(stream, chunk);
};
// Track when the log stream first exits so the grace-period can fire
// if the K8s Job condition lags behind container exit (FAR-23).
// Set via onFirstStreamExit callback (called after attempt=0 returns)
// rather than in .then() of streamPodLogs, which would create a
// deadlock: streamPodLogs only resolves after stopSignal is set, but
// stopSignal is set by the grace timer which needs logExitTime to be
// non-null.
let logExitTime: number | null = null;
// Track stream liveness so the grace timer below only fires when output
// has actually stopped — not on a transient K8s log-API reconnect that
// streamPodLogs heals on its own (FAR-107).
const streamActivity: { lastActiveAt: number; streamHasExited: boolean } = {
lastActiveAt: Date.now(),
streamHasExited: false,
};
const trackedLogStream = streamPodLogs(
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
() => { logExitTime = Date.now(); },
streamActivity,
);
// completionWithGrace races waitForJobCompletion against a grace timer
@@ -1379,19 +1395,50 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
reject(err);
};
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(settleOk).catch(settleErr);
let graceCheckInFlight = false;
gracePoller = setInterval(() => {
if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) {
// Stop the grace poller immediately so we don't double-fire while the
// verification read below is in flight.
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
// The log stream exiting only means the container stopped producing
// output — it does NOT prove the Job was deleted. Verify Job
// presence with a one-shot read so we can distinguish:
// (a) Job 404 → truly gone (TTL or external deletion)
// (b) Job still present → K8s condition propagation lag (FAR-23)
// Without this check we mis-classify (b) as "deleted externally" and
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
// Only consider grace once the stream has exited at least once.
// Until then we are still in the warm-up window and
// waitForJobCompletion is the authoritative signal. Once the
// stream has exited, fire only after GRACE_MS of inactivity
// measured against the last received chunk — output that resumes
// through a reconnect resets the clock so transient drops do not
// truncate live runs (FAR-107).
if (graceCheckInFlight) return;
if (
streamActivity.streamHasExited &&
Date.now() - streamActivity.lastActiveAt >= LOG_EXIT_COMPLETION_GRACE_MS
) {
graceCheckInFlight = true;
void (async () => {
try {
// Pod-phase gate (FAR-107): if the pod is still Running/Pending
// the container is alive — Claude can be silent for >30s during
// long tool calls (web fetches, slow upstream APIs). Refresh
// the stream-activity timer, leave the poller armed, and let
// waitForJobCompletion remain the authoritative signal. Only
// proceed with the grace settlement when the pod has actually
// reached a terminal phase or is gone.
const podLookup = await lookupPodState(namespace, jobName, kubeconfigPath);
if (!podLookup.podMissing && (podLookup.phase === "Running" || podLookup.phase === "Pending")) {
streamActivity.lastActiveAt = Date.now();
graceCheckInFlight = false;
return;
}
} catch (err) {
await onLog("stderr", `[paperclip] grace gate: pod state lookup failed (${err instanceof Error ? err.message : String(err)}) — falling through to Job-presence check\n`).catch(() => {});
}
// Pod is no longer Running — proceed with Job-presence verification.
// Stop the grace poller immediately so we don't double-fire while the
// verification read below is in flight.
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
// The log stream exiting only means the container stopped producing
// output — it does NOT prove the Job was deleted. Verify Job
// presence with a one-shot read so we can distinguish:
// (a) Job 404 → truly gone (TTL or external deletion)
// (b) Job still present → K8s condition propagation lag (FAR-23)
// Without this check we mis-classify (b) as "deleted externally" and
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
try {
await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace });
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update; Job ${jobName} still present — proceeding with captured output (FAR-23)\n`).catch(() => {});