diff --git a/package-lock.json b/package-lock.json index 24e370a..324f64f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@farhoodliquor/paperclip-adapter-claude-k8s", - "version": "0.1.25", + "version": "0.1.26", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@farhoodliquor/paperclip-adapter-claude-k8s", - "version": "0.1.25", + "version": "0.1.26", "license": "MIT", "dependencies": { "@kubernetes/client-node": "^1.0.0", diff --git a/package.json b/package.json index e363980..54a7cb3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@farhoodliquor/paperclip-adapter-claude-k8s", - "version": "0.1.25", + "version": "0.1.26", "description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs", "license": "MIT", "repository": { diff --git a/src/server/execute.ts b/src/server/execute.ts index 78994f5..0bc3581 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -163,12 +163,54 @@ async function streamPodLogsOnce( return chunks.join(""); } +/** + * Given content already sent to the UI and new content from a reconnect + * stream, return only the portion that is genuinely new. + * + * The K8s sinceSeconds parameter causes reconnect streams to begin with + * content that was already sent during a prior stream attempt. We find + * the longest prefix of reconnectContent that overlaps with the tail of + * existingContent (i.e. the largest block of already-seen bytes at the + * start of the reconnect data) and skip it. + * + * Uses the KMP failure-function algorithm for O(|reconnectContent|) time. + */ +function findNewLogContent(existingContent: string, reconnectContent: string): string { + if (!reconnectContent) return ""; + if (!existingContent) return reconnectContent; + + // The overlap can be at most reconnectContent.length bytes long, so we + // only need to inspect the matching-length tail of existingContent. + const limit = Math.min(reconnectContent.length, existingContent.length); + const pattern = reconnectContent.slice(0, limit); // potential overlap prefix + const text = existingContent.slice(-limit); // tail to search in + + // Build KMP failure function on (pattern + sentinel + text). + // The value of f at the final position equals the longest prefix of + // `pattern` that is simultaneously a suffix of `text` — the overlap length. + const sep = "\x00"; // sentinel unlikely to appear in log bytes + const s = pattern + sep + text; + const f = new Int32Array(s.length); + let k = 0; + for (let i = 1; i < s.length; i++) { + while (k > 0 && s.charCodeAt(i) !== s.charCodeAt(k)) k = f[k - 1]; + if (s.charCodeAt(i) === s.charCodeAt(k)) k++; + f[i] = k; + } + return reconnectContent.slice(f[s.length - 1]); +} + /** * Stream pod logs with automatic reconnection. Keeps retrying the log * stream until the stop signal fires (job completed) or the container * exits normally. This handles silent K8s API connection drops that * would otherwise cause the UI to stop receiving real output. * + * On reconnects, content is buffered rather than emitted directly. + * findNewLogContent then strips any overlap introduced by sinceSeconds, + * and only the genuinely new bytes are forwarded to onLog. This is the + * definitive fix for FAR-105 duplicate log output. + * * Capped at MAX_LOG_RECONNECT_ATTEMPTS to prevent infinite reconnect * loops during sustained API partitions. */ @@ -179,11 +221,10 @@ async function streamPodLogs( kubeconfigPath?: string, stopSignal?: { stopped: boolean }, ): Promise { - const allChunks: string[] = []; + let accumulated = ""; // all log content received and emitted so far let attempt = 0; - // Track the timestamp of the last successfully received log line so - // reconnects use a tight window instead of an ever-growing one anchored - // at stream start. This is the primary fix for FAR-105 duplicative logs. + // Updated per-chunk (inside callbacks) so it reflects when the last + // byte of log data actually arrived, not when the stream object closed. let lastLogReceivedAt = Math.floor(Date.now() / 1000); while (!stopSignal?.stopped) { @@ -192,9 +233,8 @@ async function streamPodLogs( break; } - // On reconnect, ask for logs since the last received line (+5s buffer) - // instead of since stream start. This keeps the window tight and - // avoids ever-growing duplicate output. + // On reconnect, ask for logs since the last received chunk (+5s buffer) + // to avoid missing output produced during the reconnect gap. const sinceSeconds = attempt > 0 ? Math.max(1, Math.floor(Date.now() / 1000) - lastLogReceivedAt + 5) : undefined; @@ -203,18 +243,37 @@ async function streamPodLogs( await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt}/${MAX_LOG_RECONNECT_ATTEMPTS})...\n`); } - const preStreamTs = Math.floor(Date.now() / 1000); - const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds); - if (result) { - allChunks.push(result); - // Update last-received timestamp to now (the stream just ended, - // so any log lines in `result` were received up to this moment). - lastLogReceivedAt = Math.floor(Date.now() / 1000); - } else if (attempt === 0) { - // First attempt returned nothing — update timestamp so reconnect - // window stays reasonable. - lastLogReceivedAt = preStreamTs; + if (attempt === 0) { + // First attempt: emit chunks to onLog in real-time and track content. + const trackingOnLog: typeof onLog = async (stream, chunk) => { + accumulated += chunk; + lastLogReceivedAt = Math.floor(Date.now() / 1000); + return onLog(stream, chunk); + }; + await streamPodLogsOnce(namespace, podName, trackingOnLog, kubeconfigPath, undefined); + } else { + // Reconnect: buffer all chunks received from this stream attempt. + // The sinceSeconds window means the stream begins with content that + // was already emitted — we must not forward those bytes again. + const reconnectChunks: string[] = []; + const bufferingLog: typeof onLog = async (_stream, chunk) => { + reconnectChunks.push(chunk); + lastLogReceivedAt = Math.floor(Date.now() / 1000); + }; + await streamPodLogsOnce(namespace, podName, bufferingLog, kubeconfigPath, sinceSeconds); + + const reconnectContent = reconnectChunks.join(""); + if (reconnectContent) { + // Strip the overlapping prefix (already-sent content) and emit only + // what extends beyond what the UI has already seen. + const newContent = findNewLogContent(accumulated, reconnectContent); + accumulated += newContent; + if (newContent) { + await onLog("stdout", newContent); + } + } } + attempt++; // If the job is done or the container exited, no need to reconnect. @@ -224,7 +283,7 @@ async function streamPodLogs( await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS)); } - return allChunks.join(""); + return accumulated; } /**