From 3fe4721da6455206366615162df3a5a2022474e2 Mon Sep 17 00:00:00 2001 From: Test User Date: Tue, 21 Apr 2026 20:13:58 +0000 Subject: [PATCH] fix: eliminate reconnect duplicate logs with KMP-based dedup (FAR-105) The prior fix (sinceSeconds window anchored to lastLogReceivedAt) still caused duplicates: on reconnect, the K8s API re-streams N seconds of already-seen content and those bytes were forwarded to onLog verbatim. New approach: - First stream attempt: emit chunks to onLog in real-time as before. - Reconnect attempts: buffer all incoming chunks rather than emitting immediately, then call findNewLogContent() to strip any prefix of the buffered data that overlaps with content already sent, and forward only the genuinely new suffix. - findNewLogContent uses the KMP failure-function algorithm (O(N)) to find the longest prefix of the reconnect data that matches a suffix of the accumulated content, guaranteeing correct deduplication even for large sinceSeconds windows or rapidly repeated lines. Co-Authored-By: Paperclip --- package-lock.json | 4 +- package.json | 2 +- src/server/execute.ts | 97 ++++++++++++++++++++++++++++++++++--------- 3 files changed, 81 insertions(+), 22 deletions(-) diff --git a/package-lock.json b/package-lock.json index 24e370a..324f64f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@farhoodliquor/paperclip-adapter-claude-k8s", - "version": "0.1.25", + "version": "0.1.26", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@farhoodliquor/paperclip-adapter-claude-k8s", - "version": "0.1.25", + "version": "0.1.26", "license": "MIT", "dependencies": { "@kubernetes/client-node": "^1.0.0", diff --git a/package.json b/package.json index e363980..54a7cb3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@farhoodliquor/paperclip-adapter-claude-k8s", - "version": "0.1.25", + "version": "0.1.26", "description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs", "license": "MIT", "repository": { diff --git a/src/server/execute.ts b/src/server/execute.ts index 78994f5..0bc3581 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -163,12 +163,54 @@ async function streamPodLogsOnce( return chunks.join(""); } +/** + * Given content already sent to the UI and new content from a reconnect + * stream, return only the portion that is genuinely new. + * + * The K8s sinceSeconds parameter causes reconnect streams to begin with + * content that was already sent during a prior stream attempt. We find + * the longest prefix of reconnectContent that overlaps with the tail of + * existingContent (i.e. the largest block of already-seen bytes at the + * start of the reconnect data) and skip it. + * + * Uses the KMP failure-function algorithm for O(|reconnectContent|) time. + */ +function findNewLogContent(existingContent: string, reconnectContent: string): string { + if (!reconnectContent) return ""; + if (!existingContent) return reconnectContent; + + // The overlap can be at most reconnectContent.length bytes long, so we + // only need to inspect the matching-length tail of existingContent. + const limit = Math.min(reconnectContent.length, existingContent.length); + const pattern = reconnectContent.slice(0, limit); // potential overlap prefix + const text = existingContent.slice(-limit); // tail to search in + + // Build KMP failure function on (pattern + sentinel + text). + // The value of f at the final position equals the longest prefix of + // `pattern` that is simultaneously a suffix of `text` — the overlap length. + const sep = "\x00"; // sentinel unlikely to appear in log bytes + const s = pattern + sep + text; + const f = new Int32Array(s.length); + let k = 0; + for (let i = 1; i < s.length; i++) { + while (k > 0 && s.charCodeAt(i) !== s.charCodeAt(k)) k = f[k - 1]; + if (s.charCodeAt(i) === s.charCodeAt(k)) k++; + f[i] = k; + } + return reconnectContent.slice(f[s.length - 1]); +} + /** * Stream pod logs with automatic reconnection. Keeps retrying the log * stream until the stop signal fires (job completed) or the container * exits normally. This handles silent K8s API connection drops that * would otherwise cause the UI to stop receiving real output. * + * On reconnects, content is buffered rather than emitted directly. + * findNewLogContent then strips any overlap introduced by sinceSeconds, + * and only the genuinely new bytes are forwarded to onLog. This is the + * definitive fix for FAR-105 duplicate log output. + * * Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect * loops during sustained API partitions. */ @@ -179,11 +221,10 @@ async function streamPodLogs( kubeconfigPath?: string, stopSignal?: { stopped: boolean }, ): Promise { - const allChunks: string[] = []; + let accumulated = ""; // all log content received and emitted so far let attempt = 0; - // Track the timestamp of the last successfully received log line so - // reconnects use a tight window instead of an ever-growing one anchored - // at stream start. This is the primary fix for FAR-105 duplicative logs. + // Updated per-chunk (inside callbacks) so it reflects when the last + // byte of log data actually arrived, not when the stream object closed. let lastLogReceivedAt = Math.floor(Date.now() / 1000); while (!stopSignal?.stopped) { @@ -192,9 +233,8 @@ async function streamPodLogs( break; } - // On reconnect, ask for logs since the last received line (+5s buffer) - // instead of since stream start. This keeps the window tight and - // avoids ever-growing duplicate output. + // On reconnect, ask for logs since the last received chunk (+5s buffer) + // to avoid missing output produced during the reconnect gap. const sinceSeconds = attempt > 0 ? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5) : undefined; @@ -203,18 +243,37 @@ async function streamPodLogs( await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`); } - const preStreamTs = Math.floor(Date.now() / 1000); - const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds); - if (result) { - allChunks.push(result); - // Update last-received timestamp to now (the stream just ended, - // so any log lines in `result` were received up to this moment). - lastLogReceivedAt = Math.floor(Date.now() / 1000); - } else if (attempt === 0) { - // First attempt returned nothing — update timestamp so reconnect - // window stays reasonable. - lastLogReceivedAt = preStreamTs; + if (attempt === 0) { + // First attempt: emit chunks to onLog in real-time and track content. + const trackingOnLog: typeof onLog = async (stream, chunk) => { + accumulated += chunk; + lastLogReceivedAt = Math.floor(Date.now() / 1000); + return onLog(stream, chunk); + }; + await streamPodLogsOnce(namespace, podName, trackingOnLog, kubeconfigPath, undefined); + } else { + // Reconnect: buffer all chunks received from this stream attempt. + // The sinceSeconds window means the stream begins with content that + // was already emitted — we must not forward those bytes again. + const reconnectChunks: string[] = []; + const bufferingLog: typeof onLog = async (_stream, chunk) => { + reconnectChunks.push(chunk); + lastLogReceivedAt = Math.floor(Date.now() / 1000); + }; + await streamPodLogsOnce(namespace, podName, bufferingLog, kubeconfigPath, sinceSeconds); + + const reconnectContent = reconnectChunks.join(""); + if (reconnectContent) { + // Strip the overlapping prefix (already-sent content) and emit only + // what extends beyond what the UI has already seen. + const newContent = findNewLogContent(accumulated, reconnectContent); + accumulated += newContent; + if (newContent) { + await onLog("stdout", newContent); + } + } } + attempt++; // If the job is done or the container exited, no need to reconnect. @@ -224,7 +283,7 @@ async function streamPodLogs( await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS)); } - return allChunks.join(""); + return accumulated; } /** -- 2.52.0