feat: inherit valueFrom/envFrom env from Deployment; prefer paperclip container

- SelfPodInfo gains inheritedEnvValueFrom (V1EnvVar[]) and inheritedEnvFrom (V1EnvFromSource[])
- Container selection now prefers the container named "paperclip" and falls back to the first container otherwise
- buildJobManifest appends valueFrom env vars (skipping names already overridden)
  and sets envFrom on the opencode container when present
- Tests updated: mock updated, 5 new cases covering secretKeyRef forwarding,
  dedup, envFrom passthrough, and empty-envFrom omission

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
2026-04-24 22:12:31 +00:00
parent 84dc0f5930
commit 61d2a42a66
11 changed files with 1367 additions and 60 deletions
+257 -49
View File
@@ -9,10 +9,19 @@ import {
} from "./parse.js";
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
import { buildJobManifest } from "./job-manifest.js";
import { LogLineDedupFilter } from "./log-dedup.js";
import type * as k8s from "@kubernetes/client-node";
import { Writable } from "node:stream";
// Polling period in milliseconds (its consumers are outside this view).
const POLL_INTERVAL_MS = 2000;
// Period of the keepalive status line emitted by execute() while the job runs.
const KEEPALIVE_INTERVAL_MS = 15_000;
// Pause between consecutive log-stream reconnect attempts.
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
// Hard cap on log-stream reconnect attempts before giving up.
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
// Upper bound on how long streamPodLogsOnce will wait after stopSignal fires
// before force-returning, even if logApi.log has not yet resolved. Defensive
// against the K8s client library not propagating writable.destroy() into an
// abort of the underlying HTTP request.
const LOG_STREAM_BAIL_TIMEOUT_MS = 3_000;
// Grace period used when the LOG_EXIT_COMPLETION_GRACE_MS env var is unset or invalid.
const DEFAULT_LOG_EXIT_COMPLETION_GRACE_MS = 30_000;

/**
 * Parse a millisecond duration taken from an environment variable.
 *
 * @param raw - Raw environment value; may be undefined or empty.
 * @param fallback - Value returned when `raw` is not a usable duration.
 * @returns The parsed non-negative integer, or `fallback` when `raw` is
 *   missing, empty, non-numeric, or negative.
 */
function parseNonNegativeIntEnv(raw: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(raw ?? "", 10);
  // A NaN grace period would make every `elapsed >= grace` comparison false,
  // silently disabling the grace timer, so reject it along with negatives.
  return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback;
}

// How long to wait after the log stream exits for the K8s Job condition to
// settle. Overridable through the LOG_EXIT_COMPLETION_GRACE_MS env var.
const LOG_EXIT_COMPLETION_GRACE_MS = parseNonNegativeIntEnv(
  process.env.LOG_EXIT_COMPLETION_GRACE_MS,
  DEFAULT_LOG_EXIT_COMPLETION_GRACE_MS,
);
export function isK8s404(err: unknown): boolean {
@@ -132,55 +141,157 @@ async function waitForPod(
throw new Error(`Timed out waiting for pod to be scheduled (${Math.round(timeoutMs / 1000)}s)`);
}
/**
 * Follow the "opencode" container's logs over a single connection.
 *
 * Every received chunk is redacted and recorded; the (optionally dedup-filtered)
 * text is forwarded to `onLog` on the "stdout" stream. The returned string is
 * the concatenation of all redacted chunks, regardless of what the dedup
 * filter suppressed.
 *
 * @param namespace - Namespace of the pod.
 * @param podName - Name of the pod whose logs are followed.
 * @param onLog - Sink for emitted log text.
 * @param kubeconfigPath - Optional kubeconfig path handed to getLogApi.
 * @param sinceSeconds - When truthy, forwarded as the `sinceSeconds` log option.
 * @param dedup - Optional filter applied to text before it is emitted.
 * @param stopSignal - When provided, polled every 200 ms; once `stopped` is
 *   true the sink is destroyed and a bail timer is armed so this function
 *   returns even if the follow request never settles.
 * @returns All redacted log text received during this connection.
 */
async function streamPodLogsOnce(
  namespace: string,
  podName: string,
  onLog: AdapterExecutionContext["onLog"],
  kubeconfigPath?: string,
  sinceSeconds?: number,
  dedup?: LogLineDedupFilter,
  stopSignal?: { stopped: boolean },
): Promise<string> {
  const api = getLogApi(kubeconfigPath);
  const collected: string[] = [];

  const sink = new Writable({
    write(chunk: Buffer, _encoding, done) {
      const redacted = redactHomePathUserSegments(chunk.toString("utf-8"));
      collected.push(redacted);
      const visible = dedup ? dedup.filter(redacted) : redacted;
      if (visible) {
        // Only acknowledge the chunk once the consumer has accepted it.
        void onLog("stdout", visible).then(() => done(), done);
      } else {
        done();
      }
    },
  });

  // Destroying the sink is how an in-flight follow is aborted once the job
  // completion signal fires; otherwise the follow request can hang when the
  // pod terminates without the HTTP connection closing cleanly.
  let pollHandle: ReturnType<typeof setInterval> | null = null;
  let bailHandle: ReturnType<typeof setTimeout> | null = null;
  let releaseBail: () => void = () => {};
  const bailed = new Promise<void>((resolve) => {
    releaseBail = resolve;
  });

  if (stopSignal) {
    const checkStop = (): void => {
      if (!stopSignal.stopped) return;
      if (!sink.destroyed) sink.destroy();
      // Arm the bail timer only once.
      if (bailHandle) return;
      bailHandle = setTimeout(() => {
        onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {});
        releaseBail();
      }, LOG_STREAM_BAIL_TIMEOUT_MS);
    };
    pollHandle = setInterval(checkStop, 200);
  }

  const following = api
    .log(namespace, podName, "opencode", sink, {
      follow: true,
      pretty: false,
      ...(sinceSeconds ? { sinceSeconds } : {}),
    })
    .catch(() => {
      // Not fatal: the container may already have exited, the API connection
      // may have dropped, or the sink was destroyed to abort the follow.
    });

  try {
    await (stopSignal ? Promise.race([following, bailed]) : following);
  } finally {
    if (pollHandle) clearInterval(pollHandle);
    if (bailHandle) clearTimeout(bailHandle);
  }

  return collected.join("");
}
/**
* Stream pod logs with automatic reconnection. Keeps retrying the log
* stream until the stop signal fires (job completed) or the container
* exits normally. This handles silent K8s API connection drops that
* would otherwise cause the UI to stop receiving real output.
*
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
* loops during sustained API partitions.
*
* onFirstStreamExit is called the first time streamPodLogsOnce returns.
* Used by execute() to start the LOG_EXIT_COMPLETION_GRACE_MS grace timer
* without waiting for all reconnects to exhaust.
*
* NOTE(review): the body below appears to interleave two versions of this
* function — one that follows logs through a single line-buffering Writable,
* and one that loops over streamPodLogsOnce with reconnects. As shown, the
* braces do not balance, the reconnect logic sits inside a `catch`, and there
* are two consecutive `return` statements. Confirm against the real file
* before relying on the statement order here.
*/
async function streamPodLogs(
namespace: string,
podName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
dedup?: LogLineDedupFilter,
onFirstStreamExit?: () => void,
): Promise<string> {
// logApi, parts and lineBuffer are used only by the line-buffering Writable
// path; allChunks, attempt and lastLogReceivedAt are used only by the
// reconnect path.
const logApi = getLogApi(kubeconfigPath);
const parts: string[] = [];
let lineBuffer = "";
const allChunks: string[] = [];
let attempt = 0;
// Track the timestamp of the last successfully received log line so
// reconnects use a tight window instead of an ever-growing one anchored
// at stream start. This is the primary fix for duplicative logs on reconnect.
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
// Create a local dedup filter when the caller did not supply one.
if (!dedup) dedup = new LogLineDedupFilter();
const writable = new Writable({
write(chunk: Buffer, _encoding, callback) {
// Prepend whatever partial line was left over from the previous chunk.
const incoming = lineBuffer + chunk.toString("utf-8");
const nlIdx = incoming.lastIndexOf("\n");
if (nlIdx === -1) {
// No complete line yet — buffer until newline arrives
lineBuffer = incoming;
callback();
return;
}
// Keep everything after the last newline for the next write.
lineBuffer = incoming.slice(nlIdx + 1);
// Redact each complete line individually to avoid path splits across chunk boundaries
const redacted = incoming
.slice(0, nlIdx + 1)
.split("\n")
.map((line) => redactHomePathUserSegments(line))
.join("\n");
parts.push(redacted);
// Acknowledge the chunk only after onLog settles; a rejection is passed
// to the stream callback as the write error.
void onLog("stdout", redacted).then(() => callback(), callback);
},
});
while (!stopSignal?.stopped) {
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
break;
}
try {
await logApi.log(namespace, podName, "opencode", writable, {
follow: true,
pretty: false,
});
} catch {
// follow may fail if the container already exited
// On reconnect, ask for logs since the last received line (+5s buffer)
// instead of since stream start. This keeps the window tight and
// avoids ever-growing duplicate output.
const sinceSeconds = attempt > 0
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
: undefined;
if (attempt > 0) {
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
}
// Captured before streaming so an empty first attempt can anchor the
// reconnect window at the time the stream was opened.
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup, stopSignal);
// Signal first stream exit immediately so the grace-period timer in
// execute() can start without waiting for all reconnects to complete.
if (attempt === 0) onFirstStreamExit?.();
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
// so any log lines in `result` were received up to this moment).
lastLogReceivedAt = Math.floor(Date.now() / 1000);
} else if (attempt === 0) {
// First attempt returned nothing — update timestamp so reconnect
// window stays reasonable.
lastLogReceivedAt = preStreamTs;
}
attempt++;
if (stopSignal?.stopped) break;
// Brief pause before reconnecting to avoid tight loops.
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
}
// Flush any partial line that never received a trailing newline
if (lineBuffer) {
const redacted = redactHomePathUserSegments(lineBuffer);
parts.push(redacted);
await onLog("stdout", redacted);
}
// Flush any buffered partial line so the final assistant/result chunk
// isn't dropped when the stream ends mid-line.
const tail = dedup.flush();
if (tail) await onLog("stdout", tail);
// NOTE(review): two consecutive returns — the second is unreachable as
// written; only one of them can belong to the real function.
return parts.join("");
return allChunks.join("");
}
async function readPodLogs(
@@ -201,7 +312,7 @@ async function readPodLogs(
}
}
type JobCompletionResult = { succeeded: boolean; timedOut: boolean; jobGone: boolean };
export type JobCompletionResult = { succeeded: boolean; timedOut: boolean; jobGone: boolean };
async function waitForJobCompletion(
namespace: string,
@@ -406,6 +517,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let exitCode: number | null = null;
let jobTimedOut = false;
let podTerminatedReason: string | null = null;
let keepaliveTimer: ReturnType<typeof setInterval> | null = null;
try {
const scheduleTimeoutMs = 120_000;
@@ -427,10 +539,101 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const completionTimeoutMs = timeoutSec > 0 ? (timeoutSec + graceSec) * 1000 : 0;
// Start completion poller in parallel with log streaming
const completionPromise = waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath);
// Shared stop signal: set to true when job completion is detected so
// the log stream stops reconnecting promptly.
const logStopSignal = { stopped: false };
// Shared dedup filter across reconnects so replayed lines inside the
// sinceSeconds overlap window are dropped before reaching the UI.
const logDedup = new LogLineDedupFilter();
stdout = await streamPodLogs(namespace, podName, onLog, kubeconfigPath);
// Keepalive: periodically emit a status line so the Paperclip server
// knows the adapter is still alive during long silent phases.
let lastLogAt = Date.now();
let keepaliveJobTerminal = false;
let consecutiveTerminalReadings = 0;
keepaliveTimer = setInterval(() => {
void (async () => {
if (keepaliveJobTerminal) return;
// Require two consecutive terminal readings before latching to
// guard against a stale K8s API cache returning a false terminal
// status on a single read.
try {
const j = await getBatchApi(kubeconfigPath).readNamespacedJob({ name: jobName, namespace });
const terminal = j.status?.conditions?.some(
(c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
);
if (terminal) {
consecutiveTerminalReadings++;
if (consecutiveTerminalReadings >= 2) keepaliveJobTerminal = true;
return;
}
consecutiveTerminalReadings = 0;
} catch {
return;
}
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {});
})();
}, KEEPALIVE_INTERVAL_MS);
// wrappedOnLog updates lastLogAt so the keepalive timer can measure silence.
const wrappedOnLog: typeof onLog = async (stream, chunk) => {
lastLogAt = Date.now();
return onLog(stream, chunk);
};
// Track when the log stream first exits so the grace-period can fire
// if the K8s Job condition lags behind container exit.
let logExitTime: number | null = null;
const trackedLogStream = streamPodLogs(
namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup,
() => { logExitTime = Date.now(); },
);
// completionGraced races waitForJobCompletion against a grace timer that
// fires LOG_EXIT_COMPLETION_GRACE_MS after the log stream exits. This bounds
// the stale-UI window when K8s Job conditions lag container exit.
let gracePoller: ReturnType<typeof setInterval> | null = null;
const completionGraced = new Promise<JobCompletionResult>((resolve, reject) => {
let settled = false;
const settleOk = (r: JobCompletionResult) => {
if (settled) return;
settled = true;
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
logStopSignal.stopped = true;
resolve(r);
};
const settleErr = (err: unknown) => {
if (settled) return;
settled = true;
if (gracePoller) { clearInterval(gracePoller); gracePoller = null; }
logStopSignal.stopped = true;
reject(err);
};
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then(settleOk).catch(settleErr);
gracePoller = setInterval(() => {
if (logExitTime !== null && Date.now() - logExitTime >= LOG_EXIT_COMPLETION_GRACE_MS) {
void onLog("stdout", `[paperclip] Log stream exited ${LOG_EXIT_COMPLETION_GRACE_MS / 1000}s ago without K8s Job condition update — proceeding with captured output\n`).catch(() => {});
settleOk({ succeeded: false, timedOut: false, jobGone: true });
}
}, 1_000);
});
const [logResult, completionResult] = await Promise.allSettled([
trackedLogStream,
completionGraced,
]);
if (keepaliveTimer) {
clearInterval(keepaliveTimer);
keepaliveTimer = null;
}
if (logResult.status === "fulfilled") {
stdout = logResult.value;
}
if (!stdout.trim()) {
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
@@ -448,19 +651,24 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
}
// After log stream exits, wait at most LOG_EXIT_COMPLETION_GRACE_MS for the job
// condition to settle — avoids racing TTL cleanup vs condition update lag
const completion = await completionWithGrace(completionPromise, LOG_EXIT_COMPLETION_GRACE_MS);
jobTimedOut = completion.timedOut;
if (completion.jobGone) {
await onLog("stdout", `[paperclip] Job ${jobName} not found (likely TTL-cleaned after completion).\n`);
if (completionResult.status === "fulfilled") {
const completion = completionResult.value;
jobTimedOut = completion.timedOut;
if (completion.jobGone) {
await onLog("stdout", `[paperclip] Job ${jobName} not found (likely TTL-cleaned after completion).\n`);
}
} else {
jobTimedOut = true;
}
const terminatedInfo = await getPodTerminatedInfo(namespace, jobName, kubeconfigPath);
exitCode = terminatedInfo.exitCode;
podTerminatedReason = terminatedInfo.reason;
} finally {
if (keepaliveTimer) {
clearInterval(keepaliveTimer);
keepaliveTimer = null;
}
if (!retainJobs) {
await cleanupJob(namespace, jobName, onLog, kubeconfigPath);
} else {