Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e86b14a677 | |||
| 98f3821f91 | |||
| 21a02da00f | |||
| 346f5cc1df | |||
| ef73586a41 |
Generated
+2
-2
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.33",
|
||||
"version": "0.1.34",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.33",
|
||||
"version": "0.1.34",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@kubernetes/client-node": "^1.0.0",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "paperclip-adapter-claude-k8s",
|
||||
"version": "0.1.33",
|
||||
"version": "0.1.34",
|
||||
"description": "Paperclip adapter plugin that runs Claude Code agents as Kubernetes Jobs",
|
||||
"license": "MIT",
|
||||
"repository": {
|
||||
|
||||
+70
-10
@@ -176,7 +176,7 @@ async function waitForPod(
|
||||
const containerStatuses = pod.status?.containerStatuses ?? [];
|
||||
|
||||
// Log phase transitions
|
||||
const statusKey = `${phase}:${initStatuses.map((s) => s.state?.waiting?.reason ?? s.state?.terminated?.reason ?? "ok").join(",")}:${containerStatuses.map((s) => s.state?.waiting?.reason ?? s.state?.running ? "running" : "waiting").join(",")}`;
|
||||
const statusKey = `${phase}:${initStatuses.map((s) => s.state?.waiting?.reason ?? s.state?.terminated?.reason ?? "ok").join(",")}:${containerStatuses.map((s) => s.state?.waiting?.reason ?? (s.state?.running ? "running" : "waiting")).join(",")}`;
|
||||
if (statusKey !== lastStatus) {
|
||||
const details: string[] = [`phase=${phase}`];
|
||||
for (const init of initStatuses) {
|
||||
@@ -301,7 +301,10 @@ export async function streamPodLogsOnce(
|
||||
if (stopSignal.stopped) {
|
||||
if (!writable.destroyed) writable.destroy();
|
||||
if (!bailTimer && bailResolve) {
|
||||
bailTimer = setTimeout(bailResolve, LOG_STREAM_BAIL_TIMEOUT_MS);
|
||||
bailTimer = setTimeout(() => {
|
||||
onLog("stderr", "[paperclip] Log stream bail timer fired — forcing return\n").catch(() => {});
|
||||
bailResolve!();
|
||||
}, LOG_STREAM_BAIL_TIMEOUT_MS);
|
||||
}
|
||||
}
|
||||
}, 200);
|
||||
@@ -345,6 +348,7 @@ async function streamPodLogs(
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
stopSignal?: { stopped: boolean },
|
||||
dedup?: LogLineDedupFilter,
|
||||
): Promise<string> {
|
||||
const allChunks: string[] = [];
|
||||
let attempt = 0;
|
||||
@@ -354,7 +358,7 @@ async function streamPodLogs(
|
||||
let lastLogReceivedAt = Math.floor(Date.now() / 1000);
|
||||
// Shared across reconnects so replayed lines inside the `sinceSeconds`
|
||||
// overlap window are dropped before they reach the streaming UI (FAR-123).
|
||||
const dedup = new LogLineDedupFilter();
|
||||
if (!dedup) dedup = new LogLineDedupFilter();
|
||||
|
||||
while (!stopSignal?.stopped) {
|
||||
if (attempt >= MAX_LOG_RECONNECT_ATTEMPTS) {
|
||||
@@ -751,11 +755,18 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
}
|
||||
|
||||
// Create the Job
|
||||
let createdJobUid: string | undefined;
|
||||
try {
|
||||
await batchApi.createNamespacedJob({ namespace, body: job });
|
||||
const created = await batchApi.createNamespacedJob({ namespace, body: job });
|
||||
createdJobUid = created.metadata?.uid;
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
|
||||
if (promptSecret) {
|
||||
try {
|
||||
await coreApi.deleteNamespacedSecret({ name: promptSecret.name, namespace: promptSecret.namespace });
|
||||
} catch { /* best-effort */ }
|
||||
}
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
@@ -765,6 +776,35 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
};
|
||||
}
|
||||
|
||||
// Attach ownerReference so K8s GC cleans up the Secret if the process
|
||||
// crashes before the finally block runs.
|
||||
if (promptSecret && createdJobUid) {
|
||||
try {
|
||||
await coreApi.patchNamespacedSecret({
|
||||
name: promptSecret.name,
|
||||
namespace: promptSecret.namespace,
|
||||
body: [
|
||||
{
|
||||
op: "add",
|
||||
path: "/metadata/ownerReferences",
|
||||
value: [
|
||||
{
|
||||
apiVersion: "batch/v1",
|
||||
kind: "Job",
|
||||
name: jobName,
|
||||
uid: createdJobUid,
|
||||
blockOwnerDeletion: false,
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
});
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Warning: failed to set ownerReference on prompt Secret: ${msg}\n`);
|
||||
}
|
||||
}
|
||||
|
||||
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
|
||||
}
|
||||
|
||||
@@ -853,6 +893,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
let keepaliveTick = 0;
|
||||
let keepaliveJobTerminal = false;
|
||||
let keepaliveJobTerminalAt: number | null = null;
|
||||
let consecutiveTerminalReadings = 0;
|
||||
keepaliveTimer = setInterval(() => {
|
||||
// Fire-and-forget the async work; setInterval callbacks must be
|
||||
// synchronous or the timer will drift.
|
||||
@@ -875,19 +916,32 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
}
|
||||
|
||||
// Verify the Job is still alive before announcing or refreshing.
|
||||
// Require two consecutive terminal readings before latching to
|
||||
// guard against a stale K8s API cache returning a false terminal
|
||||
// status on a single read (finding #5, FAR-15).
|
||||
try {
|
||||
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
|
||||
const terminal = job.status?.conditions?.some(
|
||||
(c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True",
|
||||
);
|
||||
if (terminal) {
|
||||
keepaliveJobTerminal = true;
|
||||
keepaliveJobTerminalAt = Date.now();
|
||||
if (ctx.onSpawn) {
|
||||
consecutiveTerminalReadings++;
|
||||
if (consecutiveTerminalReadings >= 2) {
|
||||
keepaliveJobTerminal = true;
|
||||
keepaliveJobTerminalAt = Date.now();
|
||||
if (ctx.onSpawn) {
|
||||
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
|
||||
}
|
||||
return;
|
||||
}
|
||||
// First terminal reading — do not latch yet; next tick confirms.
|
||||
keepaliveTick++;
|
||||
if (ctx.onSpawn && (keepaliveTick === 1 || keepaliveTick % 12 === 0)) {
|
||||
void ctx.onSpawn({ pid: process.pid, processGroupId: null, startedAt: new Date().toISOString() }).catch(() => {});
|
||||
}
|
||||
return;
|
||||
}
|
||||
consecutiveTerminalReadings = 0;
|
||||
} catch (err: unknown) {
|
||||
// Only treat 404 (Job deleted) as terminal. Transient 5xx or
|
||||
// connection resets should NOT permanently disable the keepalive —
|
||||
@@ -928,9 +982,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// Shared signal: when job completion resolves, tell the log
|
||||
// streamer to stop reconnecting.
|
||||
const logStopSignal = { stopped: false };
|
||||
// Shared dedup filter: created here so the one-shot fallback can
|
||||
// reuse it and avoid pushing already-sent lines to the UI (finding #6, FAR-15).
|
||||
const logDedup = new LogLineDedupFilter();
|
||||
|
||||
const [logResult, completionResult] = await Promise.allSettled([
|
||||
streamPodLogs(namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal),
|
||||
streamPodLogs(namespace, podName, wrappedOnLog, kubeconfigPath, logStopSignal, logDedup),
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath).then((r) => {
|
||||
logStopSignal.stopped = true;
|
||||
return r;
|
||||
@@ -958,7 +1015,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// from the beginning of the log, giving us the full output.
|
||||
// We use a cheap string scan for the result-event guard (avoids a full JSON parse here;
|
||||
// the authoritative parse happens once below after all fallbacks complete).
|
||||
const hasResultEvent = stdout.includes('"type":"result"');
|
||||
const hasResultEvent = stdout.split("\n").some((l) => { try { return JSON.parse(l).type === "result"; } catch { return false; } });
|
||||
const needsOneShot = !stdout.trim() || (stdout.trim() && !hasResultEvent);
|
||||
if (needsOneShot) {
|
||||
if (!stdout.trim()) {
|
||||
@@ -967,9 +1024,12 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
const oneShotLogs = await readPodLogs(namespace, podName, kubeconfigPath);
|
||||
if (!stdout.trim() && oneShotLogs.trim()) {
|
||||
stdout = oneShotLogs;
|
||||
await onLog("stdout", stdout);
|
||||
const deduped = logDedup.filter(stdout) + logDedup.flush();
|
||||
if (deduped) await onLog("stdout", deduped);
|
||||
} else if (oneShotLogs && oneShotLogs.length > stdout.length) {
|
||||
await onLog("stdout", `[paperclip] Log stream captured partial output — supplemental one-shot read returned more content.\n`);
|
||||
const deduped = logDedup.filter(oneShotLogs) + logDedup.flush();
|
||||
if (deduped) await onLog("stdout", deduped);
|
||||
stdout = oneShotLogs;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -438,10 +438,10 @@ describe("buildJobManifest", () => {
|
||||
|
||||
it("uses configured resource overrides", () => {
|
||||
ctx.config = {
|
||||
resources: {
|
||||
requests: { cpu: "500m", memory: "1Gi" },
|
||||
limits: { cpu: "2000m", memory: "4Gi" },
|
||||
},
|
||||
"resources.requests.cpu": "500m",
|
||||
"resources.requests.memory": "1Gi",
|
||||
"resources.limits.cpu": "2000m",
|
||||
"resources.limits.memory": "4Gi",
|
||||
};
|
||||
const { job } = buildJobManifest({ ctx, selfPod });
|
||||
const resources = job.spec?.template?.spec?.containers[0]?.resources;
|
||||
@@ -802,6 +802,28 @@ describe("buildJobManifest", () => {
|
||||
expect(filterScript).toContain("tool_result");
|
||||
});
|
||||
|
||||
it("filter script truncates without corrupting multi-byte UTF-8", () => {
|
||||
// "中" is U+4E2D, 3 bytes in UTF-8: E4 B8 AD
|
||||
// With MAX=5, two "中" (6 bytes) should truncate to one (3 bytes), not
|
||||
// produce a replacement character from slicing mid-codepoint.
|
||||
const setup = buildRtkSetupCommands(5);
|
||||
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
|
||||
const filterScript = Buffer.from(b64Matches[0]![1], "base64").toString("utf-8");
|
||||
|
||||
// Extract the trunc function from the filter script and evaluate it
|
||||
const fnMatch = filterScript.match(/(function trunc\(s\)\{.*\})(?=const tr=)/);
|
||||
expect(fnMatch).toBeTruthy();
|
||||
// eslint-disable-next-line no-eval
|
||||
const trunc = eval(`(()=>{const MAX=5;${fnMatch![1]};return trunc;})()`);
|
||||
|
||||
const result = trunc("中中");
|
||||
expect(result).not.toContain("�");
|
||||
expect(result).toContain("中");
|
||||
expect(result).toContain("truncated by paperclip-rtk");
|
||||
// Should report bytes from the actual truncation point, not MAX
|
||||
expect(result).toContain("3 bytes truncated");
|
||||
});
|
||||
|
||||
it("filter script handles array content (block format)", () => {
|
||||
const setup = buildRtkSetupCommands(50000);
|
||||
const b64Matches = [...setup.matchAll(/Buffer\.from\('([A-Za-z0-9+/=]+)','base64'\)/g)];
|
||||
|
||||
+11
-10
@@ -47,7 +47,8 @@ export function buildRtkSetupCommands(maxOutputBytes: number): string {
|
||||
`if(typeof s!=='string')return s;`,
|
||||
`const b=Buffer.from(s,'utf-8');`,
|
||||
`if(b.length<=MAX)return s;`,
|
||||
`return b.slice(0,MAX).toString('utf-8')+'\\n[...'+(b.length-MAX)+' bytes truncated by paperclip-rtk]';`,
|
||||
`let e=MAX;if(e>0){let p=e-1;while(p>0&&(b[p]&0xC0)===0x80)p--;const l=b[p];let n=1;if((l&0xE0)===0xC0)n=2;else if((l&0xF0)===0xE0)n=3;else if((l&0xF8)===0xF0)n=4;if(p+n>e)e=p;}`,
|
||||
`return b.slice(0,e).toString('utf-8')+'\\n[...'+(b.length-e)+' bytes truncated by paperclip-rtk]';`,
|
||||
`}`,
|
||||
`const tr=o&&(o.tool_response||o.tool_result);`,
|
||||
`if(tr){`,
|
||||
@@ -202,7 +203,9 @@ export interface JobBuildResult {
|
||||
}
|
||||
|
||||
function sanitizeForK8sName(value: string, maxLen = 16): string {
|
||||
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen);
|
||||
// Trim trailing hyphens after slicing so names don't end with `-` when
|
||||
// truncation lands on a hyphen boundary (finding #16, FAR-15).
|
||||
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, maxLen).replace(/-+$/, "");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -345,7 +348,6 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
const extraArgs = asStringArray(config.extraArgs);
|
||||
const timeoutSec = asNumber(config.timeoutSec, 0);
|
||||
const ttlSeconds = asNumber(config.ttlSecondsAfterFinished, 300);
|
||||
const resources = parseObject(config.resources);
|
||||
const nodeSelector = parseKeyValueConfig(config.nodeSelector);
|
||||
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
|
||||
const extraLabels = parseKeyValueConfig(config.labels);
|
||||
@@ -427,17 +429,16 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
// Build env vars
|
||||
const envVars = buildEnvVars(ctx, selfPod, config);
|
||||
|
||||
// Resource defaults
|
||||
const resourceRequests = parseObject(resources.requests);
|
||||
const resourceLimits = parseObject(resources.limits);
|
||||
// Resource defaults — UI stores dotted keys (e.g. "resources.requests.cpu")
|
||||
// as flat config entries, so read them directly from config with the dotted key.
|
||||
const containerResources: k8s.V1ResourceRequirements = {
|
||||
requests: {
|
||||
cpu: asString(resourceRequests.cpu, "1000m"),
|
||||
memory: asString(resourceRequests.memory, "2Gi"),
|
||||
cpu: asString(config["resources.requests.cpu"], "1000m"),
|
||||
memory: asString(config["resources.requests.memory"], "2Gi"),
|
||||
},
|
||||
limits: {
|
||||
cpu: asString(resourceLimits.cpu, "4000m"),
|
||||
memory: asString(resourceLimits.memory, "8Gi"),
|
||||
cpu: asString(config["resources.limits.cpu"], "4000m"),
|
||||
memory: asString(config["resources.limits.memory"], "8Gi"),
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -106,7 +106,12 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise<SelfPodIn
|
||||
throw new Error(`claude_k8s: pod ${hostname} has no spec`);
|
||||
}
|
||||
|
||||
const mainContainer = spec.containers[0];
|
||||
// Match the Paperclip container by name ("paperclip") to avoid service-mesh
|
||||
// sidecars or other injected containers being picked up as the source of
|
||||
// truth for the Job spec (finding #9, FAR-15). Fall back to the first
|
||||
// container if no name match is found (matches prior behavior).
|
||||
const mainContainer =
|
||||
spec.containers.find((c) => c.name === "paperclip") ?? spec.containers[0];
|
||||
if (!mainContainer?.image) {
|
||||
throw new Error(`claude_k8s: pod ${hostname} has no container image`);
|
||||
}
|
||||
|
||||
+15
-6
@@ -9,9 +9,12 @@ export function parseClaudeStreamJson(stdout: string) {
|
||||
let model = "";
|
||||
let finalResult: Record<string, unknown> | null = null;
|
||||
const assistantTexts: string[] = [];
|
||||
// Belt-and-braces dedup: track seen text blocks to filter duplicates
|
||||
// caused by log stream reconnects replaying overlapping windows.
|
||||
const seenTexts = new Set<string>();
|
||||
// Belt-and-braces dedup: key by (message.id, textIndex) so a session that
|
||||
// legitimately emits the same text twice in different turns isn't collapsed
|
||||
// (finding #11, FAR-15). The log-dedup filter handles reconnect overlaps
|
||||
// at the line level; this guard only needs to protect against the same
|
||||
// message block being parsed twice.
|
||||
const seenBlocks = new Set<string>();
|
||||
|
||||
for (const rawLine of stdout.split(/\r?\n/)) {
|
||||
const line = rawLine.trim();
|
||||
@@ -29,14 +32,20 @@ export function parseClaudeStreamJson(stdout: string) {
|
||||
if (type === "assistant") {
|
||||
sessionId = asString(event.session_id, sessionId ?? "") || sessionId;
|
||||
const message = parseObject(event.message);
|
||||
const messageId = asString(message.id, "");
|
||||
const content = Array.isArray(message.content) ? message.content : [];
|
||||
for (const entry of content) {
|
||||
for (let i = 0; i < content.length; i++) {
|
||||
const entry = content[i];
|
||||
if (typeof entry !== "object" || entry === null || Array.isArray(entry)) continue;
|
||||
const block = entry as Record<string, unknown>;
|
||||
if (asString(block.type, "") === "text") {
|
||||
const text = asString(block.text, "");
|
||||
if (text && !seenTexts.has(text)) {
|
||||
seenTexts.add(text);
|
||||
if (!text) continue;
|
||||
// Prefer (messageId, index) when the message has an id; fall back
|
||||
// to text content when it doesn't (legacy/partial events).
|
||||
const key = messageId ? `${messageId}:${i}` : `text:${text}`;
|
||||
if (!seenBlocks.has(key)) {
|
||||
seenBlocks.add(key);
|
||||
assistantTexts.push(text);
|
||||
}
|
||||
}
|
||||
|
||||
+16
-9
@@ -85,8 +85,13 @@ async function checkRbac(
|
||||
{ resource: "jobs", group: "batch", verb: "create", code: "k8s_rbac_job_create", label: "create Jobs" },
|
||||
{ resource: "jobs", group: "batch", verb: "delete", code: "k8s_rbac_job_delete", label: "delete Jobs" },
|
||||
{ resource: "jobs", group: "batch", verb: "get", code: "k8s_rbac_job_get", label: "get Jobs" },
|
||||
{ resource: "jobs", group: "batch", verb: "list", code: "k8s_rbac_job_list", label: "list Jobs" },
|
||||
{ resource: "pods", group: "", verb: "list", code: "k8s_rbac_pod_list", label: "list Pods" },
|
||||
{ resource: "pods/log", group: "", verb: "get", code: "k8s_rbac_pod_log", label: "get Pod logs" },
|
||||
{ resource: "secrets", group: "", verb: "create", code: "k8s_rbac_secret_create", label: "create Secrets" },
|
||||
{ resource: "secrets", group: "", verb: "delete", code: "k8s_rbac_secret_delete", label: "delete Secrets" },
|
||||
{ resource: "secrets", group: "", verb: "get", code: "k8s_rbac_secret_get", label: "get Secrets" },
|
||||
{ resource: "persistentvolumeclaims", group: "", verb: "get", code: "k8s_rbac_pvc_get", label: "get PersistentVolumeClaims" },
|
||||
];
|
||||
|
||||
for (const check of rbacChecks) {
|
||||
@@ -221,16 +226,18 @@ export async function testEnvironment(
|
||||
|
||||
// 2. Target namespace exists
|
||||
const nsOk = await checkNamespace(namespace, selfPod.namespace, checks, kubeconfigPath);
|
||||
if (!nsOk) {
|
||||
return { adapterType: ctx.adapterType, status: summarizeStatus(checks), checks, testedAt: new Date().toISOString() };
|
||||
}
|
||||
|
||||
// 3-5. Run remaining checks in parallel
|
||||
await Promise.all([
|
||||
checkRbac(namespace, checks, kubeconfigPath),
|
||||
checkSecret(namespace, secretRef, checks, kubeconfigPath),
|
||||
checkPvc(selfPod, checks, kubeconfigPath),
|
||||
]);
|
||||
// 3-5. Run remaining checks even if namespace check failed so operators see
|
||||
// all issues at once instead of fixing them one at a time.
|
||||
if (nsOk) {
|
||||
await Promise.all([
|
||||
checkRbac(namespace, checks, kubeconfigPath),
|
||||
checkSecret(namespace, secretRef, checks, kubeconfigPath),
|
||||
checkPvc(selfPod, checks, kubeconfigPath),
|
||||
]);
|
||||
} else {
|
||||
await checkRbac(namespace, checks, kubeconfigPath);
|
||||
}
|
||||
|
||||
return {
|
||||
adapterType: ctx.adapterType,
|
||||
|
||||
Reference in New Issue
Block a user