Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 34756f8215 | |||
| 07ef106c66 |
Generated
+2
-2
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.52",
|
||||
"version": "0.1.53",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.52",
|
||||
"version": "0.1.53",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@kubernetes/client-node": "^1.0.0",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.52",
|
||||
"version": "0.1.53",
|
||||
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
|
||||
"license": "MIT",
|
||||
"repository": {
|
||||
|
||||
@@ -1512,6 +1512,54 @@ describe("execute: log-stream-exit grace period (FAR-23)", () => {
|
||||
// (grace did not fire, real completion arrived)
|
||||
expect(result.errorMessage).toBeNull();
|
||||
});
|
||||
|
||||
it("does NOT fire grace when stream drops mid-output and reconnects with more output (FAR-107)", async () => {
|
||||
// Reproduces Nancy / Privileged Escalation symptom: the K8s log API drops
|
||||
// the streaming connection mid-run; streamPodLogs reconnects and the
|
||||
// container is still producing. Before the fix, the grace timer was
|
||||
// armed on first stream exit and fired 30s later regardless of whether
|
||||
// output had resumed, surfacing claude_truncated even though the pod was
|
||||
// still phase=Running.
|
||||
let attemptIndex = 0;
|
||||
mockLogFn.mockImplementation(
|
||||
async (_ns: string, _pod: string, _ctr: string, writable: import("node:stream").Writable) => {
|
||||
if (attemptIndex === 0) {
|
||||
// Stream a partial init line then "drop" the connection without a
|
||||
// result event — this is the transient API disconnect.
|
||||
writable.write(JSON.stringify({ type: "system", subtype: "init", model: "claude-sonnet-4-6", session_id: "sess_test123" }) + "\n");
|
||||
attemptIndex++;
|
||||
return;
|
||||
}
|
||||
// Reconnect produces the rest of the stream including the result event.
|
||||
writable.write(CLAUDE_HAPPY_OUTPUT);
|
||||
},
|
||||
);
|
||||
// Job condition arrives only after the reconnect produces output, well
|
||||
// beyond the 30s grace window; the old code would have grace-fired at
|
||||
// ~30s and treated the run as truncated.
|
||||
let readJobCalls = 0;
|
||||
mockBatchReadJob.mockImplementation(async () => {
|
||||
readJobCalls++;
|
||||
// Stay non-terminal until the reconnect has had time to run and the
|
||||
// grace window has fully elapsed since the FIRST disconnect.
|
||||
if (readJobCalls < 25) return { status: { conditions: [] } };
|
||||
return { status: { conditions: [{ type: "Complete", status: "True" }] } };
|
||||
});
|
||||
|
||||
const executePromise = execute(makeCtx());
|
||||
// t=3000: first reconnect sleep fires → second streamPodLogsOnce attempt
|
||||
await vi.advanceTimersByTimeAsync(3_100);
|
||||
// Drive past the old (buggy) 30s grace boundary without firing real completion
|
||||
await vi.advanceTimersByTimeAsync(35_000);
|
||||
// Then let the Job's Complete condition land
|
||||
await vi.advanceTimersByTimeAsync(20_000);
|
||||
const result = await executePromise;
|
||||
|
||||
// Run completed normally — grace must not have falsely truncated it.
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(result.errorCode).toBeUndefined();
|
||||
expect(result.sessionId).toBe("sess_test123");
|
||||
}, 80_000);
|
||||
});
|
||||
|
||||
// ─── execute: concurrency guard — multiple orphan sorting ────────────────────
|
||||
|
||||
+36
-18
@@ -481,10 +481,18 @@ export async function streamPodLogsOnce(
|
||||
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
|
||||
* loops during sustained API partitions.
|
||||
*
|
||||
* onFirstStreamExit is called the first time streamPodLogsOnce returns
|
||||
* (container has exited or stream disconnected). Used by execute() to
|
||||
* start the LOG_EXIT_COMPLETION_GRACE_MS grace timer (FAR-23) without
|
||||
* waiting for all reconnects to exhaust.
|
||||
* `activity` tracks stream liveness so execute()'s grace timer can
|
||||
* distinguish a transient K8s log-API reconnect from a real container
|
||||
* exit (FAR-107). Two signals:
|
||||
* - `streamHasExited` becomes true on the first return from
|
||||
* streamPodLogsOnce. Until then we are still in the warm-up window
|
||||
* and waitForJobCompletion is the authoritative signal — grace must
|
||||
* not fire.
|
||||
* - `lastActiveAt` advances every time a streamPodLogsOnce attempt
|
||||
* returns non-empty output (the container is still producing).
|
||||
* The grace timer fires only once GRACE_MS have passed since the
|
||||
* last chunk, so output that resumes after a transient drop keeps
|
||||
* the run alive.
|
||||
*/
|
||||
async function streamPodLogs(
|
||||
namespace: string,
|
||||
@@ -493,7 +501,7 @@ async function streamPodLogs(
|
||||
kubeconfigPath?: string,
|
||||
stopSignal?: { stopped: boolean },
|
||||
dedup?: LogLineDedupFilter,
|
||||
onFirstStreamExit?: () => void,
|
||||
activity?: { lastActiveAt: number; streamHasExited: boolean },
|
||||
): Promise<string> {
|
||||
const allChunks: string[] = [];
|
||||
let attempt = 0;
|
||||
@@ -524,14 +532,15 @@ async function streamPodLogs(
|
||||
|
||||
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
|
||||
// Signal first stream exit immediately so the grace-period timer in
|
||||
// execute() can start without waiting for all reconnects to complete.
|
||||
if (attempt === 0) onFirstStreamExit?.();
|
||||
if (activity) activity.streamHasExited = true;
|
||||
if (result) {
|
||||
allChunks.push(result);
|
||||
// Update last-received timestamp to now (the stream just ended,
|
||||
// so any log lines in `result` were received up to this moment).
|
||||
lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
// Refresh stream liveness so the grace timer in execute() does not
|
||||
// fire while output is still flowing through reconnects (FAR-107).
|
||||
if (activity) activity.lastActiveAt = Date.now();
|
||||
} else if (attempt === 0) {
|
||||
// First attempt returned nothing — update timestamp so reconnect
|
||||
// window stays reasonable.
|
||||
@@ -1340,17 +1349,16 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
return onLog(stream, chunk);
|
||||
};
|
||||
|
||||
// Track when the log stream first exits so the grace-period can fire
|
||||
// if the K8s Job condition lags behind container exit (FAR-23).
|
||||
// Set via onFirstStreamExit callback (called after attempt=0 returns)
|
||||
// rather than in .then() of streamPodLogs, which would create a
|
||||
// deadlock: streamPodLogs only resolves after stopSignal is set, but
|
||||
// stopSignal is set by the grace timer which needs logExitTime to be
|
||||
// non-null.
|
||||
let logExitTime: number | null = null;
|
||||
// Track stream liveness so the grace timer below only fires when output
|
||||
// has actually stopped — not on a transient K8s log-API reconnect that
|
||||
// streamPodLogs heals on its own (FAR-107).
|
||||
const streamActivity: { lastActiveAt: number; streamHasExited: boolean } = {
|
||||
lastActiveAt: Date.now(),
|
||||
streamHasExited: false,
|
||||
};
|
||||
const trackedLogStream = streamPodLogs(
|
||||
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
|
||||
() => { logExitTime = Date.now(); },
|
||||
streamActivity,
|
||||
);
|
||||
|
||||
// completionWithGrace races waitForJobCompletion against a grace timer
|
||||
@@ -1380,7 +1388,17 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
};
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(settleOk).catch(settleErr);
|
||||
gracePoller = setInterval(() => {
|
||||
if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) {
|
||||
// Only consider grace once the stream has exited at least once.
|
||||
// Until then we are still in the warm-up window and
|
||||
// waitForJobCompletion is the authoritative signal. Once the
|
||||
// stream has exited, fire only after GRACE_MS of inactivity
|
||||
// measured against the last received chunk — output that resumes
|
||||
// through a reconnect resets the clock so transient drops do not
|
||||
// truncate live runs (FAR-107).
|
||||
if (
|
||||
streamActivity.streamHasExited &&
|
||||
Date.now() - streamActivity.lastActiveAt >= LOG_EXIT_COMPLETION_GRACE_MS
|
||||
) {
|
||||
// Stop the grace poller immediately so we don't double-fire while the
|
||||
// verification read below is in flight.
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
|
||||
Reference in New Issue
Block a user