fix: P0 correctness fixes from FAR-104/FAR-105 analysis
1. Inherit envFrom and env.valueFrom from self pod — secrets wired via valueFrom.secretKeyRef or envFrom.secretRef are now forwarded to Job pods, fixing credentials silently dropped for K8s-idiomatic secret patterns (e.g. ANTHROPIC_API_KEY via Secret). 2. Distinguish 404 vs transient errors in keepalive — only mark the keepalive as terminal on 404 (Job deleted). Transient 5xx/connection errors are logged and retried on the next tick, preventing premature reaper kills during API instability. 3. Fail closed on concurrency-guard read failure — a failing listNamespacedJob now returns k8s_concurrency_guard_unreachable instead of silently proceeding, protecting against zombie Jobs on shared PVCs. 4. Bound the waitForJobCompletion re-check — pass a 60s timeout instead of polling forever, preventing indefinite hangs when the K8s API is degraded. Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
+35
-10
@@ -356,8 +356,20 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch {
|
} catch (err: unknown) {
|
||||||
// If we can't check, proceed — the heartbeat service enforces concurrency too
|
// If we can't list jobs, fail closed — the K8s concurrency guard is the
|
||||||
|
// only thing preventing zombie Jobs on a shared PVC from corrupting
|
||||||
|
// sessions. 404 (namespace not found) is treated as a hard failure;
|
||||||
|
// other errors (5xx, network) are also surfaced.
|
||||||
|
const msg = err instanceof Error ? err.message : String(err);
|
||||||
|
await onLog("stderr", `[paperclip] Concurrency guard failed: unable to list jobs: ${msg}\n`);
|
||||||
|
return {
|
||||||
|
exitCode: null,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
errorMessage: `Concurrency guard unreachable: ${msg}`,
|
||||||
|
errorCode: "k8s_concurrency_guard_unreachable",
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build Job manifest
|
// Build Job manifest
|
||||||
@@ -486,11 +498,21 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||||||
keepaliveJobTerminal = true;
|
keepaliveJobTerminal = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} catch {
|
} catch (err: unknown) {
|
||||||
// Job may have been deleted out from under us, or the API call
|
// Only treat 404 (Job deleted) as terminal. Transient 5xx or
|
||||||
// transiently failed. Either way, do not refresh updatedAt —
|
// connection resets should NOT permanently disable the keepalive —
|
||||||
// either the Job really is gone, or the next tick will re-check.
|
// the next tick will re-check and the reaper uses the staleness
|
||||||
keepaliveJobTerminal = true;
|
// window as a safety net.
|
||||||
|
const statusCode = (err as { response?: { statusCode?: number } })?.response?.statusCode
|
||||||
|
?? (err as { statusCode?: number })?.statusCode;
|
||||||
|
if (statusCode === 404) {
|
||||||
|
keepaliveJobTerminal = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Log transient errors but leave keepaliveJobTerminal false so
|
||||||
|
// the next tick retries.
|
||||||
|
const msg = err instanceof Error ? err.message : String(err);
|
||||||
|
void onLog("stderr", `[paperclip] keepalive: transient error checking job status: ${msg}\n`).catch(() => {});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -550,11 +572,14 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||||||
} else {
|
} else {
|
||||||
// waitForJobCompletion threw — re-check job state to avoid returning
|
// waitForJobCompletion threw — re-check job state to avoid returning
|
||||||
// while the job is still running (which would cause UI staleness and
|
// while the job is still running (which would cause UI staleness and
|
||||||
// concurrency errors on retry).
|
// concurrency errors on retry). Use a bounded timeout (60s) so we
|
||||||
|
// don't hang the heartbeat indefinitely if the K8s API is degraded.
|
||||||
jobTimedOut = false;
|
jobTimedOut = false;
|
||||||
const actualState = await waitForJobCompletion(namespace, jobName, 0, kubeconfigPath);
|
const RECHECK_TIMEOUT_MS = 60_000;
|
||||||
|
const actualState = await waitForJobCompletion(namespace, jobName, RECHECK_TIMEOUT_MS, kubeconfigPath);
|
||||||
if (actualState.timedOut) {
|
if (actualState.timedOut) {
|
||||||
// Truly a timeout after re-check — treat as timed out.
|
// Re-check itself timed out — the job may still be running.
|
||||||
|
// Return an error so the UI knows the run is not done.
|
||||||
jobTimedOut = true;
|
jobTimedOut = true;
|
||||||
} else if (!actualState.succeeded) {
|
} else if (!actualState.succeeded) {
|
||||||
// Job still not terminal — the completion error was likely transient.
|
// Job still not terminal — the completion error was likely transient.
|
||||||
|
|||||||
@@ -24,6 +24,8 @@ function makeSelfPod(overrides: Partial<SelfPodInfo> = {}): SelfPodInfo {
|
|||||||
pvcClaimName: "paperclip-data",
|
pvcClaimName: "paperclip-data",
|
||||||
secretVolumes: [],
|
secretVolumes: [],
|
||||||
inheritedEnv: {},
|
inheritedEnv: {},
|
||||||
|
inheritedEnvValueFrom: [],
|
||||||
|
inheritedEnvFrom: [],
|
||||||
...overrides,
|
...overrides,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -331,6 +333,50 @@ describe("buildJobManifest", () => {
|
|||||||
const apiUrl = job.spec?.template?.spec?.containers[0]?.env?.find((e) => e.name === "PAPERCLIP_API_URL");
|
const apiUrl = job.spec?.template?.spec?.containers[0]?.env?.find((e) => e.name === "PAPERCLIP_API_URL");
|
||||||
expect(apiUrl?.value).toBe("http://paperclip:8080");
|
expect(apiUrl?.value).toBe("http://paperclip:8080");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("includes valueFrom env vars from selfPod", () => {
|
||||||
|
selfPod.inheritedEnvValueFrom = [
|
||||||
|
{ name: "ANTHROPIC_API_KEY", valueFrom: { secretKeyRef: { name: "api-keys", key: "anthropic" } } },
|
||||||
|
];
|
||||||
|
const { job } = buildJobManifest({ ctx, selfPod });
|
||||||
|
const envList = job.spec?.template?.spec?.containers[0]?.env ?? [];
|
||||||
|
const apiKeyEntry = envList.find((e) => e.name === "ANTHROPIC_API_KEY");
|
||||||
|
expect(apiKeyEntry?.valueFrom?.secretKeyRef?.name).toBe("api-keys");
|
||||||
|
expect(apiKeyEntry?.valueFrom?.secretKeyRef?.key).toBe("anthropic");
|
||||||
|
expect(apiKeyEntry?.value).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("literal env overrides valueFrom with the same name", () => {
|
||||||
|
selfPod.inheritedEnv = { MY_VAR: "literal-value" };
|
||||||
|
selfPod.inheritedEnvValueFrom = [
|
||||||
|
{ name: "MY_VAR", valueFrom: { secretKeyRef: { name: "sec", key: "k" } } },
|
||||||
|
];
|
||||||
|
const { job } = buildJobManifest({ ctx, selfPod });
|
||||||
|
const envList = job.spec?.template?.spec?.containers[0]?.env ?? [];
|
||||||
|
const myVar = envList.filter((e) => e.name === "MY_VAR");
|
||||||
|
expect(myVar).toHaveLength(1);
|
||||||
|
expect(myVar[0]?.value).toBe("literal-value");
|
||||||
|
expect(myVar[0]?.valueFrom).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("includes envFrom sources from selfPod on the container", () => {
|
||||||
|
selfPod.inheritedEnvFrom = [
|
||||||
|
{ secretRef: { name: "api-secrets" } },
|
||||||
|
{ configMapRef: { name: "app-config" } },
|
||||||
|
];
|
||||||
|
const { job } = buildJobManifest({ ctx, selfPod });
|
||||||
|
const container = job.spec?.template?.spec?.containers[0];
|
||||||
|
expect(container?.envFrom).toHaveLength(2);
|
||||||
|
expect(container?.envFrom?.[0]?.secretRef?.name).toBe("api-secrets");
|
||||||
|
expect(container?.envFrom?.[1]?.configMapRef?.name).toBe("app-config");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("omits envFrom when selfPod has none", () => {
|
||||||
|
selfPod.inheritedEnvFrom = [];
|
||||||
|
const { job } = buildJobManifest({ ctx, selfPod });
|
||||||
|
const container = job.spec?.template?.spec?.containers[0];
|
||||||
|
expect(container?.envFrom).toBeUndefined();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("resources", () => {
|
describe("resources", () => {
|
||||||
|
|||||||
@@ -148,12 +148,22 @@ function buildEnvVars(
|
|||||||
// HOME must be /paperclip to match PVC mount and enable session resume
|
// HOME must be /paperclip to match PVC mount and enable session resume
|
||||||
merged.HOME = "/paperclip";
|
merged.HOME = "/paperclip";
|
||||||
|
|
||||||
// Convert to V1EnvVar array
|
// Convert literal env to V1EnvVar array
|
||||||
const envVars: k8s.V1EnvVar[] = Object.entries(merged).map(([name, value]) => ({
|
const envVars: k8s.V1EnvVar[] = Object.entries(merged).map(([name, value]) => ({
|
||||||
name,
|
name,
|
||||||
value,
|
value,
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
// Append valueFrom entries from the Deployment container (secretKeyRef,
|
||||||
|
// configMapKeyRef, fieldRef, etc.). Skip any whose name was already set
|
||||||
|
// by a literal value — the literal value wins (same precedence as above).
|
||||||
|
const literalNames = new Set(Object.keys(merged));
|
||||||
|
for (const entry of selfPod.inheritedEnvValueFrom) {
|
||||||
|
if (!literalNames.has(entry.name)) {
|
||||||
|
envVars.push(entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return envVars;
|
return envVars;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -377,6 +387,7 @@ export function buildJobManifest(input: JobBuildInput): JobBuildResult {
|
|||||||
workingDir,
|
workingDir,
|
||||||
command: ["sh", "-c", mainCommand],
|
command: ["sh", "-c", mainCommand],
|
||||||
env: envVars,
|
env: envVars,
|
||||||
|
...(selfPod.inheritedEnvFrom.length > 0 ? { envFrom: selfPod.inheritedEnvFrom } : {}),
|
||||||
volumeMounts,
|
volumeMounts,
|
||||||
securityContext,
|
securityContext,
|
||||||
resources: containerResources,
|
resources: containerResources,
|
||||||
|
|||||||
@@ -20,8 +20,12 @@ export interface SelfPodInfo {
|
|||||||
dnsConfig: k8s.V1PodDNSConfig | undefined;
|
dnsConfig: k8s.V1PodDNSConfig | undefined;
|
||||||
pvcClaimName: string | null;
|
pvcClaimName: string | null;
|
||||||
secretVolumes: SelfPodSecretVolume[];
|
secretVolumes: SelfPodSecretVolume[];
|
||||||
/** Env vars inherited from the Deployment container. */
|
/** Env vars inherited from the Deployment container (literal name/value pairs). */
|
||||||
inheritedEnv: Record<string, string>;
|
inheritedEnv: Record<string, string>;
|
||||||
|
/** Env vars with valueFrom (secretKeyRef, configMapKeyRef, etc.) from the Deployment container. */
|
||||||
|
inheritedEnvValueFrom: k8s.V1EnvVar[];
|
||||||
|
/** envFrom sources (secretRef, configMapRef) from the Deployment container. */
|
||||||
|
inheritedEnvFrom: k8s.V1EnvFromSource[];
|
||||||
}
|
}
|
||||||
|
|
||||||
let cachedSelfPod: SelfPodInfo | null = null;
|
let cachedSelfPod: SelfPodInfo | null = null;
|
||||||
@@ -134,12 +138,21 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise<SelfPodIn
|
|||||||
// Collect env vars from the pod spec's container definition.
|
// Collect env vars from the pod spec's container definition.
|
||||||
// Agent config env (set in buildEnvVars) will override these.
|
// Agent config env (set in buildEnvVars) will override these.
|
||||||
const inheritedEnv: Record<string, string> = {};
|
const inheritedEnv: Record<string, string> = {};
|
||||||
|
const inheritedEnvValueFrom: k8s.V1EnvVar[] = [];
|
||||||
for (const envItem of mainContainer.env ?? []) {
|
for (const envItem of mainContainer.env ?? []) {
|
||||||
if (!envItem.name) continue;
|
if (!envItem.name) continue;
|
||||||
const value = envItem.value ?? "";
|
if (envItem.valueFrom) {
|
||||||
if (value) inheritedEnv[envItem.name] = value;
|
// Preserve valueFrom entries (secretKeyRef, configMapKeyRef, fieldRef, etc.)
|
||||||
|
inheritedEnvValueFrom.push({ name: envItem.name, valueFrom: envItem.valueFrom });
|
||||||
|
} else {
|
||||||
|
const value = envItem.value ?? "";
|
||||||
|
if (value) inheritedEnv[envItem.name] = value;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Capture envFrom sources (secretRef, configMapRef) from the container spec
|
||||||
|
const inheritedEnvFrom: k8s.V1EnvFromSource[] = mainContainer.envFrom ?? [];
|
||||||
|
|
||||||
cachedSelfPod = {
|
cachedSelfPod = {
|
||||||
namespace,
|
namespace,
|
||||||
image: mainContainer.image,
|
image: mainContainer.image,
|
||||||
@@ -150,6 +163,8 @@ export async function getSelfPodInfo(kubeconfigPath?: string): Promise<SelfPodIn
|
|||||||
pvcClaimName,
|
pvcClaimName,
|
||||||
secretVolumes,
|
secretVolumes,
|
||||||
inheritedEnv,
|
inheritedEnv,
|
||||||
|
inheritedEnvValueFrom,
|
||||||
|
inheritedEnvFrom,
|
||||||
};
|
};
|
||||||
|
|
||||||
return cachedSelfPod;
|
return cachedSelfPod;
|
||||||
|
|||||||
Reference in New Issue
Block a user