Compare commits

...

1 Commits

Author SHA1 Message Date
Test User 3fe4721da6 fix: eliminate reconnect duplicate logs with KMP-based dedup (FAR-105)
The prior fix (sinceSeconds window anchored to lastLogReceivedAt) still
caused duplicates: on reconnect, the K8s API re-streams N seconds of
already-seen content and those bytes were forwarded to onLog verbatim.

New approach:
- First stream attempt: emit chunks to onLog in real-time as before.
- Reconnect attempts: buffer all incoming chunks rather than emitting
  immediately, then call findNewLogContent() to strip any prefix of the
  buffered data that overlaps with content already sent, and forward only
  the genuinely new suffix.
- findNewLogContent uses the KMP failure-function algorithm (O(N)) to
  find the longest prefix of the reconnect data that matches a suffix of
  the accumulated content, guaranteeing correct deduplication even for
  large sinceSeconds windows or rapidly repeated lines.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-21 20:13:58 +00:00
3 changed files with 81 additions and 22 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.25",
"version": "0.1.26",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.25",
"version": "0.1.26",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.25",
"version": "0.1.26",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+78 -19
View File
@@ -163,12 +163,54 @@ async function streamPodLogsOnce(
return chunks.join("");
}
/**
* Given content already sent to the UI and new content from a reconnect
* stream, return only the portion that is genuinely new.
*
* The K8s sinceSeconds parameter causes reconnect streams to begin with
* content that was already sent during a prior stream attempt. We find
* the longest prefix of reconnectContent that overlaps with the tail of
* existingContent (i.e. the largest block of already-seen bytes at the
* start of the reconnect data) and skip it.
*
* Uses the KMP failure-function algorithm for O(|reconnectContent|) time.
*/
function findNewLogContent(existingContent: string, reconnectContent: string): string {
if (!reconnectContent) return "";
if (!existingContent) return reconnectContent;
// The overlap can be at most reconnectContent.length bytes long, so we
// only need to inspect the matching-length tail of existingContent.
const limit = Math.min(reconnectContent.length, existingContent.length);
const pattern = reconnectContent.slice(0, limit); // potential overlap prefix
const text = existingContent.slice(-limit); // tail to search in
// Build KMP failure function on (pattern + sentinel + text).
// The value of f at the final position equals the longest prefix of
// `pattern` that is simultaneously a suffix of `text` — the overlap length.
const sep = "\x00"; // sentinel unlikely to appear in log bytes
const s = pattern + sep + text;
const f = new Int32Array(s.length);
let k = 0;
for (let i = 1; i < s.length; i++) {
while (k > 0 && s.charCodeAt(i) !== s.charCodeAt(k)) k = f[k - 1];
if (s.charCodeAt(i) === s.charCodeAt(k)) k++;
f[i] = k;
}
return reconnectContent.slice(f[s.length - 1]);
}
/**
* Stream pod logs with automatic reconnection. Keeps retrying the log
* stream until the stop signal fires (job completed) or the container
* exits normally. This handles silent K8s API connection drops that
* would otherwise cause the UI to stop receiving real output.
*
* On reconnects, content is buffered rather than emitted directly.
* findNewLogContent then strips any overlap introduced by sinceSeconds,
* and only the genuinely new bytes are forwarded to onLog. This is the
* definitive fix for FAR-105 duplicate log output.
*
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
* loops during sustained API partitions.
*/
@@ -179,11 +221,10 @@ async function streamPodLogs(
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
): Promise<string> {
const allChunks: string[] = [];
let accumulated = ""; // all log content received and emitted so far
let attempt = 0;
// Track the timestamp of the last successfully received log line so
// reconnects use a tight window instead of an ever-growing one anchored
// at stream start. This is the primary fix for FAR-105 duplicative logs.
// Updated per-chunk (inside callbacks) so it reflects when the last
// byte of log data actually arrived, not when the stream object closed.
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
while (!stopSignal?.stopped) {
@@ -192,9 +233,8 @@ async function streamPodLogs(
break;
}
// On reconnect, ask for logs since the last received line (+5s buffer)
// instead of since stream start. This keeps the window tight and
// avoids ever-growing duplicate output.
// On reconnect, ask for logs since the last received chunk (+5s buffer)
// to avoid missing output produced during the reconnect gap.
const sinceSeconds = attempt > 0
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
: undefined;
@@ -203,18 +243,37 @@ async function streamPodLogs(
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
}
const preStreamTs = Math.floor(Date.now() / 1000);
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
if (result) {
allChunks.push(result);
// Update last-received timestamp to now (the stream just ended,
// so any log lines in `result` were received up to this moment).
lastLogReceivedAt = Math.floor(Date.now() / 1000);
} else if (attempt === 0) {
// First attempt returned nothing — update timestamp so reconnect
// window stays reasonable.
lastLogReceivedAt = preStreamTs;
if (attempt === 0) {
// First attempt: emit chunks to onLog in real-time and track content.
const trackingOnLog: typeof onLog = async (stream, chunk) => {
accumulated += chunk;
lastLogReceivedAt = Math.floor(Date.now() / 1000);
return onLog(stream, chunk);
};
await streamPodLogsOnce(namespace, podName, trackingOnLog, kubeconfigPath, undefined);
} else {
// Reconnect: buffer all chunks received from this stream attempt.
// The sinceSeconds window means the stream begins with content that
// was already emitted — we must not forward those bytes again.
const reconnectChunks: string[] = [];
const bufferingLog: typeof onLog = async (_stream, chunk) => {
reconnectChunks.push(chunk);
lastLogReceivedAt = Math.floor(Date.now() / 1000);
};
await streamPodLogsOnce(namespace, podName, bufferingLog, kubeconfigPath, sinceSeconds);
const reconnectContent = reconnectChunks.join("");
if (reconnectContent) {
// Strip the overlapping prefix (already-sent content) and emit only
// what extends beyond what the UI has already seen.
const newContent = findNewLogContent(accumulated, reconnectContent);
accumulated += newContent;
if (newContent) {
await onLog("stdout", newContent);
}
}
}
attempt++;
// If the job is done or the container exited, no need to reconnect.
@@ -224,7 +283,7 @@ async function streamPodLogs(
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
}
return allChunks.join("");
return accumulated;
}
/**