fix: per-chunk activity tracking + pod-phase gate on grace timer (FAR-107)
The 0.1.53 fix tracked stream liveness by updating lastActiveAt only after streamPodLogsOnce returned. That worked for the disconnect-then-reconnect-then-disconnect case, but missed the disconnect-then-long-running-reconnect case: a streaming attempt that runs for minutes without disconnecting never refreshes lastActiveAt, so the grace timer fires 30s after the prior disconnect even though the new attempt is currently producing output. Nancy reproduced exactly this on 0.1.53 — claude_truncated with pod phase=Running. Two changes: 1. streamPodLogsOnce now accepts the activity ref and updates lastActiveAt inside its writable's write handler — every chunk delivered from the container refreshes the timer in real time, not just on stream return. 2. Before the grace timer settles, gate on pod phase: if the pod is still Running or Pending, the container is alive (Claude's long tool-use silences exceed 30s for slow upstream APIs). Refresh lastActiveAt, leave the poller armed, and let waitForJobCompletion remain the authoritative termination signal. Only proceed with the grace settlement when the pod has actually reached a terminal phase or is gone. The original FAR-23 fast-path (container exits, Job condition lags) still works: when the container terminates, pod phase moves to Succeeded/Failed and the gate falls through to the existing Job-presence check. Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
+40
-11
@@ -401,6 +401,7 @@ export async function streamPodLogsOnce(
|
||||
sinceSeconds?: number,
|
||||
dedup?: LogLineDedupFilter,
|
||||
stopSignal?: { stopped: boolean },
|
||||
activity?: { lastActiveAt: number },
|
||||
): Promise<string> {
|
||||
const logApi = getLogApi(kubeconfigPath);
|
||||
const chunks: string[] = [];
|
||||
@@ -409,6 +410,13 @@ export async function streamPodLogsOnce(
|
||||
write(chunk: Buffer, _encoding, callback) {
|
||||
const text = chunk.toString("utf-8");
|
||||
chunks.push(text);
|
||||
// Refresh stream liveness on every chunk received from the container.
|
||||
// This MUST happen here (not just after streamPodLogsOnce returns) —
|
||||
// a streaming attempt that never disconnects can produce output for
|
||||
// hours, and the grace timer in execute() will fire 30s after the
|
||||
// FIRST disconnect even if a new long-running attempt is currently
|
||||
// streaming, unless we keep this timestamp fresh per-chunk (FAR-107).
|
||||
if (activity) activity.lastActiveAt = Date.now();
|
||||
const emitted = dedup ? dedup.filter(text) : text;
|
||||
if (!emitted) {
|
||||
callback();
|
||||
@@ -531,7 +539,7 @@ async function streamPodLogs(
|
||||
}
|
||||
|
||||
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal, activity);
|
||||
if (activity) activity.streamHasExited = true;
|
||||
if (result) {
|
||||
allChunks.push(result);
|
||||
@@ -1387,6 +1395,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
reject(err);
|
||||
};
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(settleOk).catch(settleErr);
|
||||
let graceCheckInFlight = false;
|
||||
gracePoller = setInterval(() => {
|
||||
// Only consider grace once the stream has exited at least once.
|
||||
// Until then we are still in the warm-up window and
|
||||
@@ -1395,21 +1404,41 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// measured against the last received chunk — output that resumes
|
||||
// through a reconnect resets the clock so transient drops do not
|
||||
// truncate live runs (FAR-107).
|
||||
if (graceCheckInFlight) return;
|
||||
if (
|
||||
streamActivity.streamHasExited &&
|
||||
Date.now() - streamActivity.lastActiveAt >= LOG_EXIT_COMPLETION_GRACE_MS
|
||||
) {
|
||||
// Stop the grace poller immediately so we don't double-fire while the
|
||||
// verification read below is in flight.
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
// The log stream exiting only means the container stopped producing
|
||||
// output — it does NOT prove the Job was deleted. Verify Job
|
||||
// presence with a one-shot read so we can distinguish:
|
||||
// (a) Job 404 → truly gone (TTL or external deletion)
|
||||
// (b) Job still present → K8s condition propagation lag (FAR-23)
|
||||
// Without this check we mis-classify (b) as "deleted externally" and
|
||||
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
|
||||
graceCheckInFlight = true;
|
||||
void (async () => {
|
||||
try {
|
||||
// Pod-phase gate (FAR-107): if the pod is still Running/Pending
|
||||
// the container is alive — Claude can be silent for >30s during
|
||||
// long tool calls (web fetches, slow upstream APIs). Refresh
|
||||
// the stream-activity timer, leave the poller armed, and let
|
||||
// waitForJobCompletion remain the authoritative signal. Only
|
||||
// proceed with the grace settlement when the pod has actually
|
||||
// reached a terminal phase or is gone.
|
||||
const podLookup = await lookupPodState(namespace, jobName, kubeconfigPath);
|
||||
if (!podLookup.podMissing && (podLookup.phase === "Running" || podLookup.phase === "Pending")) {
|
||||
streamActivity.lastActiveAt = Date.now();
|
||||
graceCheckInFlight = false;
|
||||
return;
|
||||
}
|
||||
} catch (err) {
|
||||
await onLog("stderr", `[paperclip] grace gate: pod state lookup failed (${err instanceof Error ? err.message : String(err)}) — falling through to Job-presence check\n`).catch(() => {});
|
||||
}
|
||||
// Pod is no longer Running — proceed with Job-presence verification.
|
||||
// Stop the grace poller immediately so we don't double-fire while the
|
||||
// verification read below is in flight.
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
// The log stream exiting only means the container stopped producing
|
||||
// output — it does NOT prove the Job was deleted. Verify Job
|
||||
// presence with a one-shot read so we can distinguish:
|
||||
// (a) Job 404 → truly gone (TTL or external deletion)
|
||||
// (b) Job still present → K8s condition propagation lag (FAR-23)
|
||||
// Without this check we mis-classify (b) as "deleted externally" and
|
||||
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
|
||||
try {
|
||||
await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace });
|
||||
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update; Job ${jobName} still present — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
|
||||
Reference in New Issue
Block a user