fix: P1 correctness and operational fixes from FAR-104/FAR-105 analysis
5. Cap log stream reconnect attempts at 50 — prevents infinite
reconnect loops during sustained API partitions.
6. Fire keepalive refresh earlier — tick 1 + every 12 ticks (~3min)
instead of every 16 ticks (~4min), providing better safety margin
under the 5-minute reaper window.
7. Catch rejections from onLog inside keepalive — add .catch(() => {})
to prevent unhandledRejection on SSE backpressure.
8. Prevent sanitized-name collisions — extend slugs to 16 chars each,
add a 6-char SHA-256 hash suffix, shorten prefix to `ac-` to stay
well within the 63-char DNS label limit.
10. Fix config-hint parity for nodeSelector and labels — parse both
`key=value` multiline text and JSON objects, matching what the
textarea hint promises.
11. Large-prompt fallback via Secret — prompts >256 KiB are staged as a
K8s Secret and mounted as a volume instead of passed via env var,
protecting against the ~1 MiB PodSpec limit.
13. Track last-seen log timestamp on reconnect — anchor sinceSeconds at
the last received log line instead of stream start, fixing FAR-105
duplicative logs. Belt-and-braces: dedupe assistantTexts at the
parser boundary in parse.ts.
Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
+79
-12
@@ -14,6 +14,7 @@ import { Writable } from "node:stream";
|
||||
const POLL_INTERVAL_MS = 2000;
|
||||
const KEEPALIVE_INTERVAL_MS = 15_000;
|
||||
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
|
||||
const MAX_LOG_RECONNECT_ATTEMPTS = 50;
|
||||
|
||||
/**
|
||||
* Wait for the Job's pod to reach a terminal or running state.
|
||||
@@ -167,6 +168,9 @@ async function streamPodLogsOnce(
|
||||
* stream until the stop signal fires (job completed) or the container
|
||||
* exits normally. This handles silent K8s API connection drops that
|
||||
* would otherwise cause the UI to stop receiving real output.
|
||||
*
|
||||
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
|
||||
* loops during sustained API partitions.
|
||||
*/
|
||||
async function streamPodLogs(
|
||||
namespace: string,
|
||||
@@ -177,22 +181,40 @@ async function streamPodLogs(
|
||||
): Promise<string> {
|
||||
const allChunks: string[] = [];
|
||||
let attempt = 0;
|
||||
const streamStartedAt = Math.floor(Date.now() / 1000);
|
||||
// Track the timestamp of the last successfully received log line so
|
||||
// reconnects use a tight window instead of an ever-growing one anchored
|
||||
// at stream start. This is the primary fix for FAR-105 duplicative logs.
|
||||
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
|
||||
while (!stopSignal?.stopped) {
|
||||
// On reconnect, ask for logs since the stream originally started to
|
||||
// avoid missing output during the reconnect gap. Duplicates are
|
||||
// tolerable — the UI deduplicates log chunks.
|
||||
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
|
||||
await onLog("stderr", `[paperclip] Log stream: max reconnect attempts (${MAX_LOG_RECONNECT_ATTEMPTS}) reached — giving up.\n`);
|
||||
break;
|
||||
}
|
||||
|
||||
// On reconnect, ask for logs since the last received line (+5s buffer)
|
||||
// instead of since stream start. This keeps the window tight and
|
||||
// avoids ever-growing duplicate output.
|
||||
const sinceSeconds = attempt > 0
|
||||
? Math.max(1, Math.floor(Date.now() / 1000) - streamStartedAt + 5)
|
||||
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
|
||||
: undefined;
|
||||
|
||||
if (attempt > 0) {
|
||||
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt})...\n`);
|
||||
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
|
||||
}
|
||||
|
||||
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
|
||||
if (result) allChunks.push(result);
|
||||
if (result) {
|
||||
allChunks.push(result);
|
||||
// Update last-received timestamp to now (the stream just ended,
|
||||
// so any log lines in `result` were received up to this moment).
|
||||
lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
} else if (attempt === 0) {
|
||||
// First attempt returned nothing — update timestamp so reconnect
|
||||
// window stays reasonable.
|
||||
lastLogReceivedAt = preStreamTs;
|
||||
}
|
||||
attempt++;
|
||||
|
||||
// If the job is done or the container exited, no need to reconnect.
|
||||
@@ -373,7 +395,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
}
|
||||
|
||||
// Build Job manifest
|
||||
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics } = buildJobManifest({
|
||||
const { job, jobName, namespace, prompt, claudeArgs, promptMetrics, promptSecret } = buildJobManifest({
|
||||
ctx,
|
||||
selfPod,
|
||||
});
|
||||
@@ -396,6 +418,42 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
} as Parameters<typeof onMeta>[0]);
|
||||
}
|
||||
|
||||
// If the prompt is large, create a Secret to hold it (avoids the ~1 MiB
|
||||
// PodSpec limit). The Secret is cleaned up in the finally block.
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
if (promptSecret) {
|
||||
try {
|
||||
await coreApi.createNamespacedSecret({
|
||||
namespace: promptSecret.namespace,
|
||||
body: {
|
||||
apiVersion: "v1",
|
||||
kind: "Secret",
|
||||
metadata: {
|
||||
name: promptSecret.name,
|
||||
namespace: promptSecret.namespace,
|
||||
labels: {
|
||||
"app.kubernetes.io/managed-by": "paperclip",
|
||||
"paperclip.io/adapter-type": "claude_k8s",
|
||||
"paperclip.io/run-id": runId,
|
||||
},
|
||||
},
|
||||
stringData: promptSecret.data,
|
||||
},
|
||||
});
|
||||
await onLog("stdout", `[paperclip] Created prompt Secret: ${promptSecret.name} (${Math.round(Buffer.byteLength(prompt, "utf-8") / 1024)} KiB)\n`);
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Failed to create prompt Secret: ${msg}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Failed to create prompt Secret: ${msg}`,
|
||||
errorCode: "k8s_prompt_secret_create_failed",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Create the Job
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
try {
|
||||
@@ -517,12 +575,13 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
}
|
||||
|
||||
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
|
||||
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`);
|
||||
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`).catch(() => {});
|
||||
|
||||
// Refresh updatedAt every ~4 minutes (16 ticks × 15s) to stay
|
||||
// well within the 5-minute reaper staleness window.
|
||||
// Refresh updatedAt every ~3 minutes (12 ticks × 15s = 180s) to
|
||||
// stay well within the 5-minute reaper staleness window. Also
|
||||
// fire on tick 1 for an early safety margin after job start.
|
||||
keepaliveTick++;
|
||||
if (ctx.onSpawn && keepaliveTick % 16 === 0) {
|
||||
if (ctx.onSpawn && (keepaliveTick === 1 || keepaliveTick % 12 === 0)) {
|
||||
void ctx.onSpawn({ pid: -1, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
|
||||
}
|
||||
})();
|
||||
@@ -607,6 +666,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
} else {
|
||||
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
|
||||
}
|
||||
// Clean up prompt Secret if one was created
|
||||
if (promptSecret) {
|
||||
try {
|
||||
await coreApi.deleteNamespacedSecret({ name: promptSecret.name, namespace: promptSecret.namespace });
|
||||
} catch {
|
||||
// Best-effort cleanup — TTL or manual deletion will catch stragglers
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Parse Claude output (reuse claude_local parsing)
|
||||
|
||||
Reference in New Issue
Block a user