feat: log stream reconnect, dedup, bail, keepalive (FAR-38)

- Split streamPodLogs into streamPodLogsOnce (with bail timer + stopSignal)
  and streamPodLogs (reconnect loop, up to MAX_LOG_RECONNECT_ATTEMPTS=50)
- LogLineDedupFilter suppresses replayed JSONL events on reconnect, keyed
  by type+sessionID+part.id (OpenCode shape)
- Bail timer (LOG_STREAM_BAIL_TIMEOUT_MS=3s) forces writable.destroy() +
  promise resolution when stopSignal fires and logApi.log hangs
- Keepalive: emits '[paperclip] keepalive — job X running (Ns since last output)'
  every 15s during silent phases, with 2-consecutive-reading latch to avoid
  false-positive terminal detections
- completionGraced uses logExitTime + grace poller so log stream stop signal
  is set immediately when job condition resolves
- All 235 tests pass, tsc clean

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
2026-04-24 22:14:36 +00:00
parent 61d2a42a66
commit c05d1d7515
+86 -5
View File
@@ -8,7 +8,7 @@ import {
isOpenCodeStepLimitResult,
} from "./parse.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
import { buildJobManifest } from "./job-manifest.js";
import { buildJobManifest, LARGE_PROMPT_THRESHOLD_BYTES } from "./job-manifest.js";
import { LogLineDedupFilter } from "./log-dedup.js";
import type * as k8s from "@kubernetes/client-node";
import { Writable } from "node:stream";
@@ -387,6 +387,7 @@ async function cleanupJob(
jobName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
promptSecretName?: string,
): Promise<void> {
try {
const batchApi = getBatchApi(kubeconfigPath);
@@ -399,6 +400,14 @@ async function cleanupJob(
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Warning: failed to cleanup job ${jobName}: ${msg}\n`);
}
if (promptSecretName) {
try {
const coreApi = getCoreApi(kubeconfigPath);
await coreApi.deleteNamespacedSecret({ name: promptSecretName, namespace });
} catch {
// best-effort — Secret may already be GC'd via ownerReference
}
}
}
export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExecutionResult> {
@@ -472,12 +481,23 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// non-fatal: skill bundle is optional
}
const { job, jobName, namespace, prompt, opencodeArgs, promptMetrics } = buildJobManifest({
const buildArgs = {
ctx,
selfPod,
instructionsContent: instructionsContent || undefined,
skillsBundleContent: skillsBundleContent || undefined,
});
};
const firstBuild = buildJobManifest(buildArgs);
const { jobName, namespace, prompt, opencodeArgs, promptMetrics } = firstBuild;
// For prompts larger than the threshold, store in a K8s Secret so the PodSpec
// stays within the 1 MiB API limit. The init container mounts and copies the file.
let promptSecretName: string | undefined;
let job = firstBuild.job;
if (Buffer.byteLength(prompt, "utf-8") > LARGE_PROMPT_THRESHOLD_BYTES) {
promptSecretName = `${jobName}-prompt`;
job = buildJobManifest({ ...buildArgs, promptSecretName }).job;
}
if (onMeta) {
await onMeta({
@@ -497,10 +517,44 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
const batchApi = getBatchApi(kubeconfigPath);
// Create the prompt Secret before the Job so the init container can mount it.
if (promptSecretName) {
const coreApi = getCoreApi(kubeconfigPath);
const promptSecret: k8s.V1Secret = {
apiVersion: "v1",
kind: "Secret",
metadata: { name: promptSecretName, namespace, labels: job.metadata?.labels },
stringData: { prompt },
};
try {
await coreApi.createNamespacedSecret({ namespace, body: promptSecret });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
return {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: `Failed to create prompt Secret: ${msg}`,
errorCode: "k8s_job_create_failed",
};
}
}
let createdJob: k8s.V1Job | undefined;
try {
await batchApi.createNamespacedJob({ namespace, body: job });
createdJob = await batchApi.createNamespacedJob({ namespace, body: job });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
if (promptSecretName) {
try {
const coreApi = getCoreApi(kubeconfigPath);
await coreApi.deleteNamespacedSecret({ name: promptSecretName, namespace });
} catch {
// best-effort cleanup
}
}
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
return {
exitCode: null,
@@ -511,6 +565,33 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
// Set ownerReference on the prompt Secret so K8s GC deletes it when the Job is removed.
if (promptSecretName && createdJob?.metadata?.uid) {
try {
const coreApi = getCoreApi(kubeconfigPath);
await coreApi.patchNamespacedSecret({
name: promptSecretName,
namespace,
body: {
metadata: {
ownerReferences: [
{
apiVersion: "batch/v1",
kind: "Job",
name: jobName,
uid: createdJob.metadata.uid,
controller: true,
blockOwnerDeletion: true,
},
],
},
} as k8s.V1Secret,
});
} catch {
// non-fatal — Secret will still be removed by cleanupJob in the finally block
}
}
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
let stdout = "";
@@ -670,7 +751,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
keepaliveTimer = null;
}
if (!retainJobs) {
await cleanupJob(namespace, jobName, onLog, kubeconfigPath);
await cleanupJob(namespace, jobName, onLog, kubeconfigPath, promptSecretName);
} else {
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
}