Compare commits

...

5 Commits

Author SHA1 Message Date
Gandalf the Greybeard e86b14a677 0.1.34 2026-04-23 23:35:02 +00:00
Gandalf the Greybeard 98f3821f91 fix: address remaining minor code review findings (FAR-15)
- #9: match Paperclip container by name in k8s-client instead of
  trusting spec.containers[0], which could be a service-mesh sidecar
- #11: key assistant-text dedup by (message.id, index) so legitimate
  duplicate content across turns isn't collapsed in the summary
- #16: trim trailing hyphens from sanitized K8s names so truncation
  doesn't produce names ending in "-"

Findings #5 (keepalive re-verify) and #6 (one-shot log dedup) were
already addressed in the current code — verified during this review.
#8 (orphan reattach behavior) requires a product decision on whether
"new session wins" is intentional, so deferring.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:34:59 +00:00
Gandalf the Greybeard 21a02da00f fix: prevent prompt Secret leak by attaching ownerReference to Job (FAR-15)
When a large prompt creates a K8s Secret, it can orphan if the process
crashes before the finally block runs. Now the Secret gets an
ownerReference pointing to the Job after creation, so K8s GC cleans it
up automatically. Also cleans up the Secret on job creation failure.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:29:47 +00:00
Gandalf the Greybeard 346f5cc1df fix: prevent UTF-8 corruption when RTK truncation splits multi-byte codepoints (FAR-19)
The trunc function in the RTK filter script now walks back from the
truncation point past continuation bytes and checks whether the full
codepoint fits, avoiding replacement characters from mid-codepoint slicing.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:28:28 +00:00
Gandalf the Greybeard ef73586a41 fix: address 6 critical/minor code review findings (FAR-15)
1. Fix resources.* dotted-key config — UI fields now correctly read
2. Fix operator precedence bug in container status key (add parens)
3. Add missing RBAC checks to testEnvironment (jobs/list, secrets/*, pvc)
4. Add bail timer log message for debuggability
5. Make result-event detection robust to JSON whitespace variations
6. Remove namespace short-circuit so all checks run on first attempt

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-23 23:15:01 +00:00
8 changed files with 147 additions and 43 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.33",
"version": "0.1.34",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.33",
"version": "0.1.34",
"license": "MIT",
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "paperclip-adapter-claude-k8s",
"version": "0.1.33",
"version": "0.1.34",
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
"license": "MIT",
"repository": {
+70 -10
View File
@@ -176,7 +176,7 @@ async function waitForPod(
const containerStatuses = pod.status?.containerStatuses ?? [];
// Log phase transitions
const statusKey = `${phase}:${initStatuses.map((s) => s.state?.waiting?.reason ?? s.state?.terminated?.reason ?? "ok").join(",")}:${containerStatuses.map((s) => s.state?.waiting?.reason ?? s.state?.running ? "running" : "waiting").join(",")}`;
const statusKey = `${phase}:${initStatuses.map((s) => s.state?.waiting?.reason ?? s.state?.terminated?.reason ?? "ok").join(",")}:${containerStatuses.map((s) => s.state?.waiting?.reason ?? (s.state?.running ? "running" : "waiting")).join(",")}`;
if (statusKey !== lastStatus) {
const details: string[] = [`phase=${phase}`];
for (const init of initStatuses) {
@@ -301,7 +301,10 @@ export async function streamPodLogsOnce(
if (stopSignal.stopped) {
if (!writable.destroyed) writable.destroy();
if (!bailTimer && bailResolve) {
bailTimer = setTimeout(bailResolve, LOG_STREAM_BAIL_TIMEOUT_MS);
bailTimer = setTimeout(() => {
onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {});
bailResolve!();
}, LOG_STREAM_BAIL_TIMEOUT_MS);
}
}
}, 200);
@@ -345,6 +348,7 @@ async function streamPodLogs(
onLog: AdapterExecutionContext["onLog"],
kubeconfigPath?: string,
stopSignal?: { stopped: boolean },
dedup?: LogLineDedupFilter,
): Promise<string> {
const allChunks: string[] = [];
let attempt = 0;
@@ -354,7 +358,7 @@ async function streamPodLogs(
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
// Shared across reconnects so replayed lines inside the `sinceSeconds`
// overlap window are dropped before they reach the streaming UI (FAR-123).
const dedup = new LogLineDedupFilter();
if (!dedup) dedup = new LogLineDedupFilter();
while (!stopSignal?.stopped) {
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
@@ -751,11 +755,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
// Create the Job
let createdJobUid: string | undefined;
try {
await batchApi.createNamespacedJob({ namespace, body: job });
const created = await batchApi.createNamespacedJob({ namespace, body: job });
createdJobUid = created.metadata?.uid;
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
if (promptSecret) {
try {
await coreApi.deleteNamespacedSecret({ name: promptSecret.name, namespace: promptSecret.namespace });
} catch { /* best-effort */ }
}
return {
exitCode: null,
signal: null,
@@ -765,6 +776,35 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
};
}
// Attach ownerReference so K8s GC cleans up the Secret if the process
// crashes before the finally block runs.
if (promptSecret && createdJobUid) {
try {
await coreApi.patchNamespacedSecret({
name: promptSecret.name,
namespace: promptSecret.namespace,
body: [
{
op: "add",
path: "/metadata/ownerReferences",
value: [
{
apiVersion: "batch/v1",
kind: "Job",
name: jobName,
uid: createdJobUid,
blockOwnerDeletion: false,
},
],
},
],
});
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
await onLog("stderr", `[paperclip] Warning: failed to set ownerReference on prompt Secret: ${msg}\n`);
}
}
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
}
@@ -853,6 +893,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
let keepaliveTick = 0;
let keepaliveJobTerminal = false;
let keepaliveJobTerminalAt: number | null = null;
let consecutiveTerminalReadings = 0;
keepaliveTimer = setInterval(() => {
// Fire-and-forget the async work; setInterval callbacks must be
// synchronous or the timer will drift.
@@ -875,19 +916,32 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
}
// Verify the Job is still alive before announcing or refreshing.
// Require two consecutive terminal readings before latching to
// guard against a stale K8s API cache returning a false terminal
// status on a single read (finding #5, FAR-15).
try {
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
const terminal = job.status?.conditions?.some(
(c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
);
if (terminal) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
if (ctx.onSpawn) {
consecutiveTerminalReadings++;
if (consecutiveTerminalReadings >= 2) {
keepaliveJobTerminal = true;
keepaliveJobTerminalAt = Date.now();
if (ctx.onSpawn) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
return;
}
// First terminal reading — do not latch yet; next tick confirms.
keepaliveTick++;
if (ctx.onSpawn && (keepaliveTick === 1 || keepaliveTick % 12 === 0)) {
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
}
return;
}
consecutiveTerminalReadings = 0;
} catch (err: unknown) {
// Only treat 404 (Job deleted) as terminal. Transient 5xx or
// connection resets should NOT permanently disable the keepalive —
@@ -928,9 +982,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// Shared signal: when job completion resolves, tell the log
// streamer to stop reconnecting.
const logStopSignal = { stopped: false };
// Shared dedup filter: created here so the one-shot fallback can
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
const logDedup = new LogLineDedupFilter();
const [logResult, completionResult] = await Promise.allSettled([
streamPodLogs(namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal),
streamPodLogs(namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup),
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then((r) => {
logStopSignal.stopped = true;
return r;
@@ -958,7 +1015,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
// from the beginning of the log, giving us the full output.
// We use a cheap string scan for the result-event guard (avoids a full JSON parse here;
// the authoritative parse happens once below after all fallbacks complete).
const hasResultEvent = stdout.includes('"type":"result"');
const hasResultEvent = stdout.split("\n").some((l) => { try { return JSON.parse(l).type === "result"; } catch { return false; } });
const needsOneShot = !stdout.trim() || (stdout.trim() && !hasResultEvent);
if (needsOneShot) {
if (!stdout.trim()) {
@@ -967,9 +1024,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
if (!stdout.trim() && oneShotLogs.trim()) {
stdout = oneShotLogs;
await onLog("stdout", stdout);
const deduped = logDedup.filter(stdout) + logDedup.flush();
if (deduped) await onLog("stdout", deduped);
} else if (oneShotLogs && oneShotLogs.length > stdout.length) {
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
const deduped = logDedup.filter(oneShotLogs) + logDedup.flush();
if (deduped) await onLog("stdout", deduped);
stdout = oneShotLogs;
}
}
+26 -4
View File
@@ -438,10 +438,10 @@ describe("buildJobManifest", () => {
it("uses configured resource overrides", () => {
ctx.config = {
resources: {
requests: { cpu: "500m", memory: "1Gi" },
limits: { cpu: "2000m", memory: "4Gi" },
},
"resources.requests.cpu": "500m",
"resources.requests.memory": "1Gi",
"resources.limits.cpu": "2000m",
"resources.limits.memory": "4Gi",
};
const { job } = buildJobManifest({ ctx, selfPod });
const resources = job.spec?.template?.spec?.containers[0]?.resources;
@@ -802,6 +802,28 @@ describe("buildJobManifest", () => {
expect(filterScript).toContain("tool_result");
});
it("filter script truncates without corrupting multi-byte UTF-8", () => {
// "中" is U+4E2D, 3 bytes in UTF-8: E4 B8 AD
// With MAX=5, two "中" (6 bytes) should truncate to one (3 bytes), not
// produce a replacement character from slicing mid-codepoint.
const setup = buildRtkSetupCommands(5);
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
// Extract the trunc function from the filter script and evaluate it
const fnMatch = filterScript.match(/(function trunc\(s\)\{.*\})(?=const tr=)/);
expect(fnMatch).toBeTruthy();
// eslint-disable-next-line no-eval
const trunc = eval(`(()=>{const MAX=5;${fnMatch![1]};return trunc;})()`);
const result = trunc("中中");
expect(result).not.toContain("");
expect(result).toContain("中");
expect(result).toContain("truncated by paperclip-rtk");
// Should report bytes from the actual truncation point, not MAX
expect(result).toContain("3 bytes truncated");
});
it("filter script handles array content (block format)", () => {
const setup = buildRtkSetupCommands(50000);
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
+11 -10
View File
@@ -47,7 +47,8 @@ export function buildRtkSetupCommands(maxOutputBytes: number): string {
`if(typeof s!=='string')return s;`,
`const b=Buffer.from(s,'utf-8');`,
`if(b.length<=MAX)return s;`,
`return b.slice(0,MAX).toString('utf-8')+'\\n[...'+(b.length-MAX)+' bytes truncated by paperclip-rtk]';`,
`let e=MAX;if(e>0){let p=e-1;while(p>0&&(b[p]&0xC0)===0x80)p--;const l=b[p];let n=1;if((l&0xE0)===0xC0)n=2;else if((l&0xF0)===0xE0)n=3;else if((l&0xF8)===0xF0)n=4;if(p+n>e)e=p;}`,
`return b.slice(0,e).toString('utf-8')+'\\n[...'+(b.length-e)+' bytes truncated by paperclip-rtk]';`,
`}`,
`const tr=o&&(o.tool_response||o.tool_result);`,
`if(tr){`,
@@ -202,7 +203,9 @@ export interface JobBuildResult {
}
function sanitizeForK8sName(value: string, maxLen = 16): string {
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen);
// Trim trailing hyphens after slicing so names don't end with `-` when
// truncation lands on a hyphen boundary (finding #16, FAR-15).
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen).replace(/-+$/, "");
}
/**
@@ -345,7 +348,6 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
const extraArgs = asStringArray(config.extraArgs);
const timeoutSec = asNumber(config.timeoutSec, 0);
const ttlSeconds = asNumber(config.ttlSecondsAfterFinished, 300);
const resources = parseObject(config.resources);
const nodeSelector = parseKeyValueConfig(config.nodeSelector);
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
const extraLabels = parseKeyValueConfig(config.labels);
@@ -427,17 +429,16 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
// Build env vars
const envVars = buildEnvVars(ctx, selfPod, config);
// Resource defaults
const resourceRequests = parseObject(resources.requests);
const resourceLimits = parseObject(resources.limits);
// Resource defaults — UI stores dotted keys (e.g. "resources.requests.cpu")
// as flat config entries, so read them directly from config with the dotted key.
const containerResources: k8s.V1ResourceRequirements = {
requests: {
cpu: asString(resourceRequests.cpu, "1000m"),
memory: asString(resourceRequests.memory, "2Gi"),
cpu: asString(config["resources.requests.cpu"], "1000m"),
memory: asString(config["resources.requests.memory"], "2Gi"),
},
limits: {
cpu: asString(resourceLimits.cpu, "4000m"),
memory: asString(resourceLimits.memory, "8Gi"),
cpu: asString(config["resources.limits.cpu"], "4000m"),
memory: asString(config["resources.limits.memory"], "8Gi"),
},
};
+6 -1
View File
@@ -106,7 +106,12 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise<SelfPodIn
throw new Error(`claude_k8s: pod ${hostname} has no spec`);
}
const mainContainer = spec.containers[0];
// Match the Paperclip container by name ("paperclip") to avoid service-mesh
// sidecars or other injected containers being picked up as the source of
// truth for the Job spec (finding #9, FAR-15). Fall back to the first
// container if no name match is found (matches prior behavior).
const mainContainer =
spec.containers.find((c) => c.name === "paperclip") ?? spec.containers[0];
if (!mainContainer?.image) {
throw new Error(`claude_k8s: pod ${hostname} has no container image`);
}
+15 -6
View File
@@ -9,9 +9,12 @@ export function parseClaudeStreamJson(stdout: string) {
let model = "";
let finalResult: Record<string, unknown> | null = null;
const assistantTexts: string[] = [];
// Belt-and-braces dedup: track seen text blocks to filter duplicates
// caused by log stream reconnects replaying overlapping windows.
const seenTexts = new Set<string>();
// Belt-and-braces dedup: key by (message.id, textIndex) so a session that
// legitimately emits the same text twice in different turns isn't collapsed
// (finding #11, FAR-15). The log-dedup filter handles reconnect overlaps
// at the line level; this guard only needs to protect against the same
// message block being parsed twice.
const seenBlocks = new Set<string>();
for (const rawLine of stdout.split(/\r?\n/)) {
const line = rawLine.trim();
@@ -29,14 +32,20 @@ export function parseClaudeStreamJson(stdout: string) {
if (type === "assistant") {
sessionId = asString(event.session_id, sessionId ?? "") || sessionId;
const message = parseObject(event.message);
const messageId = asString(message.id, "");
const content = Array.isArray(message.content) ? message.content : [];
for (const entry of content) {
for (let i = 0; i < content.length; i++) {
const entry = content[i];
if (typeof entry !== "object" || entry === null || Array.isArray(entry)) continue;
const block = entry as Record<string, unknown>;
if (asString(block.type, "") === "text") {
const text = asString(block.text, "");
if (text && !seenTexts.has(text)) {
seenTexts.add(text);
if (!text) continue;
// Prefer (messageId, index) when the message has an id; fall back
// to text content when it doesn't (legacy/partial events).
const key = messageId ? `${messageId}:${i}` : `text:${text}`;
if (!seenBlocks.has(key)) {
seenBlocks.add(key);
assistantTexts.push(text);
}
}
+16 -9
View File
@@ -85,8 +85,13 @@ async function checkRbac(
{ resource: "jobs", group: "batch", verb: "create", code: "k8s_rbac_job_create", label: "create Jobs" },
{ resource: "jobs", group: "batch", verb: "delete", code: "k8s_rbac_job_delete", label: "delete Jobs" },
{ resource: "jobs", group: "batch", verb: "get", code: "k8s_rbac_job_get", label: "get Jobs" },
{ resource: "jobs", group: "batch", verb: "list", code: "k8s_rbac_job_list", label: "list Jobs" },
{ resource: "pods", group: "", verb: "list", code: "k8s_rbac_pod_list", label: "list Pods" },
{ resource: "pods/log", group: "", verb: "get", code: "k8s_rbac_pod_log", label: "get Pod logs" },
{ resource: "secrets", group: "", verb: "create", code: "k8s_rbac_secret_create", label: "create Secrets" },
{ resource: "secrets", group: "", verb: "delete", code: "k8s_rbac_secret_delete", label: "delete Secrets" },
{ resource: "secrets", group: "", verb: "get", code: "k8s_rbac_secret_get", label: "get Secrets" },
{ resource: "persistentvolumeclaims", group: "", verb: "get", code: "k8s_rbac_pvc_get", label: "get PersistentVolumeClaims" },
];
for (const check of rbacChecks) {
@@ -221,16 +226,18 @@ export async function testEnvironment(
// 2. Target namespace exists
const nsOk = await checkNamespace(namespace, selfPod.namespace, checks, kubeconfigPath);
if (!nsOk) {
return { adapterType: ctx.adapterType, status: summarizeStatus(checks), checks, testedAt: new Date().toISOString() };
}
// 3-5. Run remaining checks in parallel
await Promise.all([
checkRbac(namespace, checks, kubeconfigPath),
checkSecret(namespace, secretRef, checks, kubeconfigPath),
checkPvc(selfPod, checks, kubeconfigPath),
]);
// 3-5. Run remaining checks even if namespace check failed so operators see
// all issues at once instead of fixing them one at a time.
if (nsOk) {
await Promise.all([
checkRbac(namespace, checks, kubeconfigPath),
checkSecret(namespace, secretRef, checks, kubeconfigPath),
checkPvc(selfPod, checks, kubeconfigPath),
]);
} else {
await checkRbac(namespace, checks, kubeconfigPath);
}
return {
adapterType: ctx.adapterType,