diff --git a/src/server/execute.ts b/src/server/execute.ts index 082cdd5..c65a43f 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -348,6 +348,7 @@ async function streamPodLogs( onLog: AdapterExecutionContext["onLog"], kubeconfigPath?: string, stopSignal?: { stopped: boolean }, + dedup?: LogLineDedupFilter, ): Promise { const allChunks: string[] = []; let attempt = 0; @@ -357,7 +358,7 @@ async function streamPodLogs( let lastLogReceivedAt = Math.floor(Date.now() / 1000); // Shared across reconnects so replayed lines inside the `sinceSeconds` // overlap window are dropped before they reach the streaming UI (FAR-123). - const dedup = new LogLineDedupFilter(); + if (!dedup) dedup = new LogLineDedupFilter(); while (!stopSignal?.stopped) { if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) { @@ -754,11 +755,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise 0 ? `${timeoutSec}s` : "none"})\n`); } @@ -856,6 +893,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise { // Fire-and-forget the async work; setInterval callbacks must be // synchronous or the timer will drift. @@ -878,19 +916,32 @@ export async function execute(ctx: AdapterExecutionContext): Promise (c.type === "Complete" || c.type === "Failed") && c.status === "True", ); if (terminal) { - keepaliveJobTerminal = true; - keepaliveJobTerminalAt = Date.now(); - if (ctx.onSpawn) { + consecutiveTerminalReadings++; + if (consecutiveTerminalReadings >= 2) { + keepaliveJobTerminal = true; + keepaliveJobTerminalAt = Date.now(); + if (ctx.onSpawn) { + void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {}); + } + return; + } + // First terminal reading — do not latch yet; next tick confirms. + keepaliveTick++; + if (ctx.onSpawn && (keepaliveTick === 1 || keepaliveTick % 12 === 0)) { void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {}); } return; } + consecutiveTerminalReadings = 0; } catch (err: unknown) { // Only treat 404 (Job deleted) as terminal. Transient 5xx or // connection resets should NOT permanently disable the keepalive — @@ -931,9 +982,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise { logStopSignal.stopped = true; return r; @@ -970,9 +1024,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise stdout.length) { await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`); + const deduped = logDedup.filter(oneShotLogs) + logDedup.flush(); + if (deduped) await onLog("stdout", deduped); stdout = oneShotLogs; } }