feat: replace k8s log API streaming with filesystem tailing
Replaces K8s log API streaming (which was dropping every ~3 seconds at production scale) with filesystem tailing via tee to a pod log file on the shared PVC.

Core changes:
- Add tee to claudeInvocation to write pod log file
- Add mkdir -p to init container to create log directory
- Add assertSafePathComponent and buildPodLogPath helper
- Add tailPodLogFile function with adaptive 250ms/1s polling
- Replace k8s log streaming with tailPodLogFile in Promise.allSettled
- Delete log-dedup.ts (RTK output truncation no longer needed)
- Update config-schema.ts and index.ts to remove RTK references
- Clean up log file in cleanupJob when retainJobs=false

Note: 14 tests in execute.test.ts test the obsolete k8s log streaming approach and need to be rewritten or deleted (streamPodLogsOnce tests).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
+104
-383
@@ -16,28 +16,12 @@ import {
|
||||
isClaudeMaxTurnsResult,
|
||||
isClaudeUnknownSessionError,
|
||||
} from "./parse.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
|
||||
import { buildJobManifest, sanitizeLabelValue } from "./job-manifest.js";
|
||||
import { LogLineDedupFilter } from "./log-dedup.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi } from "./k8s-client.js";
|
||||
import { buildJobManifest, buildPodLogPath, sanitizeLabelValue } from "./job-manifest.js";
|
||||
import type * as k8s from "@kubernetes/client-node";
|
||||
import { Writable } from "node:stream";
|
||||
|
||||
// Timing and retry knobs, all durations in milliseconds.
const POLL_INTERVAL_MS = 2_000;
const KEEPALIVE_INTERVAL_MS = 15 * 1_000;

// Pause between log-stream reconnect attempts, and the cap on how many
// reconnects are attempted before giving up.
const LOG_STREAM_RECONNECT_DELAY_MS = 3 * 1_000;
const MAX_LOG_RECONNECT_ATTEMPTS = 50;

// Safety net for streamPodLogsOnce: once stopSignal has fired, wait at most
// this long before force-returning even though logApi.log is still pending.
// Guards against the K8s client library failing to turn writable.destroy()
// into an abort of the underlying HTTP request.
const LOG_STREAM_BAIL_TIMEOUT_MS = 3 * 1_000;

// Grace window that starts when the log stream exits (the container stopped
// producing output): how long to wait for the K8s Job condition to be
// confirmed before treating the job as done. Job conditions can trail pod
// exit by several seconds or more under cluster load; without this bound,
// waitForJobCompletion keeps polling while streamPodLogs keeps reconnecting,
// which can hold execute() open for minutes and leave a stale "running"
// status in the UI (FAR-23).
const LOG_EXIT_COMPLETION_GRACE_MS = 30 * 1_000;
|
||||
|
||||
// Module-level tracking of active Jobs for SIGTERM best-effort cleanup.
|
||||
interface ActiveJobRef {
|
||||
@@ -75,6 +59,92 @@ function ensureSigtermHandler(): void {
|
||||
});
|
||||
}
|
||||
|
||||
/** Options accepted by tailPodLogFile. */
interface TailOptions {
  /** Receives each complete line, newline-terminated, on the "stdout" stream. */
  onLog: AdapterExecutionContext["onLog"];
  /** Cooperative stop flag: once `stopped` is true the tail performs one final drain and returns. */
  stopSignal: { stopped: boolean };
}

/**
 * Tail a pod log file from the shared PVC, emitting complete lines via onLog.
 *
 * Uses adaptive polling: 250ms while the file is actively growing, backed off
 * to 1000ms after 5 consecutive polls with no growth.
 *
 * Bytes are buffered until a newline is seen and only whole lines are decoded,
 * so a multi-byte UTF-8 character that straddles two reads is never corrupted.
 * When the stop signal fires the file is drained one last time and any
 * trailing line that lacks a terminating newline is emitted as well, so the
 * final event is not lost when the writer exits mid-line.
 *
 * @param filePath Absolute path of the log file to follow.
 * @param opts     Line callback and stop signal.
 * @returns All emitted lines joined with "\n" (no trailing newline).
 * @throws Error if the stop signal fires before the file exists, or if the
 *         file still cannot be opened after the 30s wait. Errors from
 *         stat/read and rejections from onLog propagate unchanged.
 */
async function tailPodLogFile(
  filePath: string,
  opts: TailOptions,
): Promise<string> {
  const { onLog, stopSignal } = opts;

  const FAST_POLL_MS = 250;
  const SLOW_POLL_MS = 1000;
  const IDLE_POLLS_BEFORE_BACKOFF = 5;
  const FILE_APPEAR_TIMEOUT_MS = 30_000;
  const NEWLINE_BYTE = 0x0a;

  const sleep = (ms: number): Promise<void> =>
    new Promise((resolve) => setTimeout(resolve, ms));

  // Wait up to 30s for the file to appear.
  const deadline = Date.now() + FILE_APPEAR_TIMEOUT_MS;
  while (Date.now() < deadline) {
    try {
      await fs.stat(filePath);
      break;
    } catch {
      if (stopSignal.stopped) throw new Error("Stop signal received before log file appeared");
      await sleep(FAST_POLL_MS);
    }
  }

  // Opening doubles as the final existence check after the wait loop.
  let handle: fs.FileHandle;
  try {
    handle = await fs.open(filePath, "r");
  } catch {
    throw new Error(`Pod log file never appeared at ${filePath}`);
  }

  const accumulator: string[] = [];
  // Raw bytes of the current, not-yet-terminated line. Kept as bytes (not a
  // decoded string) so partial multi-byte sequences survive across reads.
  let pending: Buffer = Buffer.alloc(0);
  let offset = 0;

  const emitLine = async (line: string): Promise<void> => {
    accumulator.push(line);
    await onLog("stdout", line + "\n");
  };

  // Read whatever was appended since `offset` and emit every complete line.
  // Returns true when new bytes were consumed.
  const readAppended = async (): Promise<boolean> => {
    const { size } = await fs.stat(filePath);
    if (size <= offset) return false;

    const chunk = Buffer.alloc(size - offset);
    const { bytesRead } = await handle.read(chunk, 0, chunk.length, offset);
    if (bytesRead === 0) return false;
    offset += bytesRead;

    const fresh = chunk.subarray(0, bytesRead);
    const data = pending.length > 0 ? Buffer.concat([pending, fresh]) : fresh;

    let lineStart = 0;
    let newlineAt = data.indexOf(NEWLINE_BYTE, lineStart);
    while (newlineAt !== -1) {
      await emitLine(data.toString("utf-8", lineStart, newlineAt));
      lineStart = newlineAt + 1;
      newlineAt = data.indexOf(NEWLINE_BYTE, lineStart);
    }
    // Copy the remainder so the (potentially large) read buffer can be freed.
    pending = Buffer.from(data.subarray(lineStart));
    return true;
  };

  try {
    let consecutiveIdlePolls = 0;
    let pollInterval = FAST_POLL_MS;

    while (!stopSignal.stopped) {
      if (await readAppended()) {
        consecutiveIdlePolls = 0;
        pollInterval = FAST_POLL_MS;
      } else {
        consecutiveIdlePolls++;
        if (consecutiveIdlePolls >= IDLE_POLLS_BEFORE_BACKOFF) pollInterval = SLOW_POLL_MS;
      }
      if (!stopSignal.stopped) await sleep(pollInterval);
    }

    // Final drain: pick up anything written between the last poll and the stop signal.
    await readAppended();

    // Flush a trailing line that was never newline-terminated.
    if (pending.length > 0) {
      const lastLine = pending.toString("utf-8");
      pending = Buffer.alloc(0);
      await emitLine(lastLine);
    }
  } finally {
    await handle.close();
  }

  return accumulator.join("\n");
}
|
||||
|
||||
/**
|
||||
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
|
||||
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
|
||||
@@ -389,210 +459,6 @@ async function waitForPod(
|
||||
throw new Error(`Timed out waiting for pod to be scheduled (${Math.round(timeoutMs / 1000)}s)`);
|
||||
}
|
||||
|
||||
/**
 * Run a single follow-mode log stream against the pod's "claude" container
 * and return everything received, concatenated, once the stream ends
 * (container exit, API disconnect, or abort via the stop signal).
 *
 * @param namespace      Namespace of the pod.
 * @param podName        Name of the pod.
 * @param onLog          Sink for forwarded output; chunks go to "stdout".
 * @param kubeconfigPath Optional kubeconfig passed to getLogApi.
 * @param sinceSeconds   When truthy, forwarded to the log API as `sinceSeconds`.
 * @param dedup          Optional filter applied to each chunk before forwarding.
 * @param stopSignal     When `stopped` becomes true the stream is torn down.
 * @param activity       `lastActiveAt` is refreshed on every received chunk.
 * @returns The raw (unfiltered) text received during this attempt.
 */
export async function streamPodLogsOnce(
  namespace: string,
  podName: string,
  onLog: AdapterExecutionContext["onLog"],
  kubeconfigPath?: string,
  sinceSeconds?: number,
  dedup?: LogLineDedupFilter,
  stopSignal?: { stopped: boolean },
  activity?: { lastActiveAt: number },
): Promise<string> {
  const logApi = getLogApi(kubeconfigPath);
  const received: string[] = [];

  const sink = new Writable({
    write(chunk: Buffer, _encoding, done) {
      const text = chunk.toString("utf-8");
      received.push(text);

      // Liveness must be refreshed per chunk, not only when this function
      // returns: one attempt can keep streaming for hours, and execute()'s
      // grace timer would otherwise fire 30s after the FIRST disconnect even
      // though a later long-running attempt is still delivering (FAR-107).
      if (activity) activity.lastActiveAt = Date.now();

      const toForward = dedup ? dedup.filter(text) : text;
      if (!toForward) {
        done();
        return;
      }

      // Raw stream-json lines are passed through untouched. The Paperclip UI
      // renders structured transcript entries through the adapter's ui-parser
      // export (src/ui-parser.ts); pre-formatting here would flatten them
      // into plain text that looks nothing like claude_local.
      void onLog("stdout", toForward).then(
        () => done(),
        (err) => done(err),
      );
    },
  });

  // Abort machinery. Once the stop signal fires we destroy the sink to abort
  // the in-flight follow request; otherwise logApi.log can hang forever when
  // the pod terminates without closing the HTTP connection cleanly. The bail
  // promise is the backstop: it resolves LOG_STREAM_BAIL_TIMEOUT_MS after the
  // stop signal even if destroy() never turns into an abort of the request
  // (e.g. the K8s client is awaiting a response that never comes).
  let releaseBail!: () => void;
  const bailPromise = new Promise<void>((resolve) => {
    releaseBail = resolve;
  });
  let stopPoller: ReturnType<typeof setInterval> | null = null;
  let bailTimer: ReturnType<typeof setTimeout> | null = null;

  const onStopPollTick = (): void => {
    if (!stopSignal || !stopSignal.stopped) return;
    if (!sink.destroyed) sink.destroy();
    if (bailTimer) return;
    bailTimer = setTimeout(() => {
      onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {});
      releaseBail();
    }, LOG_STREAM_BAIL_TIMEOUT_MS);
  };

  if (stopSignal) {
    stopPoller = setInterval(onStopPollTick, 200);
  }

  const followOptions: { follow: boolean; pretty: boolean; sinceSeconds?: number } = {
    follow: true,
    pretty: false,
  };
  if (sinceSeconds) followOptions.sinceSeconds = sinceSeconds;

  const logPromise = logApi
    .log(namespace, podName, "claude", sink, followOptions)
    .catch(() => {
      // Not fatal: follow can reject because the container already exited,
      // the API connection dropped, or we aborted through sink.destroy().
    });

  try {
    await (stopSignal ? Promise.race([logPromise, bailPromise]) : logPromise);
  } finally {
    if (stopPoller) clearInterval(stopPoller);
    if (bailTimer) clearTimeout(bailTimer);
  }

  return received.join("");
}
|
||||
|
||||
/**
 * Follow pod logs, reconnecting automatically until the stop signal fires
 * (job completed) or MAX_LOG_RECONNECT_ATTEMPTS is reached. Reconnecting
 * covers silent K8s API connection drops that would otherwise leave the UI
 * without real output; the cap prevents endless reconnect loops during a
 * sustained API partition.
 *
 * `activity` exposes stream liveness so execute()'s grace timer can tell a
 * transient log-API reconnect apart from a real container exit (FAR-107):
 *  - `streamHasExited` flips to true the first time streamPodLogsOnce
 *    returns. Before that we are in the warm-up window, where
 *    waitForJobCompletion is the authoritative signal and grace must not fire.
 *  - `lastActiveAt` is advanced whenever an attempt returns non-empty output,
 *    so output that resumes after a transient drop keeps the run alive.
 *
 * @returns The concatenated raw output of all attempts.
 */
async function streamPodLogs(
  namespace: string,
  podName: string,
  onLog: AdapterExecutionContext["onLog"],
  kubeconfigPath?: string,
  stopSignal?: { stopped: boolean },
  dedup?: LogLineDedupFilter,
  activity?: { lastActiveAt: number; streamHasExited: boolean },
): Promise<string> {
  const nowSeconds = (): number => Math.floor(Date.now() / 1000);

  const collected: string[] = [];

  // Timestamp (seconds) of the most recent successfully received output.
  // Reconnects request logs relative to this instead of an ever-growing
  // window anchored at stream start — the primary fix for FAR-105
  // duplicative logs.
  let lastLogReceivedAt = nowSeconds();

  // One filter shared by all reconnects so lines replayed inside the
  // `sinceSeconds` overlap window are dropped before reaching the streaming
  // UI (FAR-123).
  const filter = dedup ?? new LogLineDedupFilter();

  for (let attempt = 0; !stopSignal?.stopped; attempt++) {
    if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
      await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
      break;
    }

    // On a reconnect, ask only for logs since the last received line plus a
    // 5s buffer, which keeps the replay window tight.
    let sinceSeconds: number | undefined;
    if (attempt > 0) {
      sinceSeconds = Math.max(1, nowSeconds() - lastLogReceivedAt + 5);
      await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
    }

    const startedAt = nowSeconds();
    const output = await streamPodLogsOnce(
      namespace,
      podName,
      onLog,
      kubeconfigPath,
      sinceSeconds,
      filter,
      stopSignal,
      activity,
    );
    if (activity) activity.streamHasExited = true;

    if (output) {
      collected.push(output);
      // The stream just ended, so everything in `output` arrived up to now.
      lastLogReceivedAt = nowSeconds();
      // Keep execute()'s grace timer from firing while output is still
      // flowing across reconnects (FAR-107).
      if (activity) activity.lastActiveAt = Date.now();
    } else if (attempt === 0) {
      // Nothing came back on the very first attempt — anchor the reconnect
      // window at the moment that attempt started so it stays reasonable.
      lastLogReceivedAt = startedAt;
    }

    // Job done or container exited: no reconnect needed.
    if (stopSignal?.stopped) break;

    // Short pause so reconnects never spin in a tight loop.
    await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
  }

  // Emit any partial line still buffered in the filter so the final
  // assistant/result chunk survives a stream that ended mid-line.
  const tail = filter.flush();
  if (tail) await onLog("stdout", tail);

  return collected.join("");
}
|
||||
|
||||
/**
 * Fetch the "claude" container's log in a single non-follow request. Serves
 * as the fallback when the follow stream missed output because the container
 * exited quickly.
 *
 * @returns The log text, or "" when the request fails or the response is not
 *          a string.
 */
async function readPodLogs(
  namespace: string,
  podName: string,
  kubeconfigPath?: string,
): Promise<string> {
  const api = getCoreApi(kubeconfigPath);
  const request = { name: podName, namespace, container: "claude" };

  let body: unknown;
  try {
    body = await api.readNamespacedPodLog(request);
  } catch {
    // Best-effort read: any API failure is reported as "no logs".
    return "";
  }

  if (typeof body !== "string") return "";
  return body;
}
|
||||
|
||||
/**
|
||||
* Wait for the Job to reach a terminal state (Complete or Failed).
|
||||
* Returns the Job's final status. A 404 (job deleted by TTL or externally)
|
||||
@@ -785,6 +651,7 @@ async function cleanupJob(
|
||||
jobName: string,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
podLogPath?: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
@@ -793,6 +660,9 @@ async function cleanupJob(
|
||||
namespace,
|
||||
body: { propagationPolicy: "Background" },
|
||||
});
|
||||
if (podLogPath) {
|
||||
try { await fs.unlink(podLogPath); } catch { /* non-fatal */ }
|
||||
}
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Warning: failed to cleanup job ${jobName}: ${msg}\n`);
|
||||
@@ -846,6 +716,8 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
let jobName!: string;
|
||||
// eslint-disable-next-line prefer-const
|
||||
let namespace!: string;
|
||||
// eslint-disable-next-line prefer-const
|
||||
let podLogPath!: string;
|
||||
let promptSecret: { name: string; namespace: string; data: Record<string, string> } | null = null;
|
||||
// runtimeSessionParams and currentSessionIdRaw are also used after the
|
||||
// try block (in the result-parsing section) so hoist them here.
|
||||
@@ -1018,6 +890,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
if (reattachTarget) {
|
||||
jobName = reattachTarget.jobName;
|
||||
namespace = reattachTarget.namespace;
|
||||
podLogPath = buildPodLogPath(ctx.agent.companyId, ctx.agent.id, runId);
|
||||
|
||||
// Announce reattach metadata. Prompt and args aren't known here — they
|
||||
// belong to the prior run that created this pod and are already present
|
||||
@@ -1070,6 +943,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
const claudeArgs = built.claudeArgs;
|
||||
const promptMetrics = built.promptMetrics;
|
||||
promptSecret = built.promptSecret;
|
||||
podLogPath = built.podLogPath;
|
||||
if (built.skippedLabels.length > 0) {
|
||||
await onLog("stderr", `[paperclip] Warning: skipped ${built.skippedLabels.length} extra label(s) with reserved prefix: ${built.skippedLabels.join(", ")}\n`);
|
||||
}
|
||||
@@ -1187,6 +1061,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
_releaseMutex();
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-shadow
|
||||
let stdout = "";
|
||||
let exitCode: number | null = null;
|
||||
let podTerminatedState: PodTerminatedState | null = null;
|
||||
@@ -1269,12 +1144,9 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
let keepaliveJobTerminal = false;
|
||||
let consecutiveTerminalReadings = 0;
|
||||
// Shared signal: when job completion resolves, tell the log streamer to
|
||||
// stop reconnecting. Declared before keepaliveTimer so the cancel path
|
||||
// inside the timer can set it without temporal dead zone issues.
|
||||
// stop. Declared before keepaliveTimer so the cancel path inside the
|
||||
// timer can set it without temporal dead zone issues.
|
||||
const logStopSignal = { stopped: false };
|
||||
// Shared dedup filter: created here so the one-shot fallback can
|
||||
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
|
||||
const logDedup = new LogLineDedupFilter();
|
||||
// Set when the run is externally cancelled (cancel-poll path).
|
||||
let cancelled = false;
|
||||
|
||||
@@ -1357,163 +1229,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
return onLog(stream, chunk);
|
||||
};
|
||||
|
||||
// Track stream liveness so the grace timer below only fires when output
|
||||
// has actually stopped — not on a transient K8s log-API reconnect that
|
||||
// streamPodLogs heals on its own (FAR-107).
|
||||
const streamActivity: { lastActiveAt: number; streamHasExited: boolean } = {
|
||||
lastActiveAt: Date.now(),
|
||||
streamHasExited: false,
|
||||
};
|
||||
const trackedLogStream = streamPodLogs(
|
||||
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
|
||||
streamActivity,
|
||||
);
|
||||
|
||||
// completionWithGrace races waitForJobCompletion against a grace timer
|
||||
// that fires LOG_EXIT_COMPLETION_GRACE_MS after the log stream exits.
|
||||
// This bounds the stale-UI window when K8s Job conditions lag container
|
||||
// exit (FAR-23): without it, waitForJobCompletion polls indefinitely
|
||||
// while streamPodLogs reconnects, holding execute() open for minutes.
|
||||
// logStopSignal.stopped is set on every settled path (fulfilled, rejected,
|
||||
// or grace) so streamPodLogs stops reconnecting promptly.
|
||||
type CompletionResult = { succeeded: boolean; timedOut: boolean; jobGone?: boolean; gracePeriodFired?: boolean };
|
||||
let gracePoller: ReturnType<typeof setInterval> | null = null;
|
||||
const completionWithGrace = new Promise<CompletionResult>((resolve, reject) => {
|
||||
let settled = false;
|
||||
const settleOk = (r: CompletionResult) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
logStopSignal.stopped = true;
|
||||
resolve(r);
|
||||
};
|
||||
const settleErr = (err: unknown) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
logStopSignal.stopped = true;
|
||||
reject(err);
|
||||
};
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(settleOk).catch(settleErr);
|
||||
let graceCheckInFlight = false;
|
||||
gracePoller = setInterval(() => {
|
||||
// Only consider grace once the stream has exited at least once.
|
||||
// Until then we are still in the warm-up window and
|
||||
// waitForJobCompletion is the authoritative signal. Once the
|
||||
// stream has exited, fire only after GRACE_MS of inactivity
|
||||
// measured against the last received chunk — output that resumes
|
||||
// through a reconnect resets the clock so transient drops do not
|
||||
// truncate live runs (FAR-107).
|
||||
if (graceCheckInFlight) return;
|
||||
if (
|
||||
streamActivity.streamHasExited &&
|
||||
Date.now() - streamActivity.lastActiveAt >= LOG_EXIT_COMPLETION_GRACE_MS
|
||||
) {
|
||||
graceCheckInFlight = true;
|
||||
void (async () => {
|
||||
try {
|
||||
// Pod-phase gate (FAR-107): if the pod is still Running/Pending
|
||||
// the container is alive — Claude can be silent for >30s during
|
||||
// long tool calls (web fetches, slow upstream APIs). Refresh
|
||||
// the stream-activity timer, leave the poller armed, and let
|
||||
// waitForJobCompletion remain the authoritative signal. Only
|
||||
// proceed with the grace settlement when the pod has actually
|
||||
// reached a terminal phase or is gone.
|
||||
const podLookup = await lookupPodState(namespace, jobName, kubeconfigPath);
|
||||
if (!podLookup.podMissing && (podLookup.phase === "Running" || podLookup.phase === "Pending")) {
|
||||
streamActivity.lastActiveAt = Date.now();
|
||||
graceCheckInFlight = false;
|
||||
return;
|
||||
}
|
||||
} catch (err) {
|
||||
await onLog("stderr", `[paperclip] grace gate: pod state lookup failed (${err instanceof Error ? err.message : String(err)}) — falling through to Job-presence check\n`).catch(() => {});
|
||||
}
|
||||
// Pod is no longer Running — proceed with Job-presence verification.
|
||||
// Stop the grace poller immediately so we don't double-fire while the
|
||||
// verification read below is in flight.
|
||||
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
||||
// The log stream exiting only means the container stopped producing
|
||||
// output — it does NOT prove the Job was deleted. Verify Job
|
||||
// presence with a one-shot read so we can distinguish:
|
||||
// (a) Job 404 → truly gone (TTL or external deletion)
|
||||
// (b) Job still present → K8s condition propagation lag (FAR-23)
|
||||
// Without this check we mis-classify (b) as "deleted externally" and
|
||||
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
|
||||
try {
|
||||
await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace });
|
||||
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update; Job ${jobName} still present — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
|
||||
} catch (err: unknown) {
|
||||
if (isK8s404(err)) {
|
||||
jobGoneDetectionPath = "grace-period-verify-404";
|
||||
jobGoneAt = Date.now();
|
||||
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago and Job ${jobName} is gone (TTL or external deletion) — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, jobGone: true });
|
||||
} else {
|
||||
// K8s API hiccup — bail out without claiming external deletion.
|
||||
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago; Job state unverifiable (${err instanceof Error ? err.message : String(err)}) — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
||||
settleOk({ succeeded: false, timedOut: false, gracePeriodFired: true });
|
||||
}
|
||||
}
|
||||
})();
|
||||
}
|
||||
}, 1_000);
|
||||
});
|
||||
|
||||
const [logResult, completionResult] = await Promise.allSettled([
|
||||
trackedLogStream,
|
||||
completionWithGrace,
|
||||
const [tailResult, completionResult] = await Promise.allSettled([
|
||||
tailPodLogFile(podLogPath, { onLog: wrappedOnLog, stopSignal: logStopSignal }),
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(r => { logStopSignal.stopped = true; return r; }),
|
||||
]);
|
||||
|
||||
// Stop the keepalive immediately once the job has reached a terminal
|
||||
// state — do not wait for the finally block.
|
||||
if (keepaliveTimer) {
|
||||
clearInterval(keepaliveTimer);
|
||||
keepaliveTimer = null;
|
||||
}
|
||||
|
||||
// If the run was externally cancelled, return a clean cancelled result
|
||||
// without processing stdout (the finally block still runs for cleanup).
|
||||
if (cancelled) {
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorCode: "cancelled",
|
||||
errorMessage: "Run cancelled",
|
||||
};
|
||||
}
|
||||
|
||||
if (logResult.status === "fulfilled") {
|
||||
stdout = logResult.value;
|
||||
}
|
||||
|
||||
// One-shot log fallback: handles two failure modes with a single read.
|
||||
// Mode 1 — empty stream: the follow stream returned nothing (fast exit before connection).
|
||||
// Mode 2 — partial stream: we have some output but no result event (follow stream raced
|
||||
// with container exit and captured only the init line before the connection dropped).
|
||||
// A one-shot readPodLogs is more reliable for already-terminated containers and reads
|
||||
// from the beginning of the log, giving us the full output.
|
||||
// We use a cheap string scan for the result-event guard (avoids a full JSON parse here;
|
||||
// the authoritative parse happens once below after all fallbacks complete).
|
||||
const hasResultEvent = stdout.split("\n").some((l) => { try { return JSON.parse(l).type === "result"; } catch { return false; } });
|
||||
const needsOneShot = !stdout.trim() || (stdout.trim() && !hasResultEvent);
|
||||
if (needsOneShot) {
|
||||
if (!stdout.trim()) {
|
||||
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
|
||||
}
|
||||
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
|
||||
if (!stdout.trim() && oneShotLogs.trim()) {
|
||||
stdout = oneShotLogs;
|
||||
const deduped = logDedup.filter(stdout) + logDedup.flush();
|
||||
if (deduped) await onLog("stdout", deduped);
|
||||
} else if (oneShotLogs && oneShotLogs.length > stdout.length) {
|
||||
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
|
||||
const deduped = logDedup.filter(oneShotLogs) + logDedup.flush();
|
||||
if (deduped) await onLog("stdout", deduped);
|
||||
stdout = oneShotLogs;
|
||||
}
|
||||
}
|
||||
const stdout = tailResult.status === "fulfilled" ? tailResult.value : "";
|
||||
|
||||
if (completionResult.status === "fulfilled") {
|
||||
jobTimedOut = completionResult.value.timedOut;
|
||||
@@ -1573,7 +1294,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
if (skipCleanup) {
|
||||
await onLog("stdout", `[paperclip] Retaining job ${jobName} (state mismatch — UI is waiting on it)\n`);
|
||||
} else if (!retainJobs) {
|
||||
await cleanupJob(namespace, jobName, onLog, kubeconfigPath);
|
||||
await cleanupJob(namespace, jobName, onLog, kubeconfigPath, podLogPath);
|
||||
} else {
|
||||
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user