fix: handle K8s 404 (job deleted) gracefully in waitForJobCompletion (FAR-122) #5
+44
-10
@@ -16,6 +16,19 @@ const KEEPALIVE_INTERVAL_MS = 15_000;
|
||||
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
|
||||
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
|
||||
|
||||
/**
|
||||
* Detect a Kubernetes 404 (Not Found) error from @kubernetes/client-node.
|
||||
* Works for both v0.x (response.statusCode) and v1.0+ (response.status, message).
|
||||
*/
|
||||
function isK8s404(err: unknown): boolean {
|
||||
if (!(err instanceof Error)) return false;
|
||||
const e = err as unknown as Record<string, unknown>;
|
||||
const resp = e.response as Record<string, unknown> | undefined;
|
||||
if (resp?.statusCode === 404 || resp?.status === 404) return true;
|
||||
if (e.statusCode === 404) return true;
|
||||
return /HTTP-Code:\s*404\b/.test(err.message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the Job's pod to reach a terminal or running state.
|
||||
* Returns the pod name once logs can be streamed, or throws on failure.
|
||||
@@ -251,19 +264,32 @@ async function readPodLogs(
|
||||
|
||||
/**
|
||||
* Wait for the Job to reach a terminal state (Complete or Failed).
|
||||
* Returns the Job's final status.
|
||||
* Returns the Job's final status. A 404 (job deleted by TTL or externally)
|
||||
* is treated as a soft terminal: succeeded=false, timedOut=false, jobGone=true.
|
||||
* The caller should log this and fall through to stdout parsing.
|
||||
*/
|
||||
async function waitForJobCompletion(
|
||||
namespace: string,
|
||||
jobName: string,
|
||||
timeoutMs: number,
|
||||
kubeconfigPath?: string,
|
||||
): Promise<{ succeeded: boolean; timedOut: boolean }> {
|
||||
): Promise<{ succeeded: boolean; timedOut: boolean; jobGone?: boolean }> {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
const deadline = timeoutMs > 0 ? Date.now() + timeoutMs : 0;
|
||||
|
||||
while (deadline === 0 || Date.now() < deadline) {
|
||||
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
|
||||
let job;
|
||||
try {
|
||||
job = await batchApi.readNamespacedJob({ name: jobName, namespace });
|
||||
} catch (err: unknown) {
|
||||
if (isK8s404(err)) {
|
||||
// Job was deleted (TTL garbage collection or external deletion) before
|
||||
// we detected its terminal condition. The container must have already
|
||||
// exited for TTL to fire, so log streaming will have captured the output.
|
||||
return { succeeded: false, timedOut: false, jobGone: true };
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
const conditions = job.status?.conditions ?? [];
|
||||
|
||||
const complete = conditions.find((c) => c.type === "Complete" && c.status === "True");
|
||||
@@ -561,9 +587,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// connection resets should NOT permanently disable the keepalive —
|
||||
// the next tick will re-check and the reaper uses the staleness
|
||||
// window as a safety net.
|
||||
const statusCode = (err as { response?: { statusCode?: number } })?.response?.statusCode
|
||||
?? (err as { statusCode?: number })?.statusCode;
|
||||
if (statusCode === 404) {
|
||||
if (isK8s404(err)) {
|
||||
keepaliveJobTerminal = true;
|
||||
return;
|
||||
}
|
||||
@@ -628,11 +652,17 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
|
||||
if (completionResult.status === "fulfilled") {
|
||||
jobTimedOut = completionResult.value.timedOut;
|
||||
if (completionResult.value.jobGone) {
|
||||
// Job was deleted by TTL or externally before we observed the Complete/Failed
|
||||
// condition. The container must have exited first (TTL only fires after
|
||||
// completion), so log streaming has captured the full output — continue
|
||||
// to stdout parsing rather than returning an error.
|
||||
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
|
||||
}
|
||||
} else {
|
||||
// waitForJobCompletion threw — re-check job state to avoid returning
|
||||
// while the job is still running (which would cause UI staleness and
|
||||
// concurrency errors on retry). Use a bounded timeout (60s) so we
|
||||
// don't hang the heartbeat indefinitely if the K8s API is degraded.
|
||||
// waitForJobCompletion threw an unexpected error — re-check job state to
|
||||
// avoid returning while the job is still running. Use a bounded timeout
|
||||
// (60s) so we don't hang the heartbeat indefinitely if the K8s API is degraded.
|
||||
jobTimedOut = false;
|
||||
const RECHECK_TIMEOUT_MS = 60_000;
|
||||
const actualState = await waitForJobCompletion(namespace, jobName, RECHECK_TIMEOUT_MS, kubeconfigPath);
|
||||
@@ -640,6 +670,10 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// Re-check itself timed out — the job may still be running.
|
||||
// Return an error so the UI knows the run is not done.
|
||||
jobTimedOut = true;
|
||||
} else if (actualState.jobGone) {
|
||||
// Job was deleted before we could confirm terminal state — same as the
|
||||
// fulfilled+jobGone case above: proceed with captured output.
|
||||
await onLog("stdout", `[paperclip] Job ${jobName} was deleted before terminal condition was observed (TTL or external deletion) — proceeding with captured output.\n`);
|
||||
} else if (!actualState.succeeded) {
|
||||
// Job still not terminal — the completion error was likely transient.
|
||||
// Return an error so the UI knows the run is not done, rather than
|
||||
|
||||
Reference in New Issue
Block a user