@@ -0,0 +1,451 @@
|
||||
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
|
||||
import { asString, asNumber, asBoolean, parseObject } from "@paperclipai/adapter-utils/server-utils";
|
||||
|
||||
function inferOpenAiCompatibleBiller(env: Record<string, string>, _fallback: string | null): string | null {
|
||||
if (env.OPENROUTER_API_KEY) return "openrouter";
|
||||
if (env.OPENAI_BASE_URL?.includes("openrouter")) return "openrouter";
|
||||
if (env.OPENAI_API_KEY) return "openai";
|
||||
return null;
|
||||
}
|
||||
import {
|
||||
parseOpenCodeJsonl,
|
||||
isOpenCodeUnknownSessionError,
|
||||
} from "./parse.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
|
||||
import { buildJobManifest } from "./job-manifest.js";
|
||||
import type * as k8s from "@kubernetes/client-node";
|
||||
import { Writable } from "node:stream";
|
||||
|
||||
const POLL_INTERVAL_MS = 2000;
|
||||
|
||||
function parseModelProvider(model: string | null): string | null {
|
||||
if (!model) return null;
|
||||
const trimmed = model.trim();
|
||||
if (!trimmed.includes("/")) return null;
|
||||
return trimmed.slice(0, trimmed.indexOf("/")).trim() || null;
|
||||
}
|
||||
|
||||
async function waitForPod(
|
||||
namespace: string,
|
||||
jobName: string,
|
||||
timeoutMs: number,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
): Promise<string> {
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
const labelSelector = `job-name=${jobName}`;
|
||||
|
||||
await onLog("stdout", `[paperclip] Waiting for pod to be scheduled (job: ${jobName})...\n`);
|
||||
|
||||
let lastStatus = "";
|
||||
while (Date.now() < deadline) {
|
||||
const podList = await coreApi.listNamespacedPod({
|
||||
namespace,
|
||||
labelSelector,
|
||||
});
|
||||
const pod = podList.items[0];
|
||||
|
||||
if (!pod) {
|
||||
if (lastStatus !== "no-pod") {
|
||||
await onLog("stdout", `[paperclip] Waiting for Job controller to create pod...\n`);
|
||||
lastStatus = "no-pod";
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
|
||||
continue;
|
||||
}
|
||||
|
||||
const podName = pod.metadata?.name ?? "unknown";
|
||||
const phase = pod.status?.phase ?? "Unknown";
|
||||
const initStatuses = pod.status?.initContainerStatuses ?? [];
|
||||
const containerStatuses = pod.status?.containerStatuses ?? [];
|
||||
|
||||
const statusKey = `${phase}:${initStatuses.map((s) => s.state?.waiting?.reason ?? s.state?.terminated?.reason ?? "ok").join(",")}:${containerStatuses.map((s) => s.state?.waiting?.reason ?? s.state?.running ? "running" : "waiting").join(",")}`;
|
||||
if (statusKey !== lastStatus) {
|
||||
const details: string[] = [`phase=${phase}`];
|
||||
for (const init of initStatuses) {
|
||||
if (init.state?.waiting) details.push(`init/${init.name}: waiting (${init.state.waiting.reason ?? "unknown"})`);
|
||||
else if (init.state?.running) details.push(`init/${init.name}: running`);
|
||||
else if (init.state?.terminated) details.push(`init/${init.name}: done (exit ${init.state.terminated.exitCode})`);
|
||||
}
|
||||
for (const cs of containerStatuses) {
|
||||
if (cs.state?.waiting) details.push(`${cs.name}: waiting (${cs.state.waiting.reason ?? "unknown"})`);
|
||||
else if (cs.state?.running) details.push(`${cs.name}: running`);
|
||||
}
|
||||
await onLog("stdout", `[paperclip] Pod ${podName}: ${details.join(", ")}\n`);
|
||||
lastStatus = statusKey;
|
||||
}
|
||||
|
||||
if (phase === "Running" || phase === "Succeeded" || phase === "Failed") {
|
||||
return podName;
|
||||
}
|
||||
|
||||
const allInitsDone = initStatuses.length > 0 && initStatuses.every(
|
||||
(s) => s.state?.terminated?.exitCode === 0,
|
||||
);
|
||||
const mainRunning = containerStatuses.some((s) => s.state?.running);
|
||||
if (allInitsDone && mainRunning) {
|
||||
return podName;
|
||||
}
|
||||
|
||||
for (const init of initStatuses) {
|
||||
const terminated = init.state?.terminated;
|
||||
if (terminated && (terminated.exitCode ?? 0) !== 0) {
|
||||
throw new Error(`Init container "${init.name}" failed with exit code ${terminated.exitCode}: ${terminated.reason ?? terminated.message ?? "unknown"}`);
|
||||
}
|
||||
const waiting = init.state?.waiting;
|
||||
if (waiting?.reason === "ErrImagePull" || waiting?.reason === "ImagePullBackOff") {
|
||||
throw new Error(`Init container "${init.name}" image pull failed: ${waiting.message ?? waiting.reason}`);
|
||||
}
|
||||
if (waiting?.reason === "CrashLoopBackOff") {
|
||||
throw new Error(`Init container "${init.name}" crash loop: ${waiting.message ?? waiting.reason}`);
|
||||
}
|
||||
}
|
||||
|
||||
const conditions = pod.status?.conditions ?? [];
|
||||
const unschedulable = conditions.find(
|
||||
(c) => c.type === "PodScheduled" && c.status === "False" && c.reason === "Unschedulable",
|
||||
);
|
||||
if (unschedulable) {
|
||||
throw new Error(`Pod unschedulable: ${unschedulable.message ?? "insufficient resources"}`);
|
||||
}
|
||||
|
||||
for (const cs of containerStatuses) {
|
||||
const waiting = cs.state?.waiting;
|
||||
if (waiting?.reason === "ErrImagePull" || waiting?.reason === "ImagePullBackOff") {
|
||||
throw new Error(`Image pull failed for "${cs.name}": ${waiting.message ?? waiting.reason}`);
|
||||
}
|
||||
if (waiting?.reason === "CrashLoopBackOff") {
|
||||
throw new Error(`Container "${cs.name}" crash loop: ${waiting.message ?? waiting.reason}`);
|
||||
}
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
|
||||
}
|
||||
|
||||
throw new Error(`Timed out waiting for pod to be scheduled (${Math.round(timeoutMs / 1000)}s)`);
|
||||
}
|
||||
|
||||
async function streamPodLogs(
|
||||
namespace: string,
|
||||
podName: string,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
): Promise<string> {
|
||||
const logApi = getLogApi(kubeconfigPath);
|
||||
const chunks: string[] = [];
|
||||
|
||||
const writable = new Writable({
|
||||
write(chunk: Buffer, _encoding, callback) {
|
||||
const text = chunk.toString("utf-8");
|
||||
chunks.push(text);
|
||||
void onLog("stdout", text).then(() => callback(), callback);
|
||||
},
|
||||
});
|
||||
|
||||
try {
|
||||
await logApi.log(namespace, podName, "opencode", writable, {
|
||||
follow: true,
|
||||
pretty: false,
|
||||
});
|
||||
} catch {
|
||||
// follow may fail if the container already exited
|
||||
}
|
||||
|
||||
return chunks.join("");
|
||||
}
|
||||
|
||||
async function readPodLogs(
|
||||
namespace: string,
|
||||
podName: string,
|
||||
kubeconfigPath?: string,
|
||||
): Promise<string> {
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
try {
|
||||
const log = await coreApi.readNamespacedPodLog({
|
||||
name: podName,
|
||||
namespace,
|
||||
container: "opencode",
|
||||
});
|
||||
return typeof log === "string" ? log : "";
|
||||
} catch {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForJobCompletion(
|
||||
namespace: string,
|
||||
jobName: string,
|
||||
timeoutMs: number,
|
||||
kubeconfigPath?: string,
|
||||
): Promise<{ succeeded: boolean; timedOut: boolean }> {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
const deadline = timeoutMs > 0 ? Date.now() + timeoutMs : 0;
|
||||
|
||||
while (deadline === 0 || Date.now() < deadline) {
|
||||
const job = await batchApi.readNamespacedJob({ name: jobName, namespace });
|
||||
const conditions = job.status?.conditions ?? [];
|
||||
|
||||
const complete = conditions.find((c) => c.type === "Complete" && c.status === "True");
|
||||
if (complete) return { succeeded: true, timedOut: false };
|
||||
|
||||
const failed = conditions.find((c) => c.type === "Failed" && c.status === "True");
|
||||
if (failed) {
|
||||
const isDeadlineExceeded = failed.reason === "DeadlineExceeded";
|
||||
return { succeeded: false, timedOut: isDeadlineExceeded };
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
|
||||
}
|
||||
|
||||
return { succeeded: false, timedOut: true };
|
||||
}
|
||||
|
||||
async function getPodExitCode(namespace: string, jobName: string, kubeconfigPath?: string): Promise<number | null> {
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
const podList = await coreApi.listNamespacedPod({
|
||||
namespace,
|
||||
labelSelector: `job-name=${jobName}`,
|
||||
});
|
||||
const pod = podList.items[0];
|
||||
if (!pod) return null;
|
||||
|
||||
const containerStatus = pod.status?.containerStatuses?.find((s) => s.name === "opencode");
|
||||
return containerStatus?.state?.terminated?.exitCode ?? null;
|
||||
}
|
||||
|
||||
async function cleanupJob(
|
||||
namespace: string,
|
||||
jobName: string,
|
||||
onLog: AdapterExecutionContext["onLog"],
|
||||
kubeconfigPath?: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
await batchApi.deleteNamespacedJob({
|
||||
name: jobName,
|
||||
namespace,
|
||||
body: { propagationPolicy: "Background" },
|
||||
});
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Warning: failed to cleanup job ${jobName}: ${msg}\n`);
|
||||
}
|
||||
}
|
||||
|
||||
export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExecutionResult> {
|
||||
const { runId, runtime, config: rawConfig, onLog, onMeta } = ctx;
|
||||
const config = parseObject(rawConfig);
|
||||
const timeoutSec = asNumber(config.timeoutSec, 0);
|
||||
const graceSec = asNumber(config.graceSec, 60);
|
||||
const retainJobs = asBoolean(config.retainJobs, false);
|
||||
const kubeconfigPath = asString(config.kubeconfig, "") || undefined;
|
||||
const model = asString(config.model, "").trim();
|
||||
|
||||
// Guard: single concurrency per agent (shared PVC/session)
|
||||
const agentId = ctx.agent.id;
|
||||
const selfPod = await getSelfPodInfo(kubeconfigPath);
|
||||
const guardNamespace = asString(config.namespace, "") || selfPod.namespace;
|
||||
try {
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
const existing = await batchApi.listNamespacedJob({
|
||||
namespace: guardNamespace,
|
||||
labelSelector: `paperclip.io/agent-id=${agentId},paperclip.io/adapter-type=opencode_k8s`,
|
||||
});
|
||||
const running = existing.items.filter(
|
||||
(j) => !j.status?.conditions?.some((c) => (c.type === "Complete" || c.type === "Failed") && c.status === "True"),
|
||||
);
|
||||
if (running.length > 0) {
|
||||
const names = running.map((j) => j.metadata?.name).join(", ");
|
||||
await onLog("stderr", `[paperclip] Concurrent run blocked: existing Job(s) still running for this agent: ${names}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Concurrent run blocked: Job ${names} is still running for this agent`,
|
||||
errorCode: "k8s_concurrent_run_blocked",
|
||||
};
|
||||
}
|
||||
} catch {
|
||||
// If we can't check, proceed — heartbeat service enforces concurrency too
|
||||
}
|
||||
|
||||
const { job, jobName, namespace, prompt, opencodeArgs, promptMetrics } = buildJobManifest({
|
||||
ctx,
|
||||
selfPod,
|
||||
});
|
||||
|
||||
if (onMeta) {
|
||||
await onMeta({
|
||||
adapterType: "opencode_k8s",
|
||||
command: `kubectl job/${jobName}`,
|
||||
cwd: namespace,
|
||||
commandArgs: opencodeArgs,
|
||||
commandNotes: [
|
||||
`Image: ${job.spec?.template.spec?.containers[0]?.image ?? "unknown"}`,
|
||||
`Namespace: ${namespace}`,
|
||||
`Timeout: ${timeoutSec}s`,
|
||||
],
|
||||
prompt,
|
||||
...(promptMetrics ? { promptMetrics } : {}),
|
||||
context: ctx.context,
|
||||
} as Parameters<typeof onMeta>[0]);
|
||||
}
|
||||
|
||||
const batchApi = getBatchApi(kubeconfigPath);
|
||||
try {
|
||||
await batchApi.createNamespacedJob({ namespace, body: job });
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Failed to create K8s Job: ${msg}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Failed to create Kubernetes Job: ${msg}`,
|
||||
errorCode: "k8s_job_create_failed",
|
||||
};
|
||||
}
|
||||
|
||||
await onLog("stdout", `[paperclip] Created K8s Job: ${jobName} in namespace ${namespace} (deadline: ${timeoutSec > 0 ? `${timeoutSec}s` : "none"})\n`);
|
||||
|
||||
let stdout = "";
|
||||
let exitCode: number | null = null;
|
||||
let jobTimedOut = false;
|
||||
|
||||
try {
|
||||
const scheduleTimeoutMs = 120_000;
|
||||
let podName: string;
|
||||
try {
|
||||
podName = await waitForPod(namespace, jobName, scheduleTimeoutMs, onLog, kubeconfigPath);
|
||||
await onLog("stdout", `[paperclip] Pod running: ${podName}\n`);
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
await onLog("stderr", `[paperclip] Pod scheduling failed: ${msg}\n`);
|
||||
return {
|
||||
exitCode: null,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: `Pod scheduling failed: ${msg}`,
|
||||
errorCode: "k8s_pod_schedule_failed",
|
||||
};
|
||||
}
|
||||
|
||||
const completionTimeoutMs = timeoutSec > 0 ? (timeoutSec + graceSec) * 1000 : 0;
|
||||
|
||||
const [logResult, completionResult] = await Promise.allSettled([
|
||||
streamPodLogs(namespace, podName, onLog, kubeconfigPath),
|
||||
waitForJobCompletion(namespace, jobName, completionTimeoutMs, kubeconfigPath),
|
||||
]);
|
||||
|
||||
if (logResult.status === "fulfilled") {
|
||||
stdout = logResult.value;
|
||||
}
|
||||
|
||||
if (!stdout.trim()) {
|
||||
await onLog("stdout", `[paperclip] Log stream returned empty — reading pod logs directly...\n`);
|
||||
stdout = await readPodLogs(namespace, podName, kubeconfigPath);
|
||||
if (stdout.trim()) {
|
||||
await onLog("stdout", stdout);
|
||||
}
|
||||
}
|
||||
|
||||
if (completionResult.status === "fulfilled") {
|
||||
jobTimedOut = completionResult.value.timedOut;
|
||||
} else {
|
||||
jobTimedOut = true;
|
||||
}
|
||||
|
||||
exitCode = await getPodExitCode(namespace, jobName, kubeconfigPath);
|
||||
} finally {
|
||||
if (!retainJobs) {
|
||||
await cleanupJob(namespace, jobName, onLog, kubeconfigPath);
|
||||
} else {
|
||||
await onLog("stdout", `[paperclip] Retaining job ${jobName} for debugging (retainJobs=true)\n`);
|
||||
}
|
||||
}
|
||||
|
||||
if (jobTimedOut) {
|
||||
return {
|
||||
exitCode,
|
||||
signal: null,
|
||||
timedOut: true,
|
||||
errorMessage: `Timed out after ${timeoutSec}s`,
|
||||
errorCode: "timeout",
|
||||
};
|
||||
}
|
||||
|
||||
// Parse OpenCode JSONL output
|
||||
const parsed = parseOpenCodeJsonl(stdout);
|
||||
|
||||
const runtimeSessionParams = parseObject(runtime.sessionParams);
|
||||
const fallbackSessionId = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
|
||||
const workspaceContext = parseObject(ctx.context.paperclipWorkspace);
|
||||
const workspaceId = asString(workspaceContext.workspaceId, "") || null;
|
||||
const workspaceRepoUrl = asString(workspaceContext.repoUrl, "") || null;
|
||||
const workspaceRepoRef = asString(workspaceContext.repoRef, "") || null;
|
||||
const cwd = asString(workspaceContext.cwd, "");
|
||||
|
||||
const resolvedSessionId = parsed.sessionId ?? (fallbackSessionId || null);
|
||||
const resolvedSessionParams = resolvedSessionId
|
||||
? {
|
||||
sessionId: resolvedSessionId,
|
||||
...(cwd ? { cwd } : {}),
|
||||
...(workspaceId ? { workspaceId } : {}),
|
||||
...(workspaceRepoUrl ? { repoUrl: workspaceRepoUrl } : {}),
|
||||
...(workspaceRepoRef ? { repoRef: workspaceRepoRef } : {}),
|
||||
} as Record<string, unknown>
|
||||
: null;
|
||||
|
||||
const provider = parseModelProvider(model);
|
||||
// Build a minimal env record for biller inference
|
||||
const billerEnv: Record<string, string> = {};
|
||||
for (const key of ["OPENAI_API_KEY", "OPENAI_BASE_URL", "OPENROUTER_API_KEY"]) {
|
||||
const val = process.env[key];
|
||||
if (val) billerEnv[key] = val;
|
||||
}
|
||||
const biller = inferOpenAiCompatibleBiller(billerEnv, null) ?? provider ?? "unknown";
|
||||
|
||||
const parsedError = typeof parsed.errorMessage === "string" ? parsed.errorMessage.trim() : "";
|
||||
const rawExitCode = exitCode;
|
||||
const synthesizedExitCode = parsedError && (rawExitCode ?? 0) === 0 ? 1 : rawExitCode;
|
||||
const failed = (synthesizedExitCode ?? 0) !== 0;
|
||||
|
||||
// If the session was stale, clear it so the next heartbeat starts fresh
|
||||
if (failed && isOpenCodeUnknownSessionError(stdout, parsedError)) {
|
||||
await onLog("stdout", `[paperclip] OpenCode session is unavailable; clearing for next run.\n`);
|
||||
return {
|
||||
exitCode: synthesizedExitCode,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: parsedError || "Session unavailable",
|
||||
errorCode: "session_unavailable",
|
||||
clearSession: true,
|
||||
resultJson: { stdout },
|
||||
};
|
||||
}
|
||||
const firstStderrLine = stdout.split(/\r?\n/).map((l) => l.trim()).find(Boolean) ?? "";
|
||||
const fallbackErrorMessage = parsedError || firstStderrLine || `OpenCode exited with code ${synthesizedExitCode ?? -1}`;
|
||||
|
||||
return {
|
||||
exitCode: synthesizedExitCode,
|
||||
signal: null,
|
||||
timedOut: false,
|
||||
errorMessage: (synthesizedExitCode ?? 0) === 0 ? null : fallbackErrorMessage,
|
||||
usage: {
|
||||
inputTokens: parsed.usage.inputTokens,
|
||||
outputTokens: parsed.usage.outputTokens,
|
||||
cachedInputTokens: parsed.usage.cachedInputTokens,
|
||||
},
|
||||
sessionId: resolvedSessionId,
|
||||
sessionParams: resolvedSessionParams,
|
||||
sessionDisplayId: resolvedSessionId,
|
||||
provider,
|
||||
model: model || null,
|
||||
billingType: "unknown",
|
||||
costUsd: parsed.costUsd,
|
||||
resultJson: { stdout },
|
||||
summary: parsed.summary,
|
||||
clearSession: false,
|
||||
} as AdapterExecutionResult;
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
import type { ServerAdapterModule } from "@paperclipai/adapter-utils";
|
||||
import { type, models, agentConfigurationDoc } from "../index.js";
|
||||
import { execute } from "./execute.js";
|
||||
import { testEnvironment } from "./test.js";
|
||||
import { sessionCodec } from "./session.js";
|
||||
|
||||
export function createServerAdapter(): ServerAdapterModule {
|
||||
return {
|
||||
type,
|
||||
execute,
|
||||
testEnvironment,
|
||||
sessionCodec,
|
||||
models,
|
||||
supportsLocalAgentJwt: true,
|
||||
agentConfigurationDoc,
|
||||
};
|
||||
}
|
||||
|
||||
export { execute, testEnvironment, sessionCodec };
|
||||
@@ -0,0 +1,381 @@
|
||||
import type * as k8s from "@kubernetes/client-node";
|
||||
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
|
||||
import {
|
||||
asString,
|
||||
asNumber,
|
||||
asBoolean,
|
||||
asStringArray,
|
||||
parseObject,
|
||||
buildPaperclipEnv,
|
||||
renderTemplate,
|
||||
} from "@paperclipai/adapter-utils/server-utils";
|
||||
|
||||
function joinPromptSections(sections: string[], separator = "\n\n"): string {
|
||||
return sections.filter((s) => s.trim().length > 0).join(separator);
|
||||
}
|
||||
|
||||
function stringifyPaperclipWakePayload(wake: unknown): string | null {
|
||||
if (!wake || typeof wake !== "object") return null;
|
||||
try {
|
||||
const json = JSON.stringify(wake);
|
||||
return json === "{}" ? null : json;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function renderPaperclipWakePrompt(wake: unknown, _opts?: { resumedSession?: boolean }): string {
|
||||
if (!wake || typeof wake !== "object") return "";
|
||||
const w = wake as Record<string, unknown>;
|
||||
const reason = typeof w.reason === "string" ? w.reason.trim() : "";
|
||||
const comments = Array.isArray(w.comments) ? w.comments : [];
|
||||
if (!reason && comments.length === 0) return "";
|
||||
const parts: string[] = [];
|
||||
if (reason) parts.push(`Wake reason: ${reason}`);
|
||||
for (const c of comments) {
|
||||
if (typeof c === "object" && c !== null) {
|
||||
const comment = c as Record<string, unknown>;
|
||||
const body = typeof comment.body === "string" ? comment.body.trim() : "";
|
||||
if (body) parts.push(`Comment: ${body}`);
|
||||
}
|
||||
}
|
||||
return parts.join("\n\n");
|
||||
}
|
||||
import type { SelfPodInfo } from "./k8s-client.js";
|
||||
|
||||
export interface JobBuildInput {
|
||||
ctx: AdapterExecutionContext;
|
||||
selfPod: SelfPodInfo;
|
||||
}
|
||||
|
||||
export interface JobBuildResult {
|
||||
job: k8s.V1Job;
|
||||
jobName: string;
|
||||
namespace: string;
|
||||
prompt: string;
|
||||
opencodeArgs: string[];
|
||||
promptMetrics: Record<string, number>;
|
||||
}
|
||||
|
||||
function sanitizeForK8sName(value: string): string {
|
||||
return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, 8);
|
||||
}
|
||||
|
||||
function buildEnvVars(
|
||||
ctx: AdapterExecutionContext,
|
||||
selfPod: SelfPodInfo,
|
||||
config: Record<string, unknown>,
|
||||
): k8s.V1EnvVar[] {
|
||||
const { runId, agent, context } = ctx;
|
||||
const envConfig = parseObject(config.env);
|
||||
|
||||
// Layer 1: PAPERCLIP_* base vars
|
||||
const paperclipEnv = buildPaperclipEnv(agent);
|
||||
paperclipEnv.PAPERCLIP_RUN_ID = runId;
|
||||
|
||||
const setIfPresent = (envKey: string, value: unknown) => {
|
||||
if (typeof value === "string" && value.trim().length > 0) {
|
||||
paperclipEnv[envKey] = value.trim();
|
||||
}
|
||||
};
|
||||
|
||||
setIfPresent("PAPERCLIP_TASK_ID", context.taskId ?? context.issueId);
|
||||
setIfPresent("PAPERCLIP_WAKE_REASON", context.wakeReason);
|
||||
setIfPresent("PAPERCLIP_WAKE_COMMENT_ID", context.wakeCommentId ?? context.commentId);
|
||||
setIfPresent("PAPERCLIP_APPROVAL_ID", context.approvalId);
|
||||
setIfPresent("PAPERCLIP_APPROVAL_STATUS", context.approvalStatus);
|
||||
|
||||
const wakePayloadJson = stringifyPaperclipWakePayload(context.paperclipWake);
|
||||
if (wakePayloadJson) {
|
||||
paperclipEnv.PAPERCLIP_WAKE_PAYLOAD_JSON = wakePayloadJson;
|
||||
}
|
||||
|
||||
const workspaceContext = parseObject(context.paperclipWorkspace);
|
||||
setIfPresent("PAPERCLIP_WORKSPACE_CWD", workspaceContext.cwd);
|
||||
setIfPresent("PAPERCLIP_WORKSPACE_SOURCE", workspaceContext.source);
|
||||
setIfPresent("PAPERCLIP_WORKSPACE_STRATEGY", workspaceContext.strategy);
|
||||
setIfPresent("PAPERCLIP_WORKSPACE_ID", workspaceContext.workspaceId);
|
||||
setIfPresent("PAPERCLIP_WORKSPACE_REPO_URL", workspaceContext.repoUrl);
|
||||
setIfPresent("PAPERCLIP_WORKSPACE_REPO_REF", workspaceContext.repoRef);
|
||||
setIfPresent("PAPERCLIP_WORKSPACE_BRANCH", workspaceContext.branchName);
|
||||
setIfPresent("PAPERCLIP_WORKSPACE_WORKTREE_PATH", workspaceContext.worktreePath);
|
||||
setIfPresent("AGENT_HOME", workspaceContext.agentHome);
|
||||
|
||||
const linkedIssueIds = Array.isArray(context.issueIds)
|
||||
? context.issueIds.filter((v): v is string => typeof v === "string" && v.trim().length > 0)
|
||||
: [];
|
||||
if (linkedIssueIds.length > 0) {
|
||||
paperclipEnv.PAPERCLIP_LINKED_ISSUE_IDS = linkedIssueIds.join(",");
|
||||
}
|
||||
if (Array.isArray(context.paperclipWorkspaces) && context.paperclipWorkspaces.length > 0) {
|
||||
paperclipEnv.PAPERCLIP_WORKSPACES_JSON = JSON.stringify(context.paperclipWorkspaces);
|
||||
}
|
||||
if (Array.isArray(context.paperclipRuntimeServiceIntents) && context.paperclipRuntimeServiceIntents.length > 0) {
|
||||
paperclipEnv.PAPERCLIP_RUNTIME_SERVICE_INTENTS_JSON = JSON.stringify(context.paperclipRuntimeServiceIntents);
|
||||
}
|
||||
if (Array.isArray(context.paperclipRuntimeServices) && context.paperclipRuntimeServices.length > 0) {
|
||||
paperclipEnv.PAPERCLIP_RUNTIME_SERVICES_JSON = JSON.stringify(context.paperclipRuntimeServices);
|
||||
}
|
||||
setIfPresent("PAPERCLIP_RUNTIME_PRIMARY_URL", context.paperclipRuntimePrimaryUrl);
|
||||
|
||||
if (ctx.authToken) {
|
||||
paperclipEnv.PAPERCLIP_API_KEY = ctx.authToken;
|
||||
}
|
||||
|
||||
// Inherit PAPERCLIP_API_URL from Deployment env (in-cluster service URL)
|
||||
if (selfPod.inheritedEnv.PAPERCLIP_API_URL) {
|
||||
paperclipEnv.PAPERCLIP_API_URL = selfPod.inheritedEnv.PAPERCLIP_API_URL;
|
||||
}
|
||||
|
||||
// Layer 3: Inherited from Deployment (Bedrock, API keys, etc.)
|
||||
const merged: Record<string, string> = {
|
||||
...selfPod.inheritedEnv,
|
||||
...paperclipEnv,
|
||||
};
|
||||
|
||||
// Layer 4: User-defined overrides from adapterConfig.env
|
||||
for (const [key, value] of Object.entries(envConfig)) {
|
||||
if (typeof value === "string") merged[key] = value;
|
||||
}
|
||||
|
||||
// OpenCode-specific: prevent project config pollution, always set after user overrides
|
||||
merged.OPENCODE_DISABLE_PROJECT_CONFIG = "true";
|
||||
merged.HOME = "/paperclip";
|
||||
|
||||
// Convert to V1EnvVar array
|
||||
const envVars: k8s.V1EnvVar[] = Object.entries(merged).map(([name, value]) => ({
|
||||
name,
|
||||
value,
|
||||
}));
|
||||
|
||||
return envVars;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the OpenCode runtime config JSON for permission.external_directory=allow.
|
||||
* Returned as a string to be written inside the Job container.
|
||||
*/
|
||||
function buildRuntimeConfigJson(config: Record<string, unknown>): string | null {
|
||||
const skipPermissions = asBoolean(config.dangerouslySkipPermissions, true);
|
||||
if (!skipPermissions) return null;
|
||||
return JSON.stringify({ permission: { external_directory: "allow" } }, null, 2);
|
||||
}
|
||||
|
||||
export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
||||
const { ctx, selfPod } = input;
|
||||
const { runId, agent, runtime, config: rawConfig, context } = ctx;
|
||||
const config = parseObject(rawConfig);
|
||||
|
||||
const namespace = asString(config.namespace, "") || selfPod.namespace;
|
||||
const image = asString(config.image, "") || selfPod.image;
|
||||
const model = asString(config.model, "").trim();
|
||||
const variant = asString(config.variant, "").trim();
|
||||
const extraArgs = asStringArray(config.extraArgs);
|
||||
const timeoutSec = asNumber(config.timeoutSec, 0);
|
||||
const ttlSeconds = asNumber(config.ttlSecondsAfterFinished, 300);
|
||||
const resources = parseObject(config.resources);
|
||||
const nodeSelector = parseObject(config.nodeSelector);
|
||||
const tolerations = Array.isArray(config.tolerations) ? config.tolerations : [];
|
||||
const extraLabels = parseObject(config.labels);
|
||||
|
||||
// Resolve working directory
|
||||
const workspaceContext = parseObject(context.paperclipWorkspace);
|
||||
const workspaceCwd = asString(workspaceContext.cwd, "");
|
||||
const configuredCwd = asString(config.cwd, "");
|
||||
const workingDir = workspaceCwd || configuredCwd || "/paperclip";
|
||||
|
||||
// Job naming
|
||||
const agentSlug = sanitizeForK8sName(agent.id);
|
||||
const runSlug = sanitizeForK8sName(runId);
|
||||
const jobName = `agent-${agentSlug}-${runSlug}`;
|
||||
|
||||
// Build prompt
|
||||
const promptTemplate = asString(
|
||||
config.promptTemplate,
|
||||
"You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.",
|
||||
);
|
||||
const bootstrapPromptTemplate = asString(config.bootstrapPromptTemplate, "");
|
||||
const runtimeSessionParams = parseObject(runtime.sessionParams);
|
||||
const runtimeSessionId = asString(runtimeSessionParams.sessionId, runtime.sessionId ?? "");
|
||||
const templateData = {
|
||||
agentId: agent.id,
|
||||
companyId: agent.companyId,
|
||||
runId,
|
||||
company: { id: agent.companyId },
|
||||
agent,
|
||||
run: { id: runId, source: "on_demand" },
|
||||
context,
|
||||
};
|
||||
const renderedBootstrapPrompt =
|
||||
!runtimeSessionId && bootstrapPromptTemplate.trim().length > 0
|
||||
? renderTemplate(bootstrapPromptTemplate, templateData).trim()
|
||||
: "";
|
||||
const wakePrompt = renderPaperclipWakePrompt(context.paperclipWake, { resumedSession: Boolean(runtimeSessionId) });
|
||||
const shouldUseResumeDeltaPrompt = Boolean(runtimeSessionId) && wakePrompt.length > 0;
|
||||
const renderedPrompt = shouldUseResumeDeltaPrompt ? "" : renderTemplate(promptTemplate, templateData);
|
||||
const sessionHandoffNote = asString(context.paperclipSessionHandoffMarkdown, "").trim();
|
||||
const prompt = joinPromptSections([
|
||||
renderedBootstrapPrompt,
|
||||
wakePrompt,
|
||||
sessionHandoffNote,
|
||||
renderedPrompt,
|
||||
]);
|
||||
const promptMetrics = {
|
||||
promptChars: prompt.length,
|
||||
bootstrapPromptChars: renderedBootstrapPrompt.length,
|
||||
wakePromptChars: wakePrompt.length,
|
||||
sessionHandoffChars: sessionHandoffNote.length,
|
||||
heartbeatPromptChars: renderedPrompt.length,
|
||||
};
|
||||
|
||||
// Build opencode CLI args
|
||||
const opencodeArgs = ["run", "--format", "json"];
|
||||
if (runtimeSessionId) opencodeArgs.push("--session", runtimeSessionId);
|
||||
if (model) opencodeArgs.push("--model", model);
|
||||
if (variant) opencodeArgs.push("--variant", variant);
|
||||
if (extraArgs.length > 0) opencodeArgs.push(...extraArgs);
|
||||
|
||||
// Build env vars
|
||||
const envVars = buildEnvVars(ctx, selfPod, config);
|
||||
|
||||
// Runtime config for permissions
|
||||
const runtimeConfigJson = buildRuntimeConfigJson(config);
|
||||
|
||||
// Resource defaults
|
||||
const resourceRequests = parseObject(resources.requests);
|
||||
const resourceLimits = parseObject(resources.limits);
|
||||
const containerResources: k8s.V1ResourceRequirements = {
|
||||
requests: {
|
||||
cpu: asString(resourceRequests.cpu, "1000m"),
|
||||
memory: asString(resourceRequests.memory, "2Gi"),
|
||||
},
|
||||
limits: {
|
||||
cpu: asString(resourceLimits.cpu, "4000m"),
|
||||
memory: asString(resourceLimits.memory, "8Gi"),
|
||||
},
|
||||
};
|
||||
|
||||
// Labels
|
||||
const labels: Record<string, string> = {
|
||||
"app.kubernetes.io/managed-by": "paperclip",
|
||||
"app.kubernetes.io/component": "agent-job",
|
||||
"paperclip.io/agent-id": agent.id,
|
||||
"paperclip.io/run-id": runId,
|
||||
"paperclip.io/company-id": agent.companyId,
|
||||
"paperclip.io/adapter-type": "opencode_k8s",
|
||||
};
|
||||
for (const [key, value] of Object.entries(extraLabels)) {
|
||||
if (typeof value === "string") labels[key] = value;
|
||||
}
|
||||
|
||||
// Volumes
|
||||
const volumes: k8s.V1Volume[] = [{ name: "prompt", emptyDir: {} }];
|
||||
const volumeMounts: k8s.V1VolumeMount[] = [{ name: "prompt", mountPath: "/tmp/prompt" }];
|
||||
|
||||
if (selfPod.pvcClaimName) {
|
||||
volumes.push({
|
||||
name: "data",
|
||||
persistentVolumeClaim: { claimName: selfPod.pvcClaimName },
|
||||
});
|
||||
volumeMounts.push({ name: "data", mountPath: "/paperclip" });
|
||||
}
|
||||
|
||||
// Mount secret volumes inherited from the Deployment pod
|
||||
for (const sv of selfPod.secretVolumes) {
|
||||
volumes.push({
|
||||
name: sv.volumeName,
|
||||
secret: { secretName: sv.secretName, defaultMode: sv.defaultMode, optional: true },
|
||||
});
|
||||
volumeMounts.push({
|
||||
name: sv.volumeName,
|
||||
mountPath: sv.mountPath,
|
||||
readOnly: true,
|
||||
});
|
||||
}
|
||||
|
||||
const securityContext: k8s.V1SecurityContext = {
|
||||
capabilities: { drop: ["ALL"] },
|
||||
readOnlyRootFilesystem: false,
|
||||
runAsNonRoot: true,
|
||||
runAsUser: 1000,
|
||||
allowPrivilegeEscalation: false,
|
||||
};
|
||||
|
||||
const podSecurityContext: k8s.V1PodSecurityContext = {
|
||||
runAsNonRoot: true,
|
||||
runAsUser: 1000,
|
||||
runAsGroup: 1000,
|
||||
fsGroup: 1000,
|
||||
};
|
||||
|
||||
// Build the main container command
|
||||
// 1. Optionally write opencode runtime config for permission bypass
|
||||
// 2. Pipe prompt into opencode
|
||||
const opencodeArgsEscaped = opencodeArgs.map((a) => `'${a.replace(/'/g, "'\\''")}'`).join(" ");
|
||||
const configSetup = runtimeConfigJson
|
||||
? `mkdir -p ~/.config/opencode && echo '${runtimeConfigJson.replace(/'/g, "'\\''")}' > ~/.config/opencode/opencode.json && `
|
||||
: "";
|
||||
const mainCommand = `${configSetup}cat /tmp/prompt/prompt.txt | opencode ${opencodeArgsEscaped}`;
|
||||
|
||||
const job: k8s.V1Job = {
|
||||
apiVersion: "batch/v1",
|
||||
kind: "Job",
|
||||
metadata: {
|
||||
name: jobName,
|
||||
namespace,
|
||||
labels,
|
||||
annotations: {
|
||||
"paperclip.io/adapter-type": "opencode_k8s",
|
||||
"paperclip.io/agent-name": agent.name,
|
||||
},
|
||||
},
|
||||
spec: {
|
||||
backoffLimit: 0,
|
||||
...(timeoutSec > 0 ? { activeDeadlineSeconds: timeoutSec } : {}),
|
||||
ttlSecondsAfterFinished: ttlSeconds,
|
||||
template: {
|
||||
metadata: { labels },
|
||||
spec: {
|
||||
restartPolicy: "Never",
|
||||
serviceAccountName: asString(config.serviceAccountName, "") || undefined,
|
||||
securityContext: podSecurityContext,
|
||||
...(selfPod.imagePullSecrets.length > 0 ? { imagePullSecrets: selfPod.imagePullSecrets } : {}),
|
||||
...(selfPod.dnsConfig ? { dnsConfig: selfPod.dnsConfig } : {}),
|
||||
...(Object.keys(nodeSelector).length > 0 ? { nodeSelector: nodeSelector as Record<string, string> } : {}),
|
||||
...(tolerations.length > 0 ? { tolerations: tolerations as k8s.V1Toleration[] } : {}),
|
||||
initContainers: [
|
||||
{
|
||||
name: "write-prompt",
|
||||
image: "busybox:1.36",
|
||||
imagePullPolicy: "IfNotPresent",
|
||||
command: ["sh", "-c", "echo \"$PROMPT_CONTENT\" > /tmp/prompt/prompt.txt"],
|
||||
env: [{ name: "PROMPT_CONTENT", value: prompt }],
|
||||
volumeMounts: [{ name: "prompt", mountPath: "/tmp/prompt" }],
|
||||
securityContext,
|
||||
resources: {
|
||||
requests: { cpu: "10m", memory: "16Mi" },
|
||||
limits: { cpu: "100m", memory: "64Mi" },
|
||||
},
|
||||
},
|
||||
],
|
||||
containers: [
|
||||
{
|
||||
name: "opencode",
|
||||
image,
|
||||
imagePullPolicy: asString(config.imagePullPolicy, "IfNotPresent"),
|
||||
workingDir,
|
||||
command: ["sh", "-c", mainCommand],
|
||||
env: envVars,
|
||||
volumeMounts,
|
||||
securityContext,
|
||||
resources: containerResources,
|
||||
},
|
||||
],
|
||||
volumes,
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
return { job, jobName, namespace, prompt, opencodeArgs, promptMetrics };
|
||||
}
|
||||
@@ -0,0 +1,172 @@
|
||||
import * as k8s from "@kubernetes/client-node";
|
||||
import { readFileSync } from "node:fs";
|
||||
|
||||
/**
|
||||
* Cached self-pod introspection result. Queried once on first execute(),
|
||||
* then reused for all subsequent Job builds so every Job inherits the
|
||||
* Deployment's image, imagePullSecrets, DNS config, and PVC claim.
|
||||
*/
|
||||
export interface SelfPodSecretVolume {
|
||||
volumeName: string;
|
||||
secretName: string;
|
||||
mountPath: string;
|
||||
defaultMode: number | undefined;
|
||||
}
|
||||
|
||||
export interface SelfPodInfo {
|
||||
namespace: string;
|
||||
image: string;
|
||||
imagePullSecrets: Array<{ name: string }>;
|
||||
dnsConfig: k8s.V1PodDNSConfig | undefined;
|
||||
pvcClaimName: string | null;
|
||||
secretVolumes: SelfPodSecretVolume[];
|
||||
/** Env vars inherited from the Deployment container. */
|
||||
inheritedEnv: Record<string, string>;
|
||||
}
|
||||
|
||||
/** Keys forwarded from the Deployment container env into Job pods. */
|
||||
const INHERITED_ENV_KEYS = [
|
||||
"CLAUDE_CODE_USE_BEDROCK",
|
||||
"AWS_REGION",
|
||||
"AWS_BEARER_TOKEN_BEDROCK",
|
||||
"ANTHROPIC_API_KEY",
|
||||
"OPENAI_API_KEY",
|
||||
"PAPERCLIP_API_URL",
|
||||
];
|
||||
|
||||
let cachedSelfPod: SelfPodInfo | null = null;
|
||||
|
||||
/**
|
||||
* Cache keyed by kubeconfig path (empty string = in-cluster).
|
||||
* Supports multiple agents with different kubeconfigs.
|
||||
*/
|
||||
const kcCache = new Map<string, k8s.KubeConfig>();
|
||||
|
||||
function getKubeConfig(kubeconfigPath?: string): k8s.KubeConfig {
|
||||
const key = kubeconfigPath ?? "";
|
||||
let kc = kcCache.get(key);
|
||||
if (!kc) {
|
||||
kc = new k8s.KubeConfig();
|
||||
if (kubeconfigPath) {
|
||||
kc.loadFromFile(kubeconfigPath);
|
||||
} else {
|
||||
kc.loadFromCluster();
|
||||
}
|
||||
kcCache.set(key, kc);
|
||||
}
|
||||
return kc;
|
||||
}
|
||||
|
||||
export function getBatchApi(kubeconfigPath?: string): k8s.BatchV1Api {
|
||||
return getKubeConfig(kubeconfigPath).makeApiClient(k8s.BatchV1Api);
|
||||
}
|
||||
|
||||
export function getCoreApi(kubeconfigPath?: string): k8s.CoreV1Api {
|
||||
return getKubeConfig(kubeconfigPath).makeApiClient(k8s.CoreV1Api);
|
||||
}
|
||||
|
||||
export function getAuthzApi(kubeconfigPath?: string): k8s.AuthorizationV1Api {
|
||||
return getKubeConfig(kubeconfigPath).makeApiClient(k8s.AuthorizationV1Api);
|
||||
}
|
||||
|
||||
export function getLogApi(kubeconfigPath?: string): k8s.Log {
|
||||
return new k8s.Log(getKubeConfig(kubeconfigPath));
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the current pod's namespace. Checks (in order):
|
||||
* 1. PAPERCLIP_NAMESPACE env var (set explicitly in Deployment)
|
||||
* 2. Service account namespace file (standard in-cluster path)
|
||||
* 3. POD_NAMESPACE env var (Downward API convention)
|
||||
* Falls back to "default" only if none of the above are available.
|
||||
*/
|
||||
function readInClusterNamespace(): string {
|
||||
const fromEnv = process.env.PAPERCLIP_NAMESPACE ?? process.env.POD_NAMESPACE;
|
||||
if (fromEnv?.trim()) return fromEnv.trim();
|
||||
try {
|
||||
return readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "utf-8").trim();
|
||||
} catch {
|
||||
return "default";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Query the K8s API for our own pod spec and cache the result.
|
||||
* Extracts image, imagePullSecrets, dnsConfig, PVC claim name,
|
||||
* and environment variables to forward to Job pods.
|
||||
*/
|
||||
export async function getSelfPodInfo(kubeconfigPath?: string): Promise<SelfPodInfo> {
|
||||
if (cachedSelfPod) return cachedSelfPod;
|
||||
|
||||
const hostname = process.env.HOSTNAME;
|
||||
if (!hostname) {
|
||||
throw new Error("claude_k8s: HOSTNAME env var not set — cannot introspect running pod");
|
||||
}
|
||||
|
||||
const namespace = readInClusterNamespace();
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
const pod = await coreApi.readNamespacedPod({ name: hostname, namespace });
|
||||
|
||||
const spec = pod.spec;
|
||||
if (!spec) {
|
||||
throw new Error(`claude_k8s: pod ${hostname} has no spec`);
|
||||
}
|
||||
|
||||
const mainContainer = spec.containers[0];
|
||||
if (!mainContainer?.image) {
|
||||
throw new Error(`claude_k8s: pod ${hostname} has no container image`);
|
||||
}
|
||||
|
||||
// Find PVC claim name from volumes mounted at /paperclip
|
||||
let pvcClaimName: string | null = null;
|
||||
const dataMount = mainContainer.volumeMounts?.find(
|
||||
(vm) => vm.mountPath === "/paperclip",
|
||||
);
|
||||
if (dataMount) {
|
||||
const volume = spec.volumes?.find((v) => v.name === dataMount.name);
|
||||
pvcClaimName = volume?.persistentVolumeClaim?.claimName ?? null;
|
||||
}
|
||||
|
||||
// Discover secret volumes mounted on the main container
|
||||
const secretVolumes: SelfPodSecretVolume[] = [];
|
||||
for (const vm of mainContainer.volumeMounts ?? []) {
|
||||
const vol = spec.volumes?.find((v) => v.name === vm.name);
|
||||
if (vol?.secret?.secretName) {
|
||||
secretVolumes.push({
|
||||
volumeName: vm.name,
|
||||
secretName: vol.secret.secretName,
|
||||
mountPath: vm.mountPath,
|
||||
defaultMode: vol.secret.defaultMode,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Collect inherited env vars from process.env (these came from the Deployment spec)
|
||||
const inheritedEnv: Record<string, string> = {};
|
||||
for (const key of INHERITED_ENV_KEYS) {
|
||||
const value = process.env[key];
|
||||
if (value !== undefined) {
|
||||
inheritedEnv[key] = value;
|
||||
}
|
||||
}
|
||||
|
||||
cachedSelfPod = {
|
||||
namespace,
|
||||
image: mainContainer.image,
|
||||
imagePullSecrets: (spec.imagePullSecrets ?? []).map((s) => ({
|
||||
name: s.name ?? "",
|
||||
})).filter((s) => s.name.length > 0),
|
||||
dnsConfig: spec.dnsConfig,
|
||||
pvcClaimName,
|
||||
secretVolumes,
|
||||
inheritedEnv,
|
||||
};
|
||||
|
||||
return cachedSelfPod;
|
||||
}
|
||||
|
||||
/** Reset cached state — useful for tests. */
|
||||
export function resetCache(): void {
|
||||
kcCache.clear();
|
||||
cachedSelfPod = null;
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
import { asNumber, asString, parseJson, parseObject } from "@paperclipai/adapter-utils/server-utils";
|
||||
|
||||
function errorText(value: unknown): string {
|
||||
if (typeof value === "string") return value;
|
||||
const rec = parseObject(value);
|
||||
const message = asString(rec.message, "").trim();
|
||||
if (message) return message;
|
||||
const data = parseObject(rec.data);
|
||||
const nestedMessage = asString(data.message, "").trim();
|
||||
if (nestedMessage) return nestedMessage;
|
||||
const name = asString(rec.name, "").trim();
|
||||
if (name) return name;
|
||||
const code = asString(rec.code, "").trim();
|
||||
if (code) return code;
|
||||
try {
|
||||
return JSON.stringify(rec);
|
||||
} catch {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
export function parseOpenCodeJsonl(stdout: string) {
|
||||
let sessionId: string | null = null;
|
||||
const messages: string[] = [];
|
||||
const errors: string[] = [];
|
||||
const usage = {
|
||||
inputTokens: 0,
|
||||
cachedInputTokens: 0,
|
||||
outputTokens: 0,
|
||||
};
|
||||
let costUsd = 0;
|
||||
|
||||
for (const rawLine of stdout.split(/\r?\n/)) {
|
||||
const line = rawLine.trim();
|
||||
if (!line) continue;
|
||||
|
||||
const event = parseJson(line);
|
||||
if (!event) continue;
|
||||
|
||||
const currentSessionId = asString(event.sessionID, "").trim();
|
||||
if (currentSessionId) sessionId = currentSessionId;
|
||||
|
||||
const type = asString(event.type, "");
|
||||
|
||||
if (type === "text") {
|
||||
const part = parseObject(event.part);
|
||||
const text = asString(part.text, "").trim();
|
||||
if (text) messages.push(text);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type === "step_finish") {
|
||||
const part = parseObject(event.part);
|
||||
const tokens = parseObject(part.tokens);
|
||||
const cache = parseObject(tokens.cache);
|
||||
usage.inputTokens += asNumber(tokens.input, 0);
|
||||
usage.cachedInputTokens += asNumber(cache.read, 0);
|
||||
usage.outputTokens += asNumber(tokens.output, 0) + asNumber(tokens.reasoning, 0);
|
||||
costUsd += asNumber(part.cost, 0);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type === "tool_use") {
|
||||
const part = parseObject(event.part);
|
||||
const state = parseObject(part.state);
|
||||
if (asString(state.status, "") === "error") {
|
||||
const text = asString(state.error, "").trim();
|
||||
if (text) errors.push(text);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type === "error") {
|
||||
const text = errorText(event.error ?? event.message).trim();
|
||||
if (text) errors.push(text);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
sessionId,
|
||||
summary: messages.join("\n\n").trim(),
|
||||
usage,
|
||||
costUsd,
|
||||
errorMessage: errors.length > 0 ? errors.join("\n") : null,
|
||||
};
|
||||
}
|
||||
|
||||
export function isOpenCodeUnknownSessionError(stdout: string, stderr: string): boolean {
|
||||
const haystack = `${stdout}\n${stderr}`
|
||||
.split(/\r?\n/)
|
||||
.map((line) => line.trim())
|
||||
.filter(Boolean)
|
||||
.join("\n");
|
||||
|
||||
return /unknown\s+session|session\b.*\bnot\s+found|resource\s+not\s+found:.*[\\/]session[\\/].*\.json|notfounderror|no session/i.test(
|
||||
haystack,
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
import type { AdapterSessionCodec } from "@paperclipai/adapter-utils";
|
||||
|
||||
function readNonEmptyString(value: unknown): string | null {
|
||||
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
|
||||
}
|
||||
|
||||
export const sessionCodec: AdapterSessionCodec = {
|
||||
deserialize(raw: unknown) {
|
||||
if (typeof raw !== "object" || raw === null || Array.isArray(raw)) return null;
|
||||
const record = raw as Record<string, unknown>;
|
||||
const sessionId =
|
||||
readNonEmptyString(record.sessionId) ??
|
||||
readNonEmptyString(record.session_id) ??
|
||||
readNonEmptyString(record.sessionID);
|
||||
if (!sessionId) return null;
|
||||
const cwd =
|
||||
readNonEmptyString(record.cwd) ??
|
||||
readNonEmptyString(record.workdir) ??
|
||||
readNonEmptyString(record.folder);
|
||||
const workspaceId = readNonEmptyString(record.workspaceId) ?? readNonEmptyString(record.workspace_id);
|
||||
const repoUrl = readNonEmptyString(record.repoUrl) ?? readNonEmptyString(record.repo_url);
|
||||
const repoRef = readNonEmptyString(record.repoRef) ?? readNonEmptyString(record.repo_ref);
|
||||
return {
|
||||
sessionId,
|
||||
...(cwd ? { cwd } : {}),
|
||||
...(workspaceId ? { workspaceId } : {}),
|
||||
...(repoUrl ? { repoUrl } : {}),
|
||||
...(repoRef ? { repoRef } : {}),
|
||||
};
|
||||
},
|
||||
serialize(params: Record<string, unknown> | null) {
|
||||
if (!params) return null;
|
||||
const sessionId =
|
||||
readNonEmptyString(params.sessionId) ??
|
||||
readNonEmptyString(params.session_id) ??
|
||||
readNonEmptyString(params.sessionID);
|
||||
if (!sessionId) return null;
|
||||
const cwd =
|
||||
readNonEmptyString(params.cwd) ??
|
||||
readNonEmptyString(params.workdir) ??
|
||||
readNonEmptyString(params.folder);
|
||||
const workspaceId = readNonEmptyString(params.workspaceId) ?? readNonEmptyString(params.workspace_id);
|
||||
const repoUrl = readNonEmptyString(params.repoUrl) ?? readNonEmptyString(params.repo_url);
|
||||
const repoRef = readNonEmptyString(params.repoRef) ?? readNonEmptyString(params.repo_ref);
|
||||
return {
|
||||
sessionId,
|
||||
...(cwd ? { cwd } : {}),
|
||||
...(workspaceId ? { workspaceId } : {}),
|
||||
...(repoUrl ? { repoUrl } : {}),
|
||||
...(repoRef ? { repoRef } : {}),
|
||||
};
|
||||
},
|
||||
getDisplayId(params: Record<string, unknown> | null) {
|
||||
if (!params) return null;
|
||||
return (
|
||||
readNonEmptyString(params.sessionId) ??
|
||||
readNonEmptyString(params.session_id) ??
|
||||
readNonEmptyString(params.sessionID)
|
||||
);
|
||||
},
|
||||
};
|
||||
@@ -0,0 +1,241 @@
|
||||
import type {
|
||||
AdapterEnvironmentCheck,
|
||||
AdapterEnvironmentTestContext,
|
||||
AdapterEnvironmentTestResult,
|
||||
} from "@paperclipai/adapter-utils";
|
||||
import { asString, parseObject } from "@paperclipai/adapter-utils/server-utils";
|
||||
import { getSelfPodInfo, getCoreApi, getAuthzApi } from "./k8s-client.js";
|
||||
|
||||
function summarizeStatus(checks: AdapterEnvironmentCheck[]): AdapterEnvironmentTestResult["status"] {
|
||||
if (checks.some((c) => c.level === "error")) return "fail";
|
||||
if (checks.some((c) => c.level === "warn")) return "warn";
|
||||
return "pass";
|
||||
}
|
||||
|
||||
async function checkApiReachable(checks: AdapterEnvironmentCheck[], kubeconfigPath?: string): Promise<boolean> {
|
||||
try {
|
||||
const selfPod = await getSelfPodInfo(kubeconfigPath);
|
||||
checks.push({
|
||||
code: "k8s_api_reachable",
|
||||
level: "info",
|
||||
message: `Kubernetes API reachable; running in namespace ${selfPod.namespace}`,
|
||||
detail: `Image: ${selfPod.image}`,
|
||||
});
|
||||
return true;
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
checks.push({
|
||||
code: "k8s_api_unreachable",
|
||||
level: "error",
|
||||
message: `Cannot reach Kubernetes API: ${msg}`,
|
||||
hint: "Ensure the pod has a valid service account token mounted and the API server is accessible.",
|
||||
});
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async function checkNamespace(
|
||||
namespace: string,
|
||||
selfPodNamespace: string,
|
||||
checks: AdapterEnvironmentCheck[],
|
||||
kubeconfigPath?: string,
|
||||
): Promise<boolean> {
|
||||
// If targeting the same namespace we're running in, skip the cluster-scoped
|
||||
// readNamespace call — we know it exists, and the SA may lack cluster-level
|
||||
// namespace get permissions.
|
||||
if (namespace === selfPodNamespace) {
|
||||
checks.push({
|
||||
code: "k8s_namespace_exists",
|
||||
level: "info",
|
||||
message: `Target namespace is the pod namespace: ${namespace}`,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
try {
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
await coreApi.readNamespace({ name: namespace });
|
||||
checks.push({
|
||||
code: "k8s_namespace_exists",
|
||||
level: "info",
|
||||
message: `Target namespace exists: ${namespace}`,
|
||||
});
|
||||
return true;
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
checks.push({
|
||||
code: "k8s_namespace_check_failed",
|
||||
level: "warn",
|
||||
message: `Cannot verify namespace "${namespace}": ${msg}`,
|
||||
hint: "The service account may lack cluster-level namespace read permissions. The namespace may still be usable — verify RBAC checks below.",
|
||||
});
|
||||
// Don't block on this — RBAC checks below will catch actual permission issues
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
async function checkRbac(
|
||||
namespace: string,
|
||||
checks: AdapterEnvironmentCheck[],
|
||||
kubeconfigPath?: string,
|
||||
): Promise<void> {
|
||||
const authzApi = getAuthzApi(kubeconfigPath);
|
||||
|
||||
const rbacChecks = [
|
||||
{ resource: "jobs", group: "batch", verb: "create", code: "k8s_rbac_job_create", label: "create Jobs" },
|
||||
{ resource: "jobs", group: "batch", verb: "delete", code: "k8s_rbac_job_delete", label: "delete Jobs" },
|
||||
{ resource: "jobs", group: "batch", verb: "get", code: "k8s_rbac_job_get", label: "get Jobs" },
|
||||
{ resource: "pods", group: "", verb: "list", code: "k8s_rbac_pod_list", label: "list Pods" },
|
||||
{ resource: "pods/log", group: "", verb: "get", code: "k8s_rbac_pod_log", label: "get Pod logs" },
|
||||
];
|
||||
|
||||
for (const check of rbacChecks) {
|
||||
try {
|
||||
const review = await authzApi.createSelfSubjectAccessReview({
|
||||
body: {
|
||||
apiVersion: "authorization.k8s.io/v1",
|
||||
kind: "SelfSubjectAccessReview",
|
||||
spec: {
|
||||
resourceAttributes: {
|
||||
namespace,
|
||||
verb: check.verb,
|
||||
resource: check.resource,
|
||||
group: check.group,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
if (review.status?.allowed) {
|
||||
checks.push({
|
||||
code: check.code,
|
||||
level: "info",
|
||||
message: `RBAC: allowed to ${check.label} in ${namespace}`,
|
||||
});
|
||||
} else {
|
||||
checks.push({
|
||||
code: check.code,
|
||||
level: "error",
|
||||
message: `RBAC: not allowed to ${check.label} in ${namespace}`,
|
||||
hint: `Grant the service account permission to ${check.verb} ${check.resource} in namespace ${namespace}.`,
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
checks.push({
|
||||
code: check.code,
|
||||
level: "warn",
|
||||
message: `RBAC check failed for ${check.label}: ${msg}`,
|
||||
hint: "SelfSubjectAccessReview may not be available; verify permissions manually.",
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function checkSecret(
|
||||
namespace: string,
|
||||
secretName: string,
|
||||
checks: AdapterEnvironmentCheck[],
|
||||
kubeconfigPath?: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
await coreApi.readNamespacedSecret({ name: secretName, namespace });
|
||||
checks.push({
|
||||
code: "k8s_secret_exists",
|
||||
level: "info",
|
||||
message: `Secret "${secretName}" exists in namespace ${namespace}`,
|
||||
});
|
||||
} catch {
|
||||
checks.push({
|
||||
code: "k8s_secret_missing",
|
||||
level: "warn",
|
||||
message: `Secret "${secretName}" not found in namespace ${namespace}`,
|
||||
hint: `Ensure the paperclip-secrets Secret exists with keys for ANTHROPIC_API_KEY and/or AWS_BEARER_TOKEN_BEDROCK.`,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function checkPvc(
|
||||
selfPod: { pvcClaimName: string | null; namespace: string },
|
||||
checks: AdapterEnvironmentCheck[],
|
||||
kubeconfigPath?: string,
|
||||
): Promise<void> {
|
||||
if (!selfPod.pvcClaimName) {
|
||||
checks.push({
|
||||
code: "k8s_pvc_not_detected",
|
||||
level: "warn",
|
||||
message: "No PVC detected on /paperclip mount — session resume and workspace sharing will not work.",
|
||||
hint: "Ensure the Paperclip Deployment has a PVC mounted at /paperclip with ReadWriteMany access mode.",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const coreApi = getCoreApi(kubeconfigPath);
|
||||
const pvc = await coreApi.readNamespacedPersistentVolumeClaim({
|
||||
name: selfPod.pvcClaimName,
|
||||
namespace: selfPod.namespace,
|
||||
});
|
||||
const accessModes = pvc.spec?.accessModes ?? [];
|
||||
const isRwx = accessModes.includes("ReadWriteMany");
|
||||
if (isRwx) {
|
||||
checks.push({
|
||||
code: "k8s_pvc_rwx",
|
||||
level: "info",
|
||||
message: `PVC "${selfPod.pvcClaimName}" has ReadWriteMany access — Job pods can mount it.`,
|
||||
});
|
||||
} else {
|
||||
checks.push({
|
||||
code: "k8s_pvc_not_rwx",
|
||||
level: "warn",
|
||||
message: `PVC "${selfPod.pvcClaimName}" access modes: ${accessModes.join(", ")}. ReadWriteMany is required for Job pods to share the volume.`,
|
||||
hint: "Change the PVC accessMode to ReadWriteMany in Helm values.",
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
checks.push({
|
||||
code: "k8s_pvc_check_failed",
|
||||
level: "warn",
|
||||
message: `Could not read PVC "${selfPod.pvcClaimName}": ${msg}`,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export async function testEnvironment(
|
||||
ctx: AdapterEnvironmentTestContext,
|
||||
): Promise<AdapterEnvironmentTestResult> {
|
||||
const checks: AdapterEnvironmentCheck[] = [];
|
||||
const config = parseObject(ctx.config);
|
||||
const secretRef = asString(config.secretRef, "paperclip-secrets");
|
||||
const kubeconfigPath = asString(config.kubeconfig, "") || undefined;
|
||||
|
||||
// 1. K8s API reachable + self-pod introspection
|
||||
const apiOk = await checkApiReachable(checks, kubeconfigPath);
|
||||
if (!apiOk) {
|
||||
return { adapterType: ctx.adapterType, status: summarizeStatus(checks), checks, testedAt: new Date().toISOString() };
|
||||
}
|
||||
|
||||
const selfPod = await getSelfPodInfo(kubeconfigPath);
|
||||
const namespace = asString(config.namespace, "") || selfPod.namespace;
|
||||
|
||||
// 2. Target namespace exists
|
||||
const nsOk = await checkNamespace(namespace, selfPod.namespace, checks, kubeconfigPath);
|
||||
if (!nsOk) {
|
||||
return { adapterType: ctx.adapterType, status: summarizeStatus(checks), checks, testedAt: new Date().toISOString() };
|
||||
}
|
||||
|
||||
// 3-5. Run remaining checks in parallel
|
||||
await Promise.all([
|
||||
checkRbac(namespace, checks, kubeconfigPath),
|
||||
checkSecret(namespace, secretRef, checks, kubeconfigPath),
|
||||
checkPvc(selfPod, checks, kubeconfigPath),
|
||||
]);
|
||||
|
||||
return {
|
||||
adapterType: ctx.adapterType,
|
||||
status: summarizeStatus(checks),
|
||||
checks,
|
||||
testedAt: new Date().toISOString(),
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user