fix: prevent prompt Secret leak by attaching ownerReference to Job (FAR-15)

When a large prompt creates a K8s Secret, it can orphan if the process
crashes before the finally block runs. Now the Secret gets an
ownerReference pointing to the Job after creation, so K8s GC cleans it
up automatically. Also cleans up the Secret on job creation failure.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Gandalf the Greybeard
2026-04-23 23:29:47 +00:00
parent 346f5cc1df
commit 21a02da00f
+64 -7
View File
@@ -348,6 +348,7 @@ async function streamPodLogs(
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
dedup?: LogLineDedupFilter,
): Promise<string> {
const allChunks: string[] = [];
let attempt = 0;
@@ -357,7 +358,7 @@ async function streamPodLogs(
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
// Shared across reconnects so replayed lines inside the `sinceSeconds`
// overlap window are dropped before they reach the streaming UI (FAR-123).
const dedup = new LogLineDedupFilter();
if (!dedup) dedup = new LogLineDedupFilter();
while (!stopSignal?.stopped) {
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
@@ -754,11 +755,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
// Create the Job
let createdJobUid: string | undefined;
try {
await batchApi.createNamespacedJob({ namespace, body: job });
const created = await batchApi.createNamespacedJob({ namespace, body: job });
createdJobUid = created.metadata?.uid;
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
if (promptSecret) {
try {
await coreApi.deleteNamespacedSecret({ name: promptSecret.name, namespace: promptSecret.namespace });
} catch { /* best-effort */ }
}
return {
exitCode: null,
signal: null,
@@ -768,6 +776,35 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
// Attach ownerReference so K8s GC cleans up the Secret if the process
// crashes before the finally block runs.
if (promptSecret && createdJobUid) {
try {
await coreApi.patchNamespacedSecret({
name: promptSecret.name,
namespace: promptSecret.namespace,
body: [
{
op: "add",
path: "/metadata/ownerReferences",
value: [
{
apiVersion: "batch/v1",
kind: "Job",
name: jobName,
uid: createdJobUid,
blockOwnerDeletion: false,
},
],
},
],
});
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Warning: failed to set ownerReference on prompt Secret: ${msg}\n`);
}
}
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
}
@@ -856,6 +893,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let keepaliveTick = 0;
let keepaliveJobTerminal = false;
let keepaliveJobTerminalAt: number | null = null;
let consecutiveTerminalReadings = 0;
keepaliveTimer = setInterval(() => {
// Fire-and-forget the async work; setInterval callbacks must be
// synchronous or the timer will drift.
@@ -878,19 +916,32 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
// Verify the Job is still alive before announcing or refreshing.
// Require two consecutive terminal readings before latching to
// guard against a stale K8s API cache returning a false terminal
// status on a single read (finding #5, FAR-15).
try {
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
const terminal = job.status?.conditions?.some(
(c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
);
if (terminal) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
if (ctx.onSpawn) {
consecutiveTerminalReadings++;
if (consecutiveTerminalReadings >= 2) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
if (ctx.onSpawn) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
return;
}
// First terminal reading — do not latch yet; next tick confirms.
keepaliveTick++;
if (ctx.onSpawn && (keepaliveTick === 1 || keepaliveTick % 12 === 0)) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
return;
}
consecutiveTerminalReadings = 0;
} catch (err: unknown) {
// Only treat 404 (Job deleted) as terminal. Transient 5xx or
// connection resets should NOT permanently disable the keepalive —
@@ -931,9 +982,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// Shared signal: when job completion resolves, tell the log
// streamer to stop reconnecting.
const logStopSignal = { stopped: false };
// Shared dedup filter: created here so the one-shot fallback can
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
const logDedup = new LogLineDedupFilter();
const [logResult, completionResult] = await Promise.allSettled([
streamPodLogs(namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal),
streamPodLogs(namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup),
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then((r) => {
logStopSignal.stopped = true;
return r;
@@ -970,9 +1024,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
if (!stdout.trim() && oneShotLogs.trim()) {
stdout = oneShotLogs;
await onLog("stdout", stdout);
const deduped = logDedup.filter(stdout) + logDedup.flush();
if (deduped) await onLog("stdout", deduped);
} else if (oneShotLogs && oneShotLogs.length > stdout.length) {
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
const deduped = logDedup.filter(oneShotLogs) + logDedup.flush();
if (deduped) await onLog("stdout", deduped);
stdout = oneShotLogs;
}
}