fix: dedup replayed K8s log lines at the streaming UI boundary (FAR-123)
The K8s log follow stream replays the trailing few seconds of output on every reconnect because `sinceSeconds` uses integer-second granularity with a small safety buffer. FAR-105 dedupped those replays at the final parser (parse.ts), but the streaming UI consumes raw onLog chunks and still showed each replayed assistant/tool event as a fresh entry — which is how the duplicate "Three nits to fix…" blocks in the screenshot appeared between successive tool calls. Fix: add a stateful line-level dedup filter around onLog, shared across reconnects. Claude stream-json events are keyed by their stable structural IDs (message.id, tool_use_id, session_id); non-JSON output (paperclip status lines, shell output) passes through unchanged. - New `src/server/log-dedup.ts` + tests: LogLineDedupFilter handles chunk-to-line buffering, replay dedup, and end-of-stream flush. - `streamPodLogs` instantiates one filter per run so dedup state persists across reconnect attempts. Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
+17
-2
@@ -8,6 +8,7 @@ import {
|
||||
} from "./parse.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
|
||||
import { buildJobManifest } from "./job-manifest.js";
|
||||
import { LogLineDedupFilter } from "./log-dedup.js";
|
||||
import type * as k8s from "@kubernetes/client-node";
|
||||
import { Writable } from "node:stream";
|
||||
|
||||
@@ -190,6 +191,7 @@ async function streamPodLogsOnce(
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
sinceSeconds?: number,
|
||||
dedup?: LogLineDedupFilter,
|
||||
): Promise<string> {
|
||||
const logApi = getLogApi(kubeconfigPath);
|
||||
const chunks: string[] = [];
|
||||
@@ -198,7 +200,12 @@ async function streamPodLogsOnce(
|
||||
write(chunk: Buffer, _encoding, callback) {
|
||||
const text = chunk.toString("utf-8");
|
||||
chunks.push(text);
|
||||
void onLog("stdout", text).then(() => callback(), callback);
|
||||
const emitted = dedup ? dedup.filter(text) : text;
|
||||
if (!emitted) {
|
||||
callback();
|
||||
return;
|
||||
}
|
||||
void onLog("stdout", emitted).then(() => callback(), callback);
|
||||
},
|
||||
});
|
||||
|
||||
@@ -238,6 +245,9 @@ async function streamPodLogs(
|
||||
// reconnects use a tight window instead of an ever-growing one anchored
|
||||
// at stream start. This is the primary fix for FAR-105 duplicative logs.
|
||||
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
// Shared across reconnects so replayed lines inside the `sinceSeconds`
|
||||
// overlap window are dropped before they reach the streaming UI (FAR-123).
|
||||
const dedup = new LogLineDedupFilter();
|
||||
|
||||
while (!stopSignal?.stopped) {
|
||||
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
|
||||
@@ -257,7 +267,7 @@ async function streamPodLogs(
|
||||
}
|
||||
|
||||
const preStreamTs = Math.floor(Date.now() / 1000);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
|
||||
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds, dedup);
|
||||
if (result) {
|
||||
allChunks.push(result);
|
||||
// Update last-received timestamp to now (the stream just ended,
|
||||
@@ -277,6 +287,11 @@ async function streamPodLogs(
|
||||
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
|
||||
}
|
||||
|
||||
// Flush any buffered partial line so the final assistant/result chunk
|
||||
// isn't dropped when the stream ends mid-line.
|
||||
const tail = dedup.flush();
|
||||
if (tail) await onLog("stdout", tail);
|
||||
|
||||
return allChunks.join("");
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,173 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { LogLineDedupFilter, eventDedupKey } from "./log-dedup.js";
|
||||
|
||||
function assistantEvent(id: string, text: string): string {
|
||||
return JSON.stringify({
|
||||
type: "assistant",
|
||||
session_id: "sess_1",
|
||||
message: {
|
||||
id,
|
||||
content: [{ type: "text", text }],
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function userToolResultEvent(toolUseId: string, content: string): string {
|
||||
return JSON.stringify({
|
||||
type: "user",
|
||||
session_id: "sess_1",
|
||||
message: {
|
||||
content: [{ type: "tool_result", tool_use_id: toolUseId, content }],
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function systemInitEvent(sessionId: string): string {
|
||||
return JSON.stringify({
|
||||
type: "system",
|
||||
subtype: "init",
|
||||
session_id: sessionId,
|
||||
model: "claude-opus-4-7",
|
||||
});
|
||||
}
|
||||
|
||||
function resultEvent(sessionId: string): string {
|
||||
return JSON.stringify({
|
||||
type: "result",
|
||||
subtype: "success",
|
||||
session_id: sessionId,
|
||||
result: "done",
|
||||
total_cost_usd: 0.01,
|
||||
usage: { input_tokens: 1, output_tokens: 1, cache_read_input_tokens: 0 },
|
||||
});
|
||||
}
|
||||
|
||||
describe("eventDedupKey", () => {
|
||||
it("keys assistant events by message.id", () => {
|
||||
const key = eventDedupKey(JSON.parse(assistantEvent("msg_abc", "hi")));
|
||||
expect(key).toBe("assistant:msg_abc");
|
||||
});
|
||||
|
||||
it("keys user tool_result events by tool_use_id", () => {
|
||||
const key = eventDedupKey(JSON.parse(userToolResultEvent("toolu_1", "ok")));
|
||||
expect(key).toBe("user:tool_result:toolu_1");
|
||||
});
|
||||
|
||||
it("keys system init events by session_id", () => {
|
||||
const key = eventDedupKey(JSON.parse(systemInitEvent("sess_xyz")));
|
||||
expect(key).toBe("system:init:sess_xyz");
|
||||
});
|
||||
|
||||
it("keys result events by session_id", () => {
|
||||
const key = eventDedupKey(JSON.parse(resultEvent("sess_xyz")));
|
||||
expect(key).toBe("result:sess_xyz");
|
||||
});
|
||||
|
||||
it("returns null for assistant events missing message.id", () => {
|
||||
const event = { type: "assistant", message: { content: [] } };
|
||||
expect(eventDedupKey(event)).toBeNull();
|
||||
});
|
||||
|
||||
it("returns null for unknown event types", () => {
|
||||
expect(eventDedupKey({ type: "unknown" })).toBeNull();
|
||||
expect(eventDedupKey({})).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe("LogLineDedupFilter", () => {
|
||||
it("passes unique lines through unchanged", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const a = assistantEvent("msg_1", "hello");
|
||||
const b = assistantEvent("msg_2", "world");
|
||||
expect(filter.filter(`${a}\n${b}\n`)).toBe(`${a}\n${b}\n`);
|
||||
});
|
||||
|
||||
it("drops assistant events replayed with the same message.id", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const a = assistantEvent("msg_1", "Three nits to fix.");
|
||||
filter.filter(`${a}\n`);
|
||||
expect(filter.filter(`${a}\n`)).toBe("");
|
||||
});
|
||||
|
||||
it("drops user tool_result events replayed with the same tool_use_id", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const a = userToolResultEvent("toolu_abc", "file contents");
|
||||
filter.filter(`${a}\n`);
|
||||
expect(filter.filter(`${a}\n`)).toBe("");
|
||||
});
|
||||
|
||||
it("drops system init and result events on replay", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const init = systemInitEvent("sess_1");
|
||||
const result = resultEvent("sess_1");
|
||||
filter.filter(`${init}\n${result}\n`);
|
||||
expect(filter.filter(`${init}\n${result}\n`)).toBe("");
|
||||
});
|
||||
|
||||
it("buffers incomplete trailing lines across chunks", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const line = assistantEvent("msg_1", "hello");
|
||||
const mid = Math.floor(line.length / 2);
|
||||
const out1 = filter.filter(line.slice(0, mid));
|
||||
const out2 = filter.filter(line.slice(mid) + "\n");
|
||||
expect(out1).toBe("");
|
||||
expect(out2).toBe(`${line}\n`);
|
||||
});
|
||||
|
||||
it("flush() emits a final incomplete line that was not replayed", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const line = assistantEvent("msg_tail", "no newline");
|
||||
filter.filter(line);
|
||||
expect(filter.flush()).toBe(line);
|
||||
});
|
||||
|
||||
it("flush() drops an incomplete line that was already seen with a newline", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const line = assistantEvent("msg_same", "x");
|
||||
filter.filter(`${line}\n`);
|
||||
filter.filter(line);
|
||||
expect(filter.flush()).toBe("");
|
||||
});
|
||||
|
||||
it("passes non-JSON lines through every time (does not dedup paperclip status)", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const status = "[paperclip] keepalive — job foo running\n";
|
||||
expect(filter.filter(status)).toBe(status);
|
||||
expect(filter.filter(status)).toBe(status);
|
||||
});
|
||||
|
||||
it("dedups structurally identical JSON with identical content (raw fallback)", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
// No recognized type → raw fallback key.
|
||||
const line = JSON.stringify({ foo: "bar", baz: 1 });
|
||||
filter.filter(`${line}\n`);
|
||||
expect(filter.filter(`${line}\n`)).toBe("");
|
||||
});
|
||||
|
||||
it("handles multiple complete lines in a single chunk with partial trailing", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const a = assistantEvent("msg_a", "a");
|
||||
const b = assistantEvent("msg_b", "b");
|
||||
const c = assistantEvent("msg_c", "c");
|
||||
// a and b are complete, c is partial (no trailing newline).
|
||||
const out = filter.filter(`${a}\n${b}\n${c}`);
|
||||
expect(out).toBe(`${a}\n${b}\n`);
|
||||
// Completing c later should emit exactly c.
|
||||
expect(filter.filter("\n")).toBe(`${c}\n`);
|
||||
});
|
||||
|
||||
it("drops the classic FAR-123 replay scenario across reconnects", () => {
|
||||
const filter = new LogLineDedupFilter();
|
||||
const assistantNits = assistantEvent("msg_nits", "Three nits to fix. Let me look at an existing test file...");
|
||||
const assistantWrite = assistantEvent("msg_write", "Now I need to write unit tests");
|
||||
// First stream attempt emits both events.
|
||||
const out1 = filter.filter(`${assistantNits}\n${assistantWrite}\n`);
|
||||
expect(out1).toBe(`${assistantNits}\n${assistantWrite}\n`);
|
||||
// Reconnect replays both within the sinceSeconds overlap — filter should drop them.
|
||||
const out2 = filter.filter(`${assistantNits}\n${assistantWrite}\n`);
|
||||
expect(out2).toBe("");
|
||||
// And a genuinely new event after the replay should still pass through.
|
||||
const assistantFresh = assistantEvent("msg_fresh", "next turn");
|
||||
expect(filter.filter(`${assistantFresh}\n`)).toBe(`${assistantFresh}\n`);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,146 @@
|
||||
/**
|
||||
* Line-level dedup filter for the K8s log stream.
|
||||
*
|
||||
* The K8s log follow stream can reconnect with an overlapping `sinceSeconds`
|
||||
* window (integer-second granularity + a safety buffer), which replays a few
|
||||
* seconds of recent output on every reconnect. Without dedup those replayed
|
||||
* lines appear as duplicate events in the streaming UI — the same assistant
|
||||
* text block shows up between every subsequent tool call (FAR-123).
|
||||
*
|
||||
* The filter operates at the chunk → line level: chunks are split on `\n`,
|
||||
* incomplete trailing content is buffered until the next chunk, and each
|
||||
* complete line is emitted at most once. JSON-shaped Claude stream-json
|
||||
* events are keyed by their stable structural IDs; non-JSON lines pass
|
||||
* through unchanged so genuinely-repeated status lines are not swallowed.
|
||||
*/
|
||||
|
||||
type Parsed = Record<string, unknown>;
|
||||
|
||||
function asString(value: unknown): string {
|
||||
return typeof value === "string" ? value : "";
|
||||
}
|
||||
|
||||
function asRecord(value: unknown): Parsed | null {
|
||||
if (typeof value !== "object" || value === null || Array.isArray(value)) return null;
|
||||
return value as Parsed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a stable dedup key for a Claude stream-json event. Returns `null`
|
||||
* when the event is not a recognized Claude event — those lines fall back to
|
||||
* raw-content hashing so non-JSON output (paperclip status lines, shell
|
||||
* output) is never deduped by identity.
|
||||
*/
|
||||
export function eventDedupKey(event: Parsed): string | null {
|
||||
const type = asString(event.type);
|
||||
|
||||
if (type === "system") {
|
||||
const subtype = asString(event.subtype);
|
||||
const sessionId = asString(event.session_id);
|
||||
if (subtype === "init" && sessionId) return `system:init:${sessionId}`;
|
||||
return null;
|
||||
}
|
||||
|
||||
if (type === "assistant") {
|
||||
const message = asRecord(event.message);
|
||||
const id = message ? asString(message.id) : "";
|
||||
if (id) return `assistant:${id}`;
|
||||
return null;
|
||||
}
|
||||
|
||||
if (type === "user") {
|
||||
const message = asRecord(event.message);
|
||||
const content = message && Array.isArray(message.content) ? message.content : [];
|
||||
const toolUseIds: string[] = [];
|
||||
for (const entry of content) {
|
||||
const block = asRecord(entry);
|
||||
if (!block) continue;
|
||||
const toolUseId = asString(block.tool_use_id);
|
||||
if (toolUseId) toolUseIds.push(toolUseId);
|
||||
}
|
||||
if (toolUseIds.length > 0) return `user:tool_result:${toolUseIds.join(",")}`;
|
||||
return null;
|
||||
}
|
||||
|
||||
if (type === "result") {
|
||||
const sessionId = asString(event.session_id);
|
||||
return sessionId ? `result:${sessionId}` : "result:unknown";
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stateful line-level dedup filter. Emits `filter(chunk)` output through
|
||||
* the caller — preserves original chunk formatting (including trailing
|
||||
* newlines) for lines that pass the dedup check.
|
||||
*/
|
||||
export class LogLineDedupFilter {
|
||||
private buffer = "";
|
||||
private readonly seenKeys = new Set<string>();
|
||||
|
||||
/**
|
||||
* Process a chunk and return the subset that should be forwarded.
|
||||
* Incomplete trailing content (no terminating newline) is buffered and
|
||||
* emitted on the next chunk that completes the line (or on flush()).
|
||||
*/
|
||||
filter(chunk: string): string {
|
||||
if (!chunk) return "";
|
||||
const combined = this.buffer + chunk;
|
||||
const endsWithNewline = combined.endsWith("\n");
|
||||
const parts = combined.split("\n");
|
||||
|
||||
if (endsWithNewline) {
|
||||
// Discard the final empty element — last line was complete.
|
||||
parts.pop();
|
||||
this.buffer = "";
|
||||
} else {
|
||||
// Last element is an incomplete line — hold it for the next chunk.
|
||||
this.buffer = parts.pop() ?? "";
|
||||
}
|
||||
|
||||
const out: string[] = [];
|
||||
for (const line of parts) {
|
||||
if (this.shouldEmit(line)) out.push(line);
|
||||
}
|
||||
if (out.length === 0) return "";
|
||||
return out.join("\n") + "\n";
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush any incomplete trailing content. Called when the stream ends
|
||||
* without a terminating newline so the final partial line isn't lost.
|
||||
*/
|
||||
flush(): string {
|
||||
const pending = this.buffer;
|
||||
this.buffer = "";
|
||||
if (!pending) return "";
|
||||
return this.shouldEmit(pending) ? pending : "";
|
||||
}
|
||||
|
||||
private shouldEmit(line: string): boolean {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) return true;
|
||||
|
||||
// Only attempt dedup on JSON-shaped lines; pass shell/text output through.
|
||||
if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true;
|
||||
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(trimmed);
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
|
||||
const event = asRecord(parsed);
|
||||
if (!event) return true;
|
||||
|
||||
// Recognized Claude stream-json event → structural key.
|
||||
const structuralKey = eventDedupKey(event);
|
||||
const key = structuralKey ?? `raw:${trimmed}`;
|
||||
|
||||
if (this.seenKeys.has(key)) return false;
|
||||
this.seenKeys.add(key);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user