fix: eliminate reconnect duplicate logs with KMP dedup (FAR-105) #4

Closed
farhoodliquor-paperclip[bot] wants to merge 1 commit from fix/far-105-dedup-kmp-v2 into master
3 changed files with 81 additions and 22 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.25",
"version": "0.1.26",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.25",
"version": "0.1.26",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.25",
"version": "0.1.26",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+78 -19
View File
@@ -163,12 +163,54 @@ async function streamPodLogsOnce(
return chunks.join("");
}
/**
 * Given content already sent to the UI and new content from a reconnect
 * stream, return only the portion that is genuinely new.
 *
 * A reconnect stream requested with a look-back window begins with content
 * that was already delivered by a prior stream attempt. This function finds
 * the longest prefix of `reconnectContent` that is also a suffix of
 * `existingContent` and returns `reconnectContent` with that prefix removed.
 *
 * The overlap is computed with the Knuth-Morris-Pratt algorithm: a failure
 * table is built for the reconnect prefix, then the tail of the existing
 * content is scanned against it. No separator character is involved, so the
 * result is correct for any input, including logs that contain NUL
 * characters. Comparison is per UTF-16 code unit.
 *
 * Time and extra space are O(min(|existingContent|, |reconnectContent|)).
 *
 * @param existingContent - Everything emitted so far.
 * @param reconnectContent - Everything received from the reconnect stream.
 * @returns The suffix of `reconnectContent` that follows the overlap; the
 *   whole of `reconnectContent` when there is no overlap, and `""` when
 *   `reconnectContent` is empty or entirely overlapped.
 */
function findNewLogContent(existingContent: string, reconnectContent: string): string {
  if (!reconnectContent) return "";
  if (!existingContent) return reconnectContent;

  // The overlap can never be longer than the shorter of the two strings, so
  // only the first `limit` code units of reconnectContent (the pattern) and
  // the last `limit` code units of existingContent (the text) matter.
  const limit = Math.min(reconnectContent.length, existingContent.length);
  const tailStart = existingContent.length - limit;

  // failure[i] = length of the longest proper prefix of the pattern's first
  // i + 1 code units that is also a suffix of them.
  const failure = new Int32Array(limit);
  let border = 0;
  for (let i = 1; i < limit; i++) {
    const code = reconnectContent.charCodeAt(i);
    while (border > 0 && code !== reconnectContent.charCodeAt(border)) {
      border = failure[border - 1];
    }
    if (code === reconnectContent.charCodeAt(border)) border++;
    failure[i] = border;
  }

  // Scan the tail of existingContent. After consuming each code unit,
  // `matched` is the longest pattern prefix that ends exactly there, so after
  // the final code unit it is the overlap length we are looking for.
  let matched = 0;
  for (let i = tailStart; i < existingContent.length; i++) {
    const code = existingContent.charCodeAt(i);
    // Defensive: a complete match cannot be extended, fall back first.
    if (matched === limit) matched = failure[matched - 1];
    while (matched > 0 && code !== reconnectContent.charCodeAt(matched)) {
      matched = failure[matched - 1];
    }
    if (code === reconnectContent.charCodeAt(matched)) matched++;
  }

  return reconnectContent.slice(matched);
}
/**
* Stream pod logs with automatic reconnection. Keeps retrying the log
* stream until the stop signal fires (job completed) or the container
* exits normally. This handles silent K8s API connection drops that
* would otherwise cause the UI to stop receiving real output.
*
* On reconnects, content is buffered rather than emitted directly.
* findNewLogContent then strips any overlap introduced by sinceSeconds,
* and only the genuinely new bytes are forwarded to onLog. This is the
* definitive fix for FAR-105 duplicate log output.
*
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
* loops during sustained API partitions.
*/
@@ -179,11 +221,10 @@ async function streamPodLogs(
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
): Promise<string> {
const allChunks: string[] = [];
let accumulated = ""; // all log content received and emitted so far
let attempt = 0;
// Track the timestamp of the last successfully received log line so
// reconnects use a tight window instead of an ever-growing one anchored
// at stream start. This is the primary fix for FAR-105 duplicative logs.
// Updated per-chunk (inside callbacks) so it reflects when the last
// byte of log data actually arrived, not when the stream object closed.
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
while (!stopSignal?.stopped) {
@@ -192,9 +233,8 @@ async function streamPodLogs(
break;
}
// On reconnect, ask for logs since the last received line (+5s buffer)
// instead of since stream start. This keeps the window tight and
// avoids ever-growing duplicate output.
// On reconnect, ask for logs since the last received chunk (+5s buffer)
// to avoid missing output produced during the reconnect gap.
const sinceSeconds = attempt > 0
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
: undefined;
@@ -203,18 +243,37 @@ async function streamPodLogs(
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
}
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
// so any log lines in `result` were received up to this moment).
lastLogReceivedAt = Math.floor(Date.now() / 1000);
} else if (attempt === 0) {
// First attempt returned nothing — update timestamp so reconnect
// window stays reasonable.
lastLogReceivedAt = preStreamTs;
if (attempt === 0) {
// First attempt: emit chunks to onLog in real-time and track content.
const trackingOnLog: typeof onLog = async (stream, chunk) => {
accumulated += chunk;
lastLogReceivedAt = Math.floor(Date.now() / 1000);
return onLog(stream, chunk);
};
await streamPodLogsOnce(namespace, podName, trackingOnLog, kubeconfigPath, undefined);
} else {
// Reconnect: buffer all chunks received from this stream attempt.
// The sinceSeconds window means the stream begins with content that
// was already emitted — we must not forward those bytes again.
const reconnectChunks: string[] = [];
const bufferingLog: typeof onLog = async (_stream, chunk) => {
reconnectChunks.push(chunk);
lastLogReceivedAt = Math.floor(Date.now() / 1000);
};
await streamPodLogsOnce(namespace, podName, bufferingLog, kubeconfigPath, sinceSeconds);
const reconnectContent = reconnectChunks.join("");
if (reconnectContent) {
// Strip the overlapping prefix (already-sent content) and emit only
// what extends beyond what the UI has already seen.
const newContent = findNewLogContent(accumulated, reconnectContent);
accumulated += newContent;
if (newContent) {
await onLog("stdout", newContent);
}
}
}
attempt++;
// If the job is done or the container exited, no need to reconnect.
@@ -224,7 +283,7 @@ async function streamPodLogs(
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
}
return allChunks.join("");
return accumulated;
}
/**