fix: eliminate reconnect duplicate logs with KMP dedup (FAR-105) #4
Generated
+2
-2
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.25",
|
||||
"version": "0.1.26",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.25",
|
||||
"version": "0.1.26",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@kubernetes/client-node": "^1.0.0",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.25",
|
||||
"version": "0.1.26",
|
||||
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
|
||||
"license": "MIT",
|
||||
"repository": {
|
||||
|
||||
+78
-19
@@ -163,12 +163,54 @@ async function streamPodLogsOnce(
|
||||
return chunks.join("");
|
||||
}
|
||||
|
||||
/**
 * Given content already sent to the UI and new content from a reconnect
 * stream, return only the portion that is genuinely new.
 *
 * The K8s sinceSeconds parameter causes reconnect streams to begin with
 * content that was already sent during a prior stream attempt. We find
 * the longest prefix of reconnectContent that is also a suffix of
 * existingContent (i.e. the largest block of already-seen data at the
 * start of the reconnect stream) and skip it.
 *
 * Uses the KMP failure function built on the candidate prefix only, then
 * scans the tail of existingContent against it. No sentinel character is
 * involved, so the result is correct for arbitrary log content (including
 * NUL characters). Runs in O(min(|existingContent|, |reconnectContent|))
 * time and extra space.
 *
 * @param existingContent - Everything that has already been emitted.
 * @param reconnectContent - Raw content received from the reconnect stream.
 * @returns The suffix of reconnectContent that follows the detected overlap;
 *   the whole of reconnectContent when there is no overlap, and "" when
 *   reconnectContent is empty or entirely overlapped.
 */
function findNewLogContent(existingContent: string, reconnectContent: string): string {
  if (!reconnectContent) return "";
  if (!existingContent) return reconnectContent;

  // The overlap can be at most min(|existing|, |reconnect|) code units long,
  // so only the matching-length head of the reconnect data and tail of the
  // existing data need to be inspected.
  const limit = Math.min(reconnectContent.length, existingContent.length);
  const pattern = reconnectContent.slice(0, limit); // candidate overlap prefix
  const text = existingContent.slice(-limit); // tail to match against

  // failure[i] = length of the longest proper prefix of pattern[0..i] that is
  // also a suffix of pattern[0..i].
  const failure = new Int32Array(limit);
  let k = 0;
  for (let i = 1; i < limit; i++) {
    const code = pattern.charCodeAt(i);
    while (k > 0 && code !== pattern.charCodeAt(k)) k = failure[k - 1];
    if (code === pattern.charCodeAt(k)) k++;
    failure[i] = k;
  }

  // Standard KMP scan: after consuming text[0..i], `matched` is the length of
  // the longest prefix of pattern that is a suffix of text[0..i]. Because
  // text and pattern have equal length, `matched` is at most i before
  // text[i] is processed, so pattern.charCodeAt(matched) is always in range
  // and no "full match" fallback is required mid-scan.
  let matched = 0;
  for (let i = 0; i < limit; i++) {
    const code = text.charCodeAt(i);
    while (matched > 0 && code !== pattern.charCodeAt(matched)) matched = failure[matched - 1];
    if (code === pattern.charCodeAt(matched)) matched++;
  }

  // `matched` is now the overlap length: skip it and keep the remainder.
  return reconnectContent.slice(matched);
}
|
||||
|
||||
/**
|
||||
* Stream pod logs with automatic reconnection. Keeps retrying the log
|
||||
* stream until the stop signal fires (job completed) or the container
|
||||
* exits normally. This handles silent K8s API connection drops that
|
||||
* would otherwise cause the UI to stop receiving real output.
|
||||
*
|
||||
* On reconnects, content is buffered rather than emitted directly.
|
||||
* findNewLogContent then strips any overlap introduced by sinceSeconds,
|
||||
* and only the genuinely new bytes are forwarded to onLog. This is the
|
||||
* definitive fix for FAR-105 duplicate log output.
|
||||
*
|
||||
* Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect
|
||||
* loops during sustained API partitions.
|
||||
*/
|
||||
@@ -179,11 +221,10 @@ async function streamPodLogs(
|
||||
kubeconfigPath?: string,
|
||||
stopSignal?: { stopped: boolean },
|
||||
): Promise<string> {
|
||||
const allChunks: string[] = [];
|
||||
let accumulated = ""; // all log content received and emitted so far
|
||||
let attempt = 0;
|
||||
// Track the timestamp of the last successfully received log line so
|
||||
// reconnects use a tight window instead of an ever-growing one anchored
|
||||
// at stream start. This is the primary fix for FAR-105 duplicative logs.
|
||||
// Updated per-chunk (inside callbacks) so it reflects when the last
|
||||
// byte of log data actually arrived, not when the stream object closed.
|
||||
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
|
||||
while (!stopSignal?.stopped) {
|
||||
@@ -192,9 +233,8 @@ async function streamPodLogs(
|
||||
break;
|
||||
}
|
||||
|
||||
// On reconnect, ask for logs since the last received line (+5s buffer)
|
||||
// instead of since stream start. This keeps the window tight and
|
||||
// avoids ever-growing duplicate output.
|
||||
// On reconnect, ask for logs since the last received chunk (+5s buffer)
|
||||
// to avoid missing output produced during the reconnect gap.
|
||||
const sinceSeconds = attempt > 0
|
||||
? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5)
|
||||
: undefined;
|
||||
@@ -203,18 +243,37 @@ async function streamPodLogs(
|
||||
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`);
|
||||
}
|
||||
|
||||
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
|
||||
if (result) {
|
||||
allChunks.push(result);
|
||||
// Update last-received timestamp to now (the stream just ended,
|
||||
// so any log lines in `result` were received up to this moment).
|
||||
lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
} else if (attempt === 0) {
|
||||
// First attempt returned nothing — update timestamp so reconnect
|
||||
// window stays reasonable.
|
||||
lastLogReceivedAt = preStreamTs;
|
||||
if (attempt === 0) {
|
||||
// First attempt: emit chunks to onLog in real-time and track content.
|
||||
const trackingOnLog: typeof onLog = async (stream, chunk) => {
|
||||
accumulated += chunk;
|
||||
lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
return onLog(stream, chunk);
|
||||
};
|
||||
await streamPodLogsOnce(namespace, podName, trackingOnLog, kubeconfigPath, undefined);
|
||||
} else {
|
||||
// Reconnect: buffer all chunks received from this stream attempt.
|
||||
// The sinceSeconds window means the stream begins with content that
|
||||
// was already emitted — we must not forward those bytes again.
|
||||
const reconnectChunks: string[] = [];
|
||||
const bufferingLog: typeof onLog = async (_stream, chunk) => {
|
||||
reconnectChunks.push(chunk);
|
||||
lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
};
|
||||
await streamPodLogsOnce(namespace, podName, bufferingLog, kubeconfigPath, sinceSeconds);
|
||||
|
||||
const reconnectContent = reconnectChunks.join("");
|
||||
if (reconnectContent) {
|
||||
// Strip the overlapping prefix (already-sent content) and emit only
|
||||
// what extends beyond what the UI has already seen.
|
||||
const newContent = findNewLogContent(accumulated, reconnectContent);
|
||||
accumulated += newContent;
|
||||
if (newContent) {
|
||||
await onLog("stdout", newContent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
attempt++;
|
||||
|
||||
// If the job is done or the container exited, no need to reconnect.
|
||||
@@ -224,7 +283,7 @@ async function streamPodLogs(
|
||||
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
|
||||
}
|
||||
|
||||
return allChunks.join("");
|
||||
return accumulated;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user