Reconnect K8s log stream on silent API disconnects

The adapter opened a single follow-stream to the K8s API for pod logs.
If that TCP connection silently dropped (API server hiccup, network
timeout, load-balancer idle cut), streamPodLogs returned early and no
more real Claude output reached the UI — only keepalive pings.  The
pod kept running and producing logs (visible via kubectl), but the
adapter never reconnected.

Splits streamPodLogs into streamPodLogsOnce (single follow attempt) and
a reconnecting wrapper that retries with sinceSeconds until a shared
stop signal fires when waitForJobCompletion resolves.  On reconnect,
requests logs from the original stream start time (+5s overlap) so no
output is lost; the UI deduplicates chunks.

Bumps version to 0.1.12.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
2026-04-13 10:34:41 +00:00
parent e760bf9386
commit 77ba40d9bf
5 changed files with 80 additions and 11 deletions
+1 -1
View File
@@ -1 +1 @@
{"version":3,"file":"execute.d.ts","sourceRoot":"","sources":["../../src/server/execute.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,EAAE,uBAAuB,EAAE,sBAAsB,EAAE,MAAM,4BAA4B,CAAC;AA4PlG,wBAAsB,OAAO,CAAC,GAAG,EAAE,uBAAuB,GAAG,OAAO,CAAC,sBAAsB,CAAC,CA4O3F"}
{"version":3,"file":"execute.d.ts","sourceRoot":"","sources":["../../src/server/execute.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,EAAE,uBAAuB,EAAE,sBAAsB,EAAE,MAAM,4BAA4B,CAAC;AA6PlG,wBAAsB,OAAO,CAAC,GAAG,EAAE,uBAAuB,GAAG,OAAO,CAAC,sBAAsB,CAAC,CA2P3F"}
+17 -1
View File
@@ -4,6 +4,7 @@ import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client
import { buildJobManifest } from "./job-manifest.js";
import { Writable } from "node:stream";
const POLL_INTERVAL_MS = 2000;
const KEEPALIVE_INTERVAL_MS = 15_000;
/**
* Wait for the Job's pod to reach a terminal or running state.
* Returns the pod name once logs can be streamed, or throws on failure.
@@ -270,6 +271,7 @@ export async function execute(ctx) {
let stdout = "";
let exitCode = null;
let jobTimedOut = false;
let keepaliveTimer = null;
try {
// Wait for pod to be ready for log streaming
const scheduleTimeoutMs = 120_000; // 2 minutes for scheduling
@@ -294,8 +296,20 @@ export async function execute(ctx) {
// We also poll the Job status to detect deadline exceeded.
// 0 = no timeout (run indefinitely, matching claude_local behavior)
const completionTimeoutMs = timeoutSec > 0 ? (timeoutSec + graceSec) * 1000 : 0;
// Keepalive: periodically send a status line via onLog so the
// Paperclip server knows the adapter is still alive even when the
// pod produces no output (e.g. Claude is in a long thinking phase).
let lastLogAt = Date.now();
keepaliveTimer = setInterval(() => {
const silenceSec = Math.round((Date.now() - lastLogAt) / 1000);
void onLog("stdout", `[paperclip] keepalive — job ${jobName} running (${silenceSec}s since last output)\n`);
}, KEEPALIVE_INTERVAL_MS);
const wrappedOnLog = async (stream, chunk) => {
lastLogAt = Date.now();
return onLog(stream, chunk);
};
const [logResult, completionResult] = await Promise.allSettled([
streamPodLogs(namespace, podName, onLog, kubeconfigPath),
streamPodLogs(namespace, podName, wrappedOnLog, kubeconfigPath),
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath),
]);
if (logResult.status === "fulfilled") {
@@ -319,6 +333,8 @@ export async function execute(ctx) {
exitCode = await getPodExitCode(namespace, jobName, kubeconfigPath);
}
finally {
if (keepaliveTimer)
clearInterval(keepaliveTimer);
if (!retainJobs) {
await cleanupJob(namespace, jobName, onLog, kubeconfigPath);
}
+1 -1
View File
File diff suppressed because one or more lines are too long
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@farhoodliquor/paperclip-adapter-claude-k8s",
"version": "0.1.11",
"version": "0.1.12",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+60 -7
View File
@@ -13,6 +13,7 @@ import { Writable } from "node:stream";
const POLL_INTERVAL_MS = 2000;
const KEEPALIVE_INTERVAL_MS = 15_000;
const LOG_STREAM_RECONNECT_DELAY_MS = 3_000;
/**
* Wait for the Job's pod to reach a terminal or running state.
@@ -126,14 +127,15 @@ async function waitForPod(
}
/**
* Stream pod logs and accumulate stdout for result parsing.
* Returns accumulated stdout when the stream ends.
* Stream pod logs once via follow. Returns accumulated stdout when the
* stream ends (container exit, API disconnect, or abort signal).
*/
async function streamPodLogs(
async function streamPodLogsOnce(
namespace: string,
podName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
sinceSeconds?: number,
): Promise<string> {
const logApi = getLogApi(kubeconfigPath);
const chunks: string[] = [];
@@ -150,15 +152,59 @@ async function streamPodLogs(
await logApi.log(namespace, podName, "claude", writable, {
follow: true,
pretty: false,
...(sinceSeconds ? { sinceSeconds } : {}),
});
} catch {
// follow may fail if the container already exited — not fatal,
// we'll try a one-shot read below
// follow may fail if the container already exited or the API
// connection dropped — not fatal, caller decides whether to retry.
}
return chunks.join("");
}
/**
* Stream pod logs with automatic reconnection. Keeps retrying the log
* stream until the stop signal fires (job completed) or the container
* exits normally. This handles silent K8s API connection drops that
* would otherwise cause the UI to stop receiving real output.
*/
async function streamPodLogs(
namespace: string,
podName: string,
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
): Promise<string> {
const allChunks: string[] = [];
let attempt = 0;
const streamStartedAt = Math.floor(Date.now() / 1000);
while (!stopSignal?.stopped) {
// On reconnect, ask for logs since the stream originally started to
// avoid missing output during the reconnect gap. Duplicates are
// tolerable — the UI deduplicates log chunks.
const sinceSeconds = attempt > 0
? Math.max(1, Math.floor(Date.now() / 1000) - streamStartedAt + 5)
: undefined;
if (attempt > 0) {
await onLog("stdout", `[paperclip] Log stream disconnected — reconnecting (attempt ${attempt})...\n`);
}
const result = await streamPodLogsOnce(namespace, podName, onLog, kubeconfigPath, sinceSeconds);
if (result) allChunks.push(result);
attempt++;
// If the job is done or the container exited, no need to reconnect.
if (stopSignal?.stopped) break;
// Brief pause before reconnecting to avoid tight loops.
await new Promise((resolve) => setTimeout(resolve, LOG_STREAM_RECONNECT_DELAY_MS));
}
return allChunks.join("");
}
/**
* One-shot read of pod logs (no follow). Used as fallback when the
* follow stream missed output because the container exited quickly.
@@ -372,9 +418,16 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return onLog(stream, chunk);
};
// Shared signal: when job completion resolves, tell the log
// streamer to stop reconnecting.
const logStopSignal = { stopped: false };
const [logResult, completionResult] = await Promise.allSettled([
streamPodLogs(namespace, podName, wrappedOnLog, kubeconfigPath),
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath),
streamPodLogs(namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal),
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then((r) => {
logStopSignal.stopped = true;
return r;
}),
]);
if (logResult.status === "fulfilled") {