diff --git a/src/server/execute.ts b/src/server/execute.ts index 1b5f6e2..18af32c 100644 --- a/src/server/execute.ts +++ b/src/server/execute.ts @@ -1,9 +1,11 @@ import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils"; import { inferOpenAiCompatibleBiller, redactHomePathUserSegments } from "@paperclipai/adapter-utils"; -import { asString, asNumber, asBoolean, parseObject } from "@paperclipai/adapter-utils/server-utils"; +import { asString, asNumber, asBoolean, parseObject, readPaperclipRuntimeSkillEntries, resolvePaperclipDesiredSkillNames } from "@paperclipai/adapter-utils/server-utils"; +import { readFile } from "node:fs/promises"; import { parseOpenCodeJsonl, isOpenCodeUnknownSessionError, + isOpenCodeStepLimitResult, } from "./parse.js"; import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js"; import { buildJobManifest } from "./job-manifest.js"; @@ -127,13 +129,28 @@ async function streamPodLogs( kubeconfigPath?: string, ): Promise { const logApi = getLogApi(kubeconfigPath); - const chunks: string[] = []; + const parts: string[] = []; + let lineBuffer = ""; const writable = new Writable({ write(chunk: Buffer, _encoding, callback) { - const text = redactHomePathUserSegments(chunk.toString("utf-8")); - chunks.push(text); - void onLog("stdout", text).then(() => callback(), callback); + const incoming = lineBuffer + chunk.toString("utf-8"); + const nlIdx = incoming.lastIndexOf("\n"); + if (nlIdx === -1) { + // No complete line yet — buffer until newline arrives + lineBuffer = incoming; + callback(); + return; + } + lineBuffer = incoming.slice(nlIdx + 1); + // Redact each complete line individually to avoid path splits across chunk boundaries + const redacted = incoming + .slice(0, nlIdx + 1) + .split("\n") + .map((line) => redactHomePathUserSegments(line)) + .join("\n"); + parts.push(redacted); + void onLog("stdout", redacted).then(() => callback(), callback); }, }); @@ -146,7 +163,14 @@ async function streamPodLogs( // follow may fail if the container already exited } - return chunks.join(""); + // Flush any partial line that never received a trailing newline + if (lineBuffer) { + const redacted = redactHomePathUserSegments(lineBuffer); + parts.push(redacted); + await onLog("stdout", redacted); + } + + return parts.join(""); } async function readPodLogs( @@ -264,9 +288,45 @@ export async function execute(ctx: AdapterExecutionContext): Promise e.key === key); + if (entry?.source) { + try { + const text = (await readFile(entry.source, "utf-8")).trim(); + if (text) skillTexts.push(text); + } catch { + // skip unreadable skill files — non-fatal + } + } + } + if (skillTexts.length > 0) skillsBundleContent = skillTexts.join("\n\n---\n\n"); + } catch { + // non-fatal: skill bundle is optional + } + const { job, jobName, namespace, prompt, opencodeArgs, promptMetrics } = buildJobManifest({ ctx, selfPod, + instructionsContent: instructionsContent || undefined, + skillsBundleContent: skillsBundleContent || undefined, }); if (onMeta) { @@ -412,6 +472,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise l.trim()).find(Boolean) ?? ""; const fallbackErrorMessage = parsedError || firstStderrLine || `OpenCode exited with code ${synthesizedExitCode ?? -1}`; @@ -434,6 +502,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise { expect(result.job.spec?.template?.spec?.restartPolicy).toBe("Never"); }); + + it("applies nodeSelector from key=value textarea string", () => { + const ctx = { ...mockCtx, config: { nodeSelector: "kubernetes.io/arch=amd64\nkubernetes.io/os=linux" } }; + const result = buildJobManifest({ ctx, selfPod: mockSelfPod }); + + expect(result.job.spec?.template?.spec?.nodeSelector).toEqual({ + "kubernetes.io/arch": "amd64", + "kubernetes.io/os": "linux", + }); + }); + + it("applies nodeSelector from JSON object string", () => { + const ctx = { ...mockCtx, config: { nodeSelector: '{"node-type":"gpu"}' } }; + const result = buildJobManifest({ ctx, selfPod: mockSelfPod }); + + expect(result.job.spec?.template?.spec?.nodeSelector).toEqual({ "node-type": "gpu" }); + }); + + it("applies nodeSelector from plain object config", () => { + const ctx = { ...mockCtx, config: { nodeSelector: { "zone": "us-east-1" } } }; + const result = buildJobManifest({ ctx, selfPod: mockSelfPod }); + + expect(result.job.spec?.template?.spec?.nodeSelector).toEqual({ zone: "us-east-1" }); + }); + + it("ignores blank lines and comments in nodeSelector textarea", () => { + const ctx = { + ...mockCtx, + config: { nodeSelector: "# comment\n\nkubernetes.io/arch=amd64\n" }, + }; + const result = buildJobManifest({ ctx, selfPod: mockSelfPod }); + + expect(result.job.spec?.template?.spec?.nodeSelector).toEqual({ "kubernetes.io/arch": "amd64" }); + }); }); diff --git a/src/server/job-manifest.ts b/src/server/job-manifest.ts index dfe2ec5..8043693 100644 --- a/src/server/job-manifest.ts +++ b/src/server/job-manifest.ts @@ -17,6 +17,10 @@ import type { SelfPodInfo } from "./k8s-client.js"; export interface JobBuildInput { ctx: AdapterExecutionContext; selfPod: SelfPodInfo; + /** Content of the agent's instructions file (e.g. AGENTS.md), prepended to the prompt. */ + instructionsContent?: string; + /** Concatenated content of desired skill markdown files, prepended after instructions. */ + skillsBundleContent?: string; } export interface JobBuildResult { @@ -28,6 +32,46 @@ export interface JobBuildResult { promptMetrics: Record; } +/** + * Parse a config field that may be a JSON object, a plain object, or a textarea + * with "key=value" lines (one per line). Used for nodeSelector and labels. + */ +function parseKeyValueOrObject(value: unknown): Record { + if (value && typeof value === "object" && !Array.isArray(value)) { + return Object.fromEntries( + Object.entries(value as Record) + .filter(([, v]) => typeof v === "string") + .map(([k, v]) => [k, v as string]), + ); + } + if (typeof value !== "string") return {}; + const text = value.trim(); + if (!text) return {}; + try { + const parsed = JSON.parse(text); + if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) { + return Object.fromEntries( + Object.entries(parsed as Record) + .filter(([, v]) => typeof v === "string") + .map(([k, v]) => [k, v as string]), + ); + } + } catch { + // fall through to key=value parsing + } + const result: Record = {}; + for (const line of text.split(/\r?\n/)) { + const trimmed = line.trim(); + if (!trimmed || trimmed.startsWith("#")) continue; + const eqIdx = trimmed.indexOf("="); + if (eqIdx === -1) continue; + const key = trimmed.slice(0, eqIdx).trim(); + const val = trimmed.slice(eqIdx + 1).trim(); + if (key) result[key] = val; + } + return result; +} + function sanitizeForK8sName(value: string): string { return value.toLowerCase().replace(/[^a-z0-9-]/g, "").slice(0, 8); } @@ -145,9 +189,9 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { const timeoutSec = asNumber(config.timeoutSec, 0); const ttlSeconds = asNumber(config.ttlSecondsAfterFinished, 300); const resources = parseObject(config.resources); - const nodeSelector = parseObject(config.nodeSelector); + const nodeSelector = parseKeyValueOrObject(config.nodeSelector); const tolerations = Array.isArray(config.tolerations) ? config.tolerations : []; - const extraLabels = parseObject(config.labels); + const extraLabels = parseKeyValueOrObject(config.labels); // Resolve working directory const workspaceContext = parseObject(context.paperclipWorkspace); @@ -185,7 +229,11 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { const shouldUseResumeDeltaPrompt = Boolean(runtimeSessionId) && wakePrompt.length > 0; const renderedPrompt = shouldUseResumeDeltaPrompt ? "" : renderTemplate(promptTemplate, templateData); const sessionHandoffNote = asString(context.paperclipSessionHandoffMarkdown, "").trim(); + const instructionsContent = input.instructionsContent?.trim() ?? ""; + const skillsBundleContent = input.skillsBundleContent?.trim() ?? ""; const prompt = joinPromptSections([ + instructionsContent, + skillsBundleContent, renderedBootstrapPrompt, wakePrompt, sessionHandoffNote, @@ -193,6 +241,8 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult { ]); const promptMetrics = { promptChars: prompt.length, + instructionsChars: instructionsContent.length, + skillsBundleChars: skillsBundleContent.length, bootstrapPromptChars: renderedBootstrapPrompt.length, wakePromptChars: wakePrompt.length, sessionHandoffChars: sessionHandoffNote.length, diff --git a/src/server/parse.test.ts b/src/server/parse.test.ts index 0505bb3..d934dc8 100644 --- a/src/server/parse.test.ts +++ b/src/server/parse.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from "vitest"; -import { parseOpenCodeJsonl, isOpenCodeUnknownSessionError } from "./parse.js"; +import { parseOpenCodeJsonl, isOpenCodeUnknownSessionError, isOpenCodeStepLimitResult } from "./parse.js"; describe("parseOpenCodeJsonl", () => { it("parses text messages", () => { @@ -119,6 +119,37 @@ describe("parseOpenCodeJsonl", () => { }); }); +describe("isOpenCodeStepLimitResult", () => { + it("returns true for step_finish with reason max_turns", () => { + const stdout = JSON.stringify({ type: "step_finish", part: { reason: "max_turns", tokens: {} } }); + expect(isOpenCodeStepLimitResult(stdout)).toBe(true); + }); + + it("returns true for step_finish with reason max_steps", () => { + const stdout = JSON.stringify({ type: "step_finish", part: { reason: "max_steps", tokens: {} } }); + expect(isOpenCodeStepLimitResult(stdout)).toBe(true); + }); + + it("returns true for step_finish with reason step_limit", () => { + const stdout = JSON.stringify({ type: "step_finish", part: { reason: "step_limit", tokens: {} } }); + expect(isOpenCodeStepLimitResult(stdout)).toBe(true); + }); + + it("returns false for step_finish with reason end_turn", () => { + const stdout = JSON.stringify({ type: "step_finish", part: { reason: "end_turn", tokens: {} } }); + expect(isOpenCodeStepLimitResult(stdout)).toBe(false); + }); + + it("returns false with no step_finish events", () => { + const stdout = JSON.stringify({ type: "text", part: { text: "Hello" } }); + expect(isOpenCodeStepLimitResult(stdout)).toBe(false); + }); + + it("returns false for empty stdout", () => { + expect(isOpenCodeStepLimitResult("")).toBe(false); + }); +}); + describe("isOpenCodeUnknownSessionError", () => { it("detects 'unknown session' in stdout", () => { const stdout = "Error: unknown session"; diff --git a/src/server/parse.ts b/src/server/parse.ts index de049fa..4751a7a 100644 --- a/src/server/parse.ts +++ b/src/server/parse.ts @@ -88,6 +88,23 @@ export function parseOpenCodeJsonl(stdout: string) { }; } +export function isOpenCodeStepLimitResult(stdout: string): boolean { + for (const rawLine of stdout.split(/\r?\n/)) { + const line = rawLine.trim(); + if (!line) continue; + const event = parseJson(line); + if (!event) continue; + if (asString(event.type, "") === "step_finish") { + const part = parseObject(event.part); + const reason = asString(part.reason, "").toLowerCase(); + if (reason === "max_turns" || reason === "max_steps" || reason === "step_limit") { + return true; + } + } + } + return false; +} + export function isOpenCodeUnknownSessionError(stdout: string, stderr: string): boolean { const haystack = `${stdout}\n${stderr}` .split(/\r?\n/) diff --git a/src/ui-parser.test.ts b/src/ui-parser.test.ts new file mode 100644 index 0000000..409f29a --- /dev/null +++ b/src/ui-parser.test.ts @@ -0,0 +1,149 @@ +import { describe, it, expect } from "vitest"; +import { parseStdoutLine } from "./ui-parser.js"; + +const TS = "2026-01-01T00:00:00.000Z"; + +describe("parseStdoutLine", () => { + it("returns empty for blank lines", () => { + expect(parseStdoutLine("", TS)).toEqual([]); + expect(parseStdoutLine(" ", TS)).toEqual([]); + }); + + it("returns stdout kind for non-JSON input", () => { + const entries = parseStdoutLine("plain text", TS); + expect(entries).toHaveLength(1); + expect(entries[0].kind).toBe("stdout"); + }); + + it("maps text event to assistant kind", () => { + const line = JSON.stringify({ type: "text", part: { text: "Hello" } }); + const entries = parseStdoutLine(line, TS); + expect(entries).toHaveLength(1); + expect(entries[0].kind).toBe("assistant"); + expect((entries[0] as { kind: "assistant"; text: string }).text).toBe("Hello"); + }); + + it("maps standalone thinking event to thinking kind", () => { + const line = JSON.stringify({ type: "thinking", part: { thinking: "My reasoning" } }); + const entries = parseStdoutLine(line, TS); + expect(entries).toHaveLength(1); + expect(entries[0].kind).toBe("thinking"); + expect((entries[0] as unknown as { text: string }).text).toBe("My reasoning"); + }); + + it("maps thinking block inside assistant event to thinking kind", () => { + const line = JSON.stringify({ + type: "assistant", + part: { + message: { + content: [{ type: "thinking", thinking: "Inner reasoning" }], + }, + }, + }); + const entries = parseStdoutLine(line, TS); + expect(entries).toHaveLength(1); + expect(entries[0].kind).toBe("thinking"); + expect((entries[0] as unknown as { text: string }).text).toBe("Inner reasoning"); + }); + + it("collects both text and thinking blocks from assistant event", () => { + const line = JSON.stringify({ + type: "assistant", + part: { + message: { + content: [ + { type: "thinking", thinking: "Let me think" }, + { type: "text", text: "Here is my answer" }, + ], + }, + }, + }); + const entries = parseStdoutLine(line, TS); + expect(entries).toHaveLength(2); + expect(entries[0].kind).toBe("thinking"); + expect(entries[1].kind).toBe("assistant"); + }); + + it("maps user event tool_result to tool_result kind", () => { + const line = JSON.stringify({ + type: "user", + part: { + message: { + content: [ + { + type: "tool_result", + tool_use_id: "tu_abc", + content: "File contents here", + }, + ], + }, + }, + }); + const entries = parseStdoutLine(line, TS); + expect(entries).toHaveLength(1); + expect(entries[0].kind).toBe("tool_result"); + const tr = entries[0] as { kind: "tool_result"; toolUseId: string; content: string; isError: boolean }; + expect(tr.toolUseId).toBe("tu_abc"); + expect(tr.content).toBe("File contents here"); + expect(tr.isError).toBe(false); + }); + + it("maps user event tool_result with array content", () => { + const line = JSON.stringify({ + type: "user", + part: { + message: { + content: [ + { + type: "tool_result", + tool_use_id: "tu_xyz", + content: [{ type: "text", text: "part1" }, { type: "text", text: "part2" }], + }, + ], + }, + }, + }); + const entries = parseStdoutLine(line, TS); + expect(entries).toHaveLength(1); + expect(entries[0].kind).toBe("tool_result"); + expect((entries[0] as { content: string }).content).toBe("part1part2"); + }); + + it("returns empty for user event with no content", () => { + const line = JSON.stringify({ type: "user", part: {} }); + const entries = parseStdoutLine(line, TS); + expect(entries).toEqual([]); + }); + + it("maps tool_use completed to tool_result kind", () => { + const line = JSON.stringify({ + type: "tool_use", + part: { tool: "read_file", id: "tu_1", state: { status: "completed", output: "ok" } }, + }); + const entries = parseStdoutLine(line, TS); + expect(entries).toHaveLength(1); + expect(entries[0].kind).toBe("tool_result"); + }); + + it("maps step_finish to result kind", () => { + const line = JSON.stringify({ + type: "step_finish", + part: { reason: "end_turn", tokens: { input: 10, output: 5, cache: { read: 0 } }, cost: 0.001 }, + }); + const entries = parseStdoutLine(line, TS); + expect(entries).toHaveLength(1); + expect(entries[0].kind).toBe("result"); + }); + + it("maps error event to stderr kind", () => { + const line = JSON.stringify({ type: "error", error: { message: "Something broke" } }); + const entries = parseStdoutLine(line, TS); + expect(entries).toHaveLength(1); + expect(entries[0].kind).toBe("stderr"); + }); + + it("ignores thinking event with empty text", () => { + const line = JSON.stringify({ type: "thinking", part: { thinking: " " } }); + expect(parseStdoutLine(line, TS)).toEqual([]); + }); +}); diff --git a/src/ui-parser.ts b/src/ui-parser.ts index 24e8f11..4520869 100644 --- a/src/ui-parser.ts +++ b/src/ui-parser.ts @@ -9,6 +9,7 @@ type TranscriptEntry = | { kind: "assistant"; ts: string; text: string; delta?: boolean } + | { kind: "thinking"; ts: string; text: string } | { kind: "tool_call"; ts: string; name: string; input: unknown; toolUseId?: string } | { kind: "tool_result"; ts: string; toolUseId: string; toolName?: string; content: string; isError: boolean } | { kind: "result"; ts: string; text: string; inputTokens: number; outputTokens: number; cachedTokens: number; costUsd: number; subtype: string; isError: boolean; errors: string[] } @@ -155,7 +156,14 @@ export function parseStdoutLine(line: string, ts: string): TranscriptEntry[] { return [{ kind: "system", ts, text: "Starting step…" }]; } - // Assistant message (nested content blocks) + // Standalone thinking event (extended reasoning) + if (type === "thinking") { + const text = asString(part.thinking ?? part.text, "").trim(); + if (text) return [{ kind: "thinking", ts, text }]; + return []; + } + + // Assistant message (nested content blocks — text and thinking) if (type === "assistant") { const content = part.message ?? part; const contentRecord = asRecord(content); @@ -163,13 +171,57 @@ export function parseStdoutLine(line: string, ts: string): TranscriptEntry[] { const contentArr = Array.isArray(contentRecord.content) ? contentRecord.content : [contentRecord.content]; + const entries: TranscriptEntry[] = []; for (const item of contentArr) { const itemRecord = asRecord(item); if (itemRecord.type === "text" && typeof itemRecord.text === "string") { const text = (itemRecord.text as string).trim(); - if (text) return [{ kind: "assistant", ts, text }]; + if (text) entries.push({ kind: "assistant", ts, text }); + } else if (itemRecord.type === "thinking" && typeof itemRecord.thinking === "string") { + const text = (itemRecord.thinking as string).trim(); + if (text) entries.push({ kind: "thinking", ts, text }); } } + return entries; + } + return []; + } + + // User turn — surface tool_result blocks so they appear in the transcript + if (type === "user") { + const content = part.message ?? part; + const contentRecord = asRecord(content); + if (contentRecord.content) { + const contentArr = Array.isArray(contentRecord.content) + ? contentRecord.content + : [contentRecord.content]; + const entries: TranscriptEntry[] = []; + for (const item of contentArr) { + const itemRecord = asRecord(item); + if (itemRecord.type === "tool_result") { + const toolUseId = asString(itemRecord.tool_use_id ?? itemRecord.toolUseId ?? "", ""); + const contentVal = itemRecord.content; + let resultText = ""; + if (typeof contentVal === "string") { + resultText = contentVal.trim(); + } else if (Array.isArray(contentVal)) { + resultText = contentVal + .map((c) => asString(asRecord(c).text, "")) + .join("") + .trim(); + } + if (toolUseId || resultText) { + entries.push({ + kind: "tool_result", + ts, + toolUseId: toolUseId || "unknown", + content: resultText, + isError: false, + }); + } + } + } + return entries; } return []; }