|
|
|
@@ -401,6 +401,7 @@ export async function streamPodLogsOnce(
|
|
|
|
|
sinceSeconds?: number,
|
|
|
|
|
dedup?: LogLineDedupFilter,
|
|
|
|
|
stopSignal?: { stopped: boolean },
|
|
|
|
|
activity?: { lastActiveAt: number },
|
|
|
|
|
): Promise<string> {
|
|
|
|
|
const logApi = getLogApi(kubeconfigPath);
|
|
|
|
|
const chunks: string[] = [];
|
|
|
|
@@ -409,6 +410,13 @@ export async function streamPodLogsOnce(
|
|
|
|
|
write(chunk: Buffer, _encoding, callback) {
|
|
|
|
|
const text = chunk.toString("utf-8");
|
|
|
|
|
chunks.push(text);
|
|
|
|
|
// Refresh stream liveness on every chunk received from the container.
|
|
|
|
|
// This MUST happen here (not just after streamPodLogsOnce returns) —
|
|
|
|
|
// a streaming attempt that never disconnects can produce output for
|
|
|
|
|
// hours, and the grace timer in execute() will fire 30s after the
|
|
|
|
|
// FIRST disconnect even if a new long-running attempt is currently
|
|
|
|
|
// streaming, unless we keep this timestamp fresh per-chunk (FAR-107).
|
|
|
|
|
if (activity) activity.lastActiveAt = Date.now();
|
|
|
|
|
const emitted = dedup ? dedup.filter(text) : text;
|
|
|
|
|
if (!emitted) {
|
|
|
|
|
callback();
|
|
|
|
@@ -481,10 +489,18 @@ export async function streamPodLogsOnce(
|
|
|
|
|
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
|
|
|
|
|
* loops during sustained API partitions.
|
|
|
|
|
*
|
|
|
|
|
* onFirstStreamExit is called the first time streamPodLogsOnce returns
|
|
|
|
|
* (container has exited or stream disconnected). Used by execute() to
|
|
|
|
|
* start the LOG_EXIT_COMPLETION_GRACE_MS grace timer (FAR-23) without
|
|
|
|
|
* waiting for all reconnects to exhaust.
|
|
|
|
|
* `activity` tracks stream liveness so execute()'s grace timer can
|
|
|
|
|
* distinguish a transient K8s log-API reconnect from a real container
|
|
|
|
|
* exit (FAR-107). Two signals:
|
|
|
|
|
* - `streamHasExited` becomes true on the first return from
|
|
|
|
|
* streamPodLogsOnce. Until then we are still in the warm-up window
|
|
|
|
|
* and waitForJobCompletion is the authoritative signal — grace must
|
|
|
|
|
* not fire.
|
|
|
|
|
* - `lastActiveAt` advances every time a streamPodLogsOnce attempt
|
|
|
|
|
* returns non-empty output (the container is still producing).
|
|
|
|
|
* The grace timer fires only once GRACE_MS have passed since the
|
|
|
|
|
* last chunk, so output that resumes after a transient drop keeps
|
|
|
|
|
* the run alive.
|
|
|
|
|
*/
|
|
|
|
|
async function streamPodLogs(
|
|
|
|
|
namespace: string,
|
|
|
|
@@ -493,7 +509,7 @@ async function streamPodLogs(
|
|
|
|
|
kubeconfigPath?: string,
|
|
|
|
|
stopSignal?: { stopped: boolean },
|
|
|
|
|
dedup?: LogLineDedupFilter,
|
|
|
|
|
onFirstStreamExit?: () => void,
|
|
|
|
|
activity?: { lastActiveAt: number; streamHasExited: boolean },
|
|
|
|
|
): Promise<string> {
|
|
|
|
|
const allChunks: string[] = [];
|
|
|
|
|
let attempt = 0;
|
|
|
|
@@ -523,15 +539,16 @@ async function streamPodLogs(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const preStreamTs = Math.floor(Date.now() / 1000);
|
|
|
|
|
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
|
|
|
|
|
// Signal first stream exit immediately so the grace-period timer in
|
|
|
|
|
// execute() can start without waiting for all reconnects to complete.
|
|
|
|
|
if (attempt === 0) onFirstStreamExit?.();
|
|
|
|
|
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal, activity);
|
|
|
|
|
if (activity) activity.streamHasExited = true;
|
|
|
|
|
if (result) {
|
|
|
|
|
allChunks.push(result);
|
|
|
|
|
// Update last-received timestamp to now (the stream just ended,
|
|
|
|
|
// so any log lines in `result` were received up to this moment).
|
|
|
|
|
lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
|
|
|
|
// Refresh stream liveness so the grace timer in execute() does not
|
|
|
|
|
// fire while output is still flowing through reconnects (FAR-107).
|
|
|
|
|
if (activity) activity.lastActiveAt = Date.now();
|
|
|
|
|
} else if (attempt === 0) {
|
|
|
|
|
// First attempt returned nothing — update timestamp so reconnect
|
|
|
|
|
// window stays reasonable.
|
|
|
|
@@ -650,30 +667,82 @@ export interface PodTerminatedState {
|
|
|
|
|
signal: number | null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function getPodTerminatedState(
|
|
|
|
|
/**
|
|
|
|
|
* Result of a pod-state lookup. `state` is the terminated state when available;
|
|
|
|
|
* `phase` and `podMissing` give the caller enough context to render an honest
|
|
|
|
|
* truncation-cause message instead of guessing "likely deleted" (FAR-107).
|
|
|
|
|
*/
|
|
|
|
|
export interface PodLookupResult {
|
|
|
|
|
state: PodTerminatedState | null;
|
|
|
|
|
phase: string | null;
|
|
|
|
|
podMissing: boolean;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function lookupPodState(
|
|
|
|
|
namespace: string,
|
|
|
|
|
jobName: string,
|
|
|
|
|
kubeconfigPath?: string,
|
|
|
|
|
): Promise<PodTerminatedState | null> {
|
|
|
|
|
): Promise<PodLookupResult> {
|
|
|
|
|
const coreApi = getCoreApi(kubeconfigPath);
|
|
|
|
|
const podList = await coreApi.listNamespacedPod({
|
|
|
|
|
namespace,
|
|
|
|
|
labelSelector: `job-name=${jobName}`,
|
|
|
|
|
});
|
|
|
|
|
const pod = podList.items[0];
|
|
|
|
|
if (!pod) return null;
|
|
|
|
|
if (!pod) return { state: null, phase: null, podMissing: true };
|
|
|
|
|
|
|
|
|
|
const phase = pod.status?.phase ?? null;
|
|
|
|
|
const containerStatus = pod.status?.containerStatuses?.find((s) => s.name === "claude");
|
|
|
|
|
const terminated = containerStatus?.state?.terminated;
|
|
|
|
|
if (!terminated) return null;
|
|
|
|
|
if (!terminated) return { state: null, phase, podMissing: false };
|
|
|
|
|
return {
|
|
|
|
|
exitCode: terminated.exitCode ?? null,
|
|
|
|
|
reason: terminated.reason ?? null,
|
|
|
|
|
message: (terminated.message ?? "").trim() || null,
|
|
|
|
|
signal: terminated.signal ?? null,
|
|
|
|
|
state: {
|
|
|
|
|
exitCode: terminated.exitCode ?? null,
|
|
|
|
|
reason: terminated.reason ?? null,
|
|
|
|
|
message: (terminated.message ?? "").trim() || null,
|
|
|
|
|
signal: terminated.signal ?? null,
|
|
|
|
|
},
|
|
|
|
|
phase,
|
|
|
|
|
podMissing: false,
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Read the claude container's terminated state, retrying briefly when the pod
|
|
|
|
|
* exists in a terminal phase but kubelet has not yet propagated the
|
|
|
|
|
* containerStatuses[].state.terminated field. Without this retry, fast
|
|
|
|
|
* truncated-stream exits surface as "pod state unavailable" (FAR-107) and
|
|
|
|
|
* mask the real exit code / OOMKilled / SIGTERM cause.
|
|
|
|
|
*/
|
|
|
|
|
async function getPodLookupWithRetry(
|
|
|
|
|
namespace: string,
|
|
|
|
|
jobName: string,
|
|
|
|
|
kubeconfigPath?: string,
|
|
|
|
|
attempts = 4,
|
|
|
|
|
delayMs = 500,
|
|
|
|
|
): Promise<PodLookupResult> {
|
|
|
|
|
let last: PodLookupResult = { state: null, phase: null, podMissing: true };
|
|
|
|
|
for (let i = 0; i < attempts; i++) {
|
|
|
|
|
last = await lookupPodState(namespace, jobName, kubeconfigPath);
|
|
|
|
|
if (last.state) return last;
|
|
|
|
|
if (last.podMissing) return last;
|
|
|
|
|
// Pod exists but no terminated state. If it is in a terminal phase the
|
|
|
|
|
// containerStatuses update is in flight — wait briefly and retry. If it
|
|
|
|
|
// is still Running/Pending, retrying is unlikely to help, so bail.
|
|
|
|
|
if (last.phase !== "Succeeded" && last.phase !== "Failed") return last;
|
|
|
|
|
if (i < attempts - 1) await new Promise((r) => setTimeout(r, delayMs));
|
|
|
|
|
}
|
|
|
|
|
return last;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function getPodTerminatedState(
|
|
|
|
|
namespace: string,
|
|
|
|
|
jobName: string,
|
|
|
|
|
kubeconfigPath?: string,
|
|
|
|
|
): Promise<PodTerminatedState | null> {
|
|
|
|
|
return (await lookupPodState(namespace, jobName, kubeconfigPath)).state;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Format a human-readable explanation for a truncated run, including the
|
|
|
|
|
* pod's claude-container terminated state when available. Exit code 137
|
|
|
|
@@ -682,9 +751,17 @@ async function getPodTerminatedState(
|
|
|
|
|
*/
|
|
|
|
|
export function describeTruncationCause(
|
|
|
|
|
state: PodTerminatedState | null,
|
|
|
|
|
lookup?: PodLookupResult,
|
|
|
|
|
): string {
|
|
|
|
|
if (!state) {
|
|
|
|
|
return "pod state unavailable — likely deleted before exit could be read";
|
|
|
|
|
if (lookup?.podMissing) {
|
|
|
|
|
return "pod is gone — Job pod was removed (eviction, preemption, or external delete) before exit could be read";
|
|
|
|
|
}
|
|
|
|
|
if (lookup && !lookup.podMissing) {
|
|
|
|
|
const phaseHint = lookup.phase ? `pod phase=${lookup.phase}` : "pod present";
|
|
|
|
|
return `container terminated state not yet observable (${phaseHint}) — kubelet status update did not land within retry window; exit cause unknown`;
|
|
|
|
|
}
|
|
|
|
|
return "pod state unavailable — exit cause unknown";
|
|
|
|
|
}
|
|
|
|
|
const parts: string[] = [];
|
|
|
|
|
if (state.exitCode !== null) {
|
|
|
|
@@ -1280,17 +1357,16 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|
|
|
|
return onLog(stream, chunk);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Track when the log stream first exits so the grace-period can fire
|
|
|
|
|
// if the K8s Job condition lags behind container exit (FAR-23).
|
|
|
|
|
// Set via onFirstStreamExit callback (called after attempt=0 returns)
|
|
|
|
|
// rather than in .then() of streamPodLogs, which would create a
|
|
|
|
|
// deadlock: streamPodLogs only resolves after stopSignal is set, but
|
|
|
|
|
// stopSignal is set by the grace timer which needs logExitTime to be
|
|
|
|
|
// non-null.
|
|
|
|
|
let logExitTime: number | null = null;
|
|
|
|
|
// Track stream liveness so the grace timer below only fires when output
|
|
|
|
|
// has actually stopped — not on a transient K8s log-API reconnect that
|
|
|
|
|
// streamPodLogs heals on its own (FAR-107).
|
|
|
|
|
const streamActivity: { lastActiveAt: number; streamHasExited: boolean } = {
|
|
|
|
|
lastActiveAt: Date.now(),
|
|
|
|
|
streamHasExited: false,
|
|
|
|
|
};
|
|
|
|
|
const trackedLogStream = streamPodLogs(
|
|
|
|
|
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
|
|
|
|
|
() => { logExitTime = Date.now(); },
|
|
|
|
|
streamActivity,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// completionWithGrace races waitForJobCompletion against a grace timer
|
|
|
|
@@ -1319,19 +1395,50 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|
|
|
|
reject(err);
|
|
|
|
|
};
|
|
|
|
|
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath, jobObserver).then(settleOk).catch(settleErr);
|
|
|
|
|
let graceCheckInFlight = false;
|
|
|
|
|
gracePoller = setInterval(() => {
|
|
|
|
|
if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) {
|
|
|
|
|
// Stop the grace poller immediately so we don't double-fire while the
|
|
|
|
|
// verification read below is in flight.
|
|
|
|
|
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
|
|
|
|
// The log stream exiting only means the container stopped producing
|
|
|
|
|
// output — it does NOT prove the Job was deleted. Verify Job
|
|
|
|
|
// presence with a one-shot read so we can distinguish:
|
|
|
|
|
// (a) Job 404 → truly gone (TTL or external deletion)
|
|
|
|
|
// (b) Job still present → K8s condition propagation lag (FAR-23)
|
|
|
|
|
// Without this check we mis-classify (b) as "deleted externally" and
|
|
|
|
|
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
|
|
|
|
|
// Only consider grace once the stream has exited at least once.
|
|
|
|
|
// Until then we are still in the warm-up window and
|
|
|
|
|
// waitForJobCompletion is the authoritative signal. Once the
|
|
|
|
|
// stream has exited, fire only after GRACE_MS of inactivity
|
|
|
|
|
// measured against the last received chunk — output that resumes
|
|
|
|
|
// through a reconnect resets the clock so transient drops do not
|
|
|
|
|
// truncate live runs (FAR-107).
|
|
|
|
|
if (graceCheckInFlight) return;
|
|
|
|
|
if (
|
|
|
|
|
streamActivity.streamHasExited &&
|
|
|
|
|
Date.now() - streamActivity.lastActiveAt >= LOG_EXIT_COMPLETION_GRACE_MS
|
|
|
|
|
) {
|
|
|
|
|
graceCheckInFlight = true;
|
|
|
|
|
void (async () => {
|
|
|
|
|
try {
|
|
|
|
|
// Pod-phase gate (FAR-107): if the pod is still Running/Pending
|
|
|
|
|
// the container is alive — Claude can be silent for >30s during
|
|
|
|
|
// long tool calls (web fetches, slow upstream APIs). Refresh
|
|
|
|
|
// the stream-activity timer, leave the poller armed, and let
|
|
|
|
|
// waitForJobCompletion remain the authoritative signal. Only
|
|
|
|
|
// proceed with the grace settlement when the pod has actually
|
|
|
|
|
// reached a terminal phase or is gone.
|
|
|
|
|
const podLookup = await lookupPodState(namespace, jobName, kubeconfigPath);
|
|
|
|
|
if (!podLookup.podMissing && (podLookup.phase === "Running" || podLookup.phase === "Pending")) {
|
|
|
|
|
streamActivity.lastActiveAt = Date.now();
|
|
|
|
|
graceCheckInFlight = false;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
} catch (err) {
|
|
|
|
|
await onLog("stderr", `[paperclip] grace gate: pod state lookup failed (${err instanceof Error ? err.message : String(err)}) — falling through to Job-presence check\n`).catch(() => {});
|
|
|
|
|
}
|
|
|
|
|
// Pod is no longer Running — proceed with Job-presence verification.
|
|
|
|
|
// Stop the grace poller immediately so we don't double-fire while the
|
|
|
|
|
// verification read below is in flight.
|
|
|
|
|
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
|
|
|
|
|
// The log stream exiting only means the container stopped producing
|
|
|
|
|
// output — it does NOT prove the Job was deleted. Verify Job
|
|
|
|
|
// presence with a one-shot read so we can distinguish:
|
|
|
|
|
// (a) Job 404 → truly gone (TTL or external deletion)
|
|
|
|
|
// (b) Job still present → K8s condition propagation lag (FAR-23)
|
|
|
|
|
// Without this check we mis-classify (b) as "deleted externally" and
|
|
|
|
|
// emit a false-positive k8s_job_deleted_externally error (FAR-107).
|
|
|
|
|
try {
|
|
|
|
|
await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace });
|
|
|
|
|
await onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update; Job ${jobName} still present — proceeding with captured output (FAR-23)\n`).catch(() => {});
|
|
|
|
@@ -1554,7 +1661,23 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
if (parsedStream.truncatedMidStream) {
|
|
|
|
|
const cause = describeTruncationCause(podTerminatedState);
|
|
|
|
|
// Re-query pod state with retry — the initial single-shot read can lose
|
|
|
|
|
// to kubelet propagation lag and surface a useless "pod state unavailable"
|
|
|
|
|
// message that hides the real exit cause (OOMKilled, SIGTERM, etc). The
|
|
|
|
|
// retry distinguishes pod-genuinely-gone from terminated-state-lag and
|
|
|
|
|
// gives the operator the actual exit code/reason where possible (FAR-107).
|
|
|
|
|
let lookup: PodLookupResult | undefined;
|
|
|
|
|
let refreshedState = podTerminatedState;
|
|
|
|
|
try {
|
|
|
|
|
lookup = await getPodLookupWithRetry(namespace, jobName, kubeconfigPath);
|
|
|
|
|
refreshedState = lookup.state;
|
|
|
|
|
if (refreshedState && refreshedState.exitCode !== null) {
|
|
|
|
|
exitCode = refreshedState.exitCode;
|
|
|
|
|
}
|
|
|
|
|
} catch (err) {
|
|
|
|
|
await onLog("stderr", `[paperclip] truncation diagnostic: pod re-query failed (${err instanceof Error ? err.message : String(err)})\n`).catch(() => {});
|
|
|
|
|
}
|
|
|
|
|
const cause = describeTruncationCause(refreshedState, lookup);
|
|
|
|
|
const modelHint = parsedStream.model ? ` (model: ${parsedStream.model})` : "";
|
|
|
|
|
return {
|
|
|
|
|
exitCode,
|
|
|
|
|