feat: UI parser kinds, nodeSelector textarea, step-limit session clear, per-line path redaction
- ui-parser: add thinking kind + handler for standalone thinking events, thinking blocks in assistant content arrays, and user-turn tool_result blocks - job-manifest: parseKeyValueOrObject helper so nodeSelector (and labels) accept key=value textarea lines in addition to JSON objects - parse: isOpenCodeStepLimitResult detects step_finish with max_turns / max_steps / step_limit reason - execute: return clearSession:true when step limit reached so next run starts fresh; redactHomePathUserSegments moved to per-line to prevent paths split across chunks - tests: ui-parser.test.ts (new), extended parse.test.ts and job-manifest.test.ts Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
+75
-7
@@ -1,9 +1,11 @@
|
||||
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
|
||||
import { inferOpenAiCompatibleBiller, redactHomePathUserSegments } from "@paperclipai/adapter-utils";
|
||||
import { asString, asNumber, asBoolean, parseObject } from "@paperclipai/adapter-utils/server-utils";
|
||||
import { asString, asNumber, asBoolean, parseObject, readPaperclipRuntimeSkillEntries, resolvePaperclipDesiredSkillNames } from "@paperclipai/adapter-utils/server-utils";
|
||||
import { readFile } from "node:fs/promises";
|
||||
import {
|
||||
parseOpenCodeJsonl,
|
||||
isOpenCodeUnknownSessionError,
|
||||
isOpenCodeStepLimitResult,
|
||||
} from "./parse.js";
|
||||
import { getSelfPodInfo, getBatchApi, getCoreApi, getLogApi } from "./k8s-client.js";
|
||||
import { buildJobManifest } from "./job-manifest.js";
|
||||
@@ -127,13 +129,28 @@ async function streamPodLogs(
|
||||
kubeconfigPath?: string,
|
||||
): Promise<string> {
|
||||
const logApi = getLogApi(kubeconfigPath);
|
||||
const chunks: string[] = [];
|
||||
const parts: string[] = [];
|
||||
let lineBuffer = "";
|
||||
|
||||
const writable = new Writable({
|
||||
write(chunk: Buffer, _encoding, callback) {
|
||||
const text = redactHomePathUserSegments(chunk.toString("utf-8"));
|
||||
chunks.push(text);
|
||||
void onLog("stdout", text).then(() => callback(), callback);
|
||||
const incoming = lineBuffer + chunk.toString("utf-8");
|
||||
const nlIdx = incoming.lastIndexOf("\n");
|
||||
if (nlIdx === -1) {
|
||||
// No complete line yet — buffer until newline arrives
|
||||
lineBuffer = incoming;
|
||||
callback();
|
||||
return;
|
||||
}
|
||||
lineBuffer = incoming.slice(nlIdx + 1);
|
||||
// Redact each complete line individually to avoid path splits across chunk boundaries
|
||||
const redacted = incoming
|
||||
.slice(0, nlIdx + 1)
|
||||
.split("\n")
|
||||
.map((line) => redactHomePathUserSegments(line))
|
||||
.join("\n");
|
||||
parts.push(redacted);
|
||||
void onLog("stdout", redacted).then(() => callback(), callback);
|
||||
},
|
||||
});
|
||||
|
||||
@@ -146,7 +163,14 @@ async function streamPodLogs(
|
||||
// follow may fail if the container already exited
|
||||
}
|
||||
|
||||
return chunks.join("");
|
||||
// Flush any partial line that never received a trailing newline
|
||||
if (lineBuffer) {
|
||||
const redacted = redactHomePathUserSegments(lineBuffer);
|
||||
parts.push(redacted);
|
||||
await onLog("stdout", redacted);
|
||||
}
|
||||
|
||||
return parts.join("");
|
||||
}
|
||||
|
||||
async function readPodLogs(
|
||||
@@ -264,9 +288,45 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
// If we can't check, proceed — heartbeat service enforces concurrency too
|
||||
}
|
||||
|
||||
// Read agent instructions file (instructionsFilePath config field → system prompt prepend)
|
||||
const instructionsFilePath = asString(config.instructionsFilePath, "").trim();
|
||||
let instructionsContent = "";
|
||||
if (instructionsFilePath) {
|
||||
try {
|
||||
instructionsContent = (await readFile(instructionsFilePath, "utf-8")).trim();
|
||||
} catch {
|
||||
await onLog("stderr", `[paperclip] Warning: instructionsFilePath not readable: ${instructionsFilePath}\n`);
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve and read desired skill content (injected into prompt bundle)
|
||||
let skillsBundleContent = "";
|
||||
try {
|
||||
const moduleDir = import.meta.dirname;
|
||||
const availableEntries = await readPaperclipRuntimeSkillEntries(config, moduleDir);
|
||||
const desiredSkillKeys = resolvePaperclipDesiredSkillNames(config, availableEntries);
|
||||
const skillTexts: string[] = [];
|
||||
for (const key of desiredSkillKeys) {
|
||||
const entry = availableEntries.find((e) => e.key === key);
|
||||
if (entry?.source) {
|
||||
try {
|
||||
const text = (await readFile(entry.source, "utf-8")).trim();
|
||||
if (text) skillTexts.push(text);
|
||||
} catch {
|
||||
// skip unreadable skill files — non-fatal
|
||||
}
|
||||
}
|
||||
}
|
||||
if (skillTexts.length > 0) skillsBundleContent = skillTexts.join("\n\n---\n\n");
|
||||
} catch {
|
||||
// non-fatal: skill bundle is optional
|
||||
}
|
||||
|
||||
const { job, jobName, namespace, prompt, opencodeArgs, promptMetrics } = buildJobManifest({
|
||||
ctx,
|
||||
selfPod,
|
||||
instructionsContent: instructionsContent || undefined,
|
||||
skillsBundleContent: skillsBundleContent || undefined,
|
||||
});
|
||||
|
||||
if (onMeta) {
|
||||
@@ -412,6 +472,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
resultJson: { stdout },
|
||||
};
|
||||
}
|
||||
|
||||
// If OpenCode hit its step limit, clear the session so the next run starts fresh
|
||||
// rather than resuming into an already-exhausted turn sequence.
|
||||
const stepLimitReached = isOpenCodeStepLimitResult(stdout);
|
||||
if (stepLimitReached) {
|
||||
await onLog("stdout", `[paperclip] OpenCode step limit reached; clearing session for next run.\n`);
|
||||
}
|
||||
|
||||
const firstStderrLine = stdout.split(/\r?\n/).map((l) => l.trim()).find(Boolean) ?? "";
|
||||
const fallbackErrorMessage = parsedError || firstStderrLine || `OpenCode exited with code ${synthesizedExitCode ?? -1}`;
|
||||
|
||||
@@ -434,6 +502,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
||||
costUsd: parsed.costUsd,
|
||||
resultJson: { stdout },
|
||||
summary: parsed.summary,
|
||||
clearSession: false,
|
||||
clearSession: stepLimitReached,
|
||||
} as AdapterExecutionResult;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user