All files / src/server log-dedup.ts

89.33% Statements 67/75
80.32% Branches 49/61
100% Functions 6/6
95.08% Lines 58/61

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147                                      147x       82x 82x                   65x   65x 21x 21x 21x       44x 18x 18x 18x 1x     26x 3x 3x 3x 3x 3x 3x 3x 3x   3x       23x 19x 19x     4x                 27x 27x               39x 39x 39x 39x   39x   35x 35x     4x     39x 39x 58x   39x 30x               20x 20x 20x 2x       60x 60x     60x     58x 58x         58x 58x     58x 58x   60x 48x 48x      
/**
 * Line-level dedup filter for the K8s log stream.
 *
 * The K8s log follow stream can reconnect with an overlapping `sinceSeconds`
 * window (integer-second granularity + a safety buffer), which replays a few
 * seconds of recent output on every reconnect. Without dedup those replayed
 * lines appear as duplicate events in the streaming UI — the same assistant
 * text block shows up between every subsequent tool call (FAR-123).
 *
 * The filter operates at the chunk → line level: chunks are split on `\n`,
 * incomplete trailing content is buffered until the next chunk, and each
 * complete JSON-object line is emitted at most once. Recognized Claude
 * stream-json events are keyed by their stable structural IDs and any other
 * JSON object by its trimmed raw text; non-JSON lines pass through unchanged
 * so genuinely-repeated status lines are not swallowed.
 */
 
/** A loosely-typed object: string keys whose values must be narrowed before use. */
type Parsed = Record<string, unknown>;
 
/**
 * Coerce an unknown value to a string without ever throwing.
 *
 * @param value - Any value, e.g. a field read from a parsed JSON object.
 * @returns `value` itself when it is a string primitive; otherwise `""`.
 */
function asString(value: unknown): string {
  if (typeof value !== "string") {
    return "";
  }
  return value;
}
 
/**
 * Narrow an unknown value to a key/value record.
 *
 * @param value - Any value, e.g. the result of `JSON.parse`.
 * @returns The same reference typed as `Parsed` when `value` is a non-null,
 *   non-array object; otherwise `null`.
 */
function asRecord(value: unknown): Parsed | null {
  if (typeof value !== "object" || value === null || Array.isArray(value)) return null;
  // The guard above has ruled out primitives, null and arrays, so treating
  // the value as a string-keyed record is safe.
  return value as Parsed;
}
 
/**
 * Build a stable dedup key for a Claude stream-json event.
 *
 * Key shape by `event.type`:
 * - `"system"`    → `system:init:<session_id>`, only when `subtype` is
 *                   `"init"` and `session_id` is a non-empty string.
 * - `"assistant"` → `assistant:<message.id>`, when `message.id` is a
 *                   non-empty string.
 * - `"user"`      → `user:tool_result:<id1>,<id2>,…`, built from every
 *                   non-empty `tool_use_id` found in `message.content`.
 * - `"result"`    → `result:<session_id>`, or `result:unknown` when the
 *                   session id is missing or empty.
 *
 * @param event - A parsed JSON object from the log stream.
 * @returns The structural key, or `null` when the event is not a recognized
 *   Claude event (or lacks the identifying fields). `LogLineDedupFilter`
 *   falls back to keying such lines by their raw text.
 */
export function eventDedupKey(event: Parsed): string | null {
  const type = asString(event.type);

  if (type === "system") {
    const subtype = asString(event.subtype);
    const sessionId = asString(event.session_id);
    if (subtype === "init" && sessionId) return `system:init:${sessionId}`;
    return null;
  }

  if (type === "assistant") {
    const message = asRecord(event.message);
    const id = message ? asString(message.id) : "";
    if (id) return `assistant:${id}`;
    return null;
  }

  if (type === "user") {
    const message = asRecord(event.message);
    const content: unknown[] =
      message && Array.isArray(message.content) ? message.content : [];
    const toolUseIds: string[] = [];
    for (const entry of content) {
      const block = asRecord(entry);
      // Skip content entries that are not objects (e.g. bare strings).
      if (!block) continue;
      const toolUseId = asString(block.tool_use_id);
      if (toolUseId) toolUseIds.push(toolUseId);
    }
    if (toolUseIds.length > 0) return `user:tool_result:${toolUseIds.join(",")}`;
    return null;
  }

  if (type === "result") {
    const sessionId = asString(event.session_id);
    return sessionId ? `result:${sessionId}` : "result:unknown";
  }

  return null;
}
 
/**
 * Stateful line-level dedup filter.
 *
 * Feed raw stream chunks to `filter()` and forward whatever it returns; call
 * `flush()` once when the stream ends. Lines that pass the dedup check are
 * returned with their original content, each terminated by `\n`.
 *
 * Every key ever seen is retained for the lifetime of the instance, so memory
 * grows with the number of distinct JSON lines processed.
 */
export class LogLineDedupFilter {
  /** Trailing text of the last chunk that has not yet been terminated by `\n`. */
  private buffer = "";
  /** Dedup keys of every JSON-object line emitted so far. */
  private readonly seenKeys = new Set<string>();

  /**
   * Process a chunk and return the subset that should be forwarded.
   * Incomplete trailing content (no terminating newline) is buffered and
   * emitted on the next chunk that completes the line (or on flush()).
   *
   * @param chunk - Raw text from the stream; may contain zero or more `\n`.
   * @returns The surviving complete lines joined by `\n` with a trailing
   *   `\n`, or `""` when nothing is to be forwarded.
   */
  filter(chunk: string): string {
    if (!chunk) return "";
    const combined = this.buffer + chunk;
    const endsWithNewline = combined.endsWith("\n");
    const parts = combined.split("\n");

    if (endsWithNewline) {
      // Discard the final empty element — last line was complete.
      parts.pop();
      this.buffer = "";
    } else {
      // Last element is an incomplete line — hold it for the next chunk.
      this.buffer = parts.pop() ?? "";
    }

    const out: string[] = [];
    for (const line of parts) {
      if (this.shouldEmit(line)) out.push(line);
    }
    if (out.length === 0) return "";
    return out.join("\n") + "\n";
  }

  /**
   * Flush any incomplete trailing content.  Called when the stream ends
   * without a terminating newline so the final partial line isn't lost.
   *
   * @returns The buffered partial line (without an added newline) if it
   *   passes the dedup check; otherwise `""`. The buffer is cleared either way.
   */
  flush(): string {
    const pending = this.buffer;
    this.buffer = "";
    if (!pending) return "";
    return this.shouldEmit(pending) ? pending : "";
  }

  /**
   * Decide whether a single line should be forwarded, recording its key as
   * seen when it is a JSON object.
   *
   * @param line - One line without its terminating newline.
   * @returns `false` only when the line is a JSON object whose key has
   *   already been recorded; `true` for everything else.
   */
  private shouldEmit(line: string): boolean {
    const trimmed = line.trim();
    // Blank lines are formatting, never duplicates.
    if (!trimmed) return true;

    // Only attempt dedup on JSON-shaped lines; pass shell/text output through.
    if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) return true;

    let parsed: unknown;
    try {
      parsed = JSON.parse(trimmed);
    } catch {
      // Looked like JSON but is not — treat as ordinary text.
      return true;
    }

    const event = asRecord(parsed);
    if (!event) return true;

    // Recognized Claude stream-json event → structural key; any other JSON
    // object is keyed by its trimmed raw text.
    const structuralKey = eventDedupKey(event);
    const key = structuralKey ?? `raw:${trimmed}`;

    if (this.seenKeys.has(key)) return false;
    this.seenKeys.add(key);
    return true;
  }
}