feat: extract pipeline core for library consumption (#282)

* feat: extract pipeline core for library consumption

* fix: chmod workspace directory for container write access

* fix: resolve playwright output dir relative to deliverables parent

* feat: add multi-provider LLM support via ProviderConfig

* fix: resolve model overrides via options.model, remove unused model env passthrough

* fix: use ANTHROPIC_AUTH_TOKEN for custom base URL and router auth

* fix: skip env-based credential validation when providerConfig is present

* fix: support large UID/GID values for AD/LDAP users in container
This commit is contained in:
ezl-keygraph
2026-04-10 04:53:36 +05:30
committed by GitHub
parent f6fd1edad6
commit 1f6dfd7e17
32 changed files with 616 additions and 106 deletions
+82 -9
View File
@@ -22,6 +22,7 @@ import { AuditSession } from '../audit/index.js';
import type { ResumeAttempt } from '../audit/metrics-tracker.js';
import type { SessionMetadata } from '../audit/utils.js';
import type { WorkflowSummary } from '../audit/workflow-logger.js';
import type { ContainerConfig, ProviderConfig } from '../types/config.js';
import { getContainer, getOrCreateContainer, removeContainer } from '../services/container.js';
import { classifyErrorForTemporal, PentestError } from '../services/error-handling.js';
import { ExploitationCheckerService } from '../services/exploitation-checker.js';
@@ -34,9 +35,10 @@ import type { AgentName } from '../types/agents.js';
import { ALL_AGENTS } from '../types/agents.js';
import { ErrorCode } from '../types/errors.js';
import { isErr } from '../types/result.js';
import { DEFAULT_DELIVERABLES_SUBDIR, deliverablesDir } from '../paths.js';
import { fileExists, readJson } from '../utils/file-io.js';
import { createActivityLogger } from './activity-logger.js';
import type { AgentMetrics, ResumeState } from './shared.js';
import type { AgentMetrics, PipelineState, ResumeState } from './shared.js';
// Max lengths to prevent Temporal protobuf buffer overflow
const MAX_ERROR_MESSAGE_LENGTH = 2000;
@@ -49,6 +51,9 @@ const HEARTBEAT_INTERVAL_MS = 2000;
/**
* Input for all agent activities.
*
* Config fields are optional with sensible defaults. When provided, they
* flow through to getOrCreateContainer() for path and credential configuration.
*/
export interface ActivityInput {
webUrl: string;
@@ -58,6 +63,16 @@ export interface ActivityInput {
pipelineTestingMode?: boolean;
workflowId: string;
sessionId: string;
// Config fields — serializable, read by getOrCreateContainer()
configYAML?: string;
apiKey?: string;
deliverablesSubdir?: string;
auditDir?: string;
promptDir?: string;
sastSarifPath?: string;
skipGitCheck?: boolean;
providerConfig?: ProviderConfig;
}
/**
@@ -92,6 +107,19 @@ function buildSessionMetadata(input: ActivityInput): SessionMetadata {
};
}
/**
* Build ContainerConfig from ActivityInput, falling back to defaults.
*/
function buildContainerConfig(input: ActivityInput): ContainerConfig {
return {
deliverablesSubdir: input.deliverablesSubdir ?? DEFAULT_DELIVERABLES_SUBDIR,
auditDir: input.auditDir ?? './workspaces',
...(input.apiKey !== undefined && { apiKey: input.apiKey }),
...(input.promptDir !== undefined && { promptDir: input.promptDir }),
...(input.providerConfig !== undefined && { providerConfig: input.providerConfig }),
};
}
/**
* Core activity implementation using services.
*
@@ -117,7 +145,7 @@ async function runAgentActivity(agentName: AgentName, input: ActivityInput): Pro
// 1. Build session metadata and get/create container
const sessionMetadata = buildSessionMetadata(input);
const container = getOrCreateContainer(workflowId, sessionMetadata);
const container = getOrCreateContainer(workflowId, sessionMetadata, buildContainerConfig(input));
// 2. Create audit session for THIS agent execution
// NOTE: Each agent needs its own AuditSession because AuditSession uses
@@ -126,7 +154,7 @@ async function runAgentActivity(agentName: AgentName, input: ActivityInput): Pro
await auditSession.initialize(workflowId);
// 3. Execute agent via service (throws PentestError on failure)
const deliverablesPath = path.join(repoPath, '.shannon', 'deliverables');
const deliverablesPath = deliverablesDir(repoPath, container.config.deliverablesSubdir);
const endResult = await container.agentExecution.executeOrThrow(
agentName,
{
@@ -136,6 +164,14 @@ async function runAgentActivity(agentName: AgentName, input: ActivityInput): Pro
configPath,
pipelineTestingMode,
attemptNumber,
...(input.apiKey !== undefined && { apiKey: input.apiKey }),
...(input.providerConfig !== undefined && { providerConfig: input.providerConfig }),
...(input.promptDir !== undefined && {
promptDir: path.isAbsolute(input.promptDir)
? input.promptDir
: path.resolve(process.env.SHANNON_WORKER_ROOT ?? process.cwd(), input.promptDir),
}),
...(input.configYAML !== undefined && { configYAML: input.configYAML }),
},
auditSession,
logger,
@@ -270,7 +306,7 @@ export async function runPreflightValidation(input: ActivityInput): Promise<void
const logger = createActivityLogger();
logger.info('Running preflight validation...', { attempt: attemptNumber });
const result = await runPreflightChecks(input.webUrl, input.repoPath, input.configPath, logger);
const result = await runPreflightChecks(input.webUrl, input.repoPath, input.configPath, logger, input.skipGitCheck, input.apiKey, input.providerConfig);
if (isErr(result)) {
const classified = classifyErrorForTemporal(result.error);
@@ -318,7 +354,7 @@ export async function runPreflightValidation(input: ActivityInput): Promise<void
* Idempotent — skips if .git already exists (resume case).
*/
export async function initDeliverableGit(input: ActivityInput): Promise<void> {
const deliverablesPath = path.join(input.repoPath, '.shannon', 'deliverables');
const deliverablesPath = deliverablesDir(input.repoPath, input.deliverablesSubdir);
await fs.mkdir(deliverablesPath, { recursive: true });
// Check for .git directly inside deliverables, not parent repo's .git
@@ -382,7 +418,9 @@ export async function checkExploitationQueue(input: ActivityInput, vulnType: Vul
const existingContainer = getContainer(workflowId);
const checker = existingContainer?.exploitationChecker ?? new ExploitationCheckerService();
return checker.checkQueue(vulnType, repoPath, logger);
// Pass deliverablesPath (not repoPath) — validators expect the deliverables directory
const delivPath = deliverablesDir(repoPath, input.deliverablesSubdir);
return checker.checkQueue(vulnType, delivPath, logger);
}
interface SessionJson {
@@ -411,6 +449,7 @@ export async function loadResumeState(
workspaceName: string,
expectedUrl: string,
expectedRepoPath: string,
deliverablesSubdir?: string,
): Promise<ResumeState> {
// 1. Validate workspace exists
const sessionPath = path.join('./workspaces', workspaceName, 'session.json');
@@ -453,7 +492,7 @@ export async function loadResumeState(
}
const deliverableFilename = AGENTS[agentName].deliverableFilename;
const deliverablePath = `${expectedRepoPath}/.shannon/deliverables/${deliverableFilename}`;
const deliverablePath = path.join(deliverablesDir(expectedRepoPath, deliverablesSubdir), deliverableFilename);
const deliverableExists = await fileExists(deliverablePath);
if (!deliverableExists) {
@@ -487,7 +526,7 @@ export async function loadResumeState(
}
// 5. Find the most recent checkpoint commit
const deliverablesPath = path.join(expectedRepoPath, '.shannon', 'deliverables');
const deliverablesPath = deliverablesDir(expectedRepoPath, deliverablesSubdir);
const checkpointHash = await findLatestCommit(deliverablesPath, checkpoints);
const originalWorkflowId = session.session.originalWorkflowId || session.session.id;
@@ -540,8 +579,9 @@ export async function restoreGitCheckpoint(
repoPath: string,
checkpointHash: string,
incompleteAgents: AgentName[],
deliverablesSubdir?: string,
): Promise<void> {
const deliverablesPath = path.join(repoPath, '.shannon', 'deliverables');
const deliverablesPath = deliverablesDir(repoPath, deliverablesSubdir);
const logger = createActivityLogger();
logger.info(`Restoring deliverables to ${checkpointHash}...`);
@@ -665,3 +705,36 @@ export async function logWorkflowComplete(input: ActivityInput, summary: Workflo
// 6. Clean up container
removeContainer(workflowId);
}
/**
* Merge external findings into the exploitation queue for a vulnerability type.
*
* Delegates to the FindingsProvider registered in the DI container.
* Default: no-op returning { mergedCount: 0 }.
* Consumers can override this activity at the worker level with custom findings integration.
*/
export async function mergeFindingsIntoQueue(
input: ActivityInput,
vulnType: VulnType,
): Promise<{ mergedCount: number }> {
const container = getContainer(input.workflowId);
if (!container?.findingsProvider) return { mergedCount: 0 };
return container.findingsProvider.mergeFindingsIntoQueue(input.repoPath, vulnType, input);
}
/**
* Persist pipeline state after an agent completes.
*
* Delegates to the CheckpointProvider registered in the DI container.
* Default: no-op. Consumers can override this activity at the worker level with custom persistence.
*/
export async function saveCheckpoint(
input: ActivityInput,
agentName: string,
phase: string,
state: PipelineState,
): Promise<void> {
const container = getContainer(input.workflowId);
if (!container?.checkpointProvider) return;
return container.checkpointProvider.onAgentComplete(agentName, phase, state);
}
+17
View File
@@ -0,0 +1,17 @@
/**
* Pipeline entry point — re-exports the extracted pipeline function and shared types.
*
* Consumers import from this module to call the pipeline as a library function
* within their own workflow context.
*/
export { pentestPipeline } from './workflows.js';
export type {
AgentMetrics,
PipelineInput,
PipelineState,
PipelineSummary,
ResumeState,
VulnExploitPipelineResult,
} from './shared.js';
export type { ActivityInput } from './activities.js';
+16 -2
View File
@@ -2,7 +2,8 @@ import { defineQuery } from '@temporalio/workflow';
export type { AgentMetrics } from '../types/metrics.js';
import type { PipelineConfig } from '../types/config.js';
import type { DistributedConfig, PipelineConfig, ProviderConfig } from '../types/config.js';
import type { ErrorCode } from '../types/errors.js';
import type { AgentMetrics } from '../types/metrics.js';
export interface PipelineInput {
@@ -16,6 +17,18 @@ export interface PipelineInput {
sessionId?: string; // Workspace directory name (distinct from workflowId for named workspaces)
resumeFromWorkspace?: string; // Workspace name to resume from
terminatedWorkflows?: string[]; // Workflows terminated during resume
// Config fields — serializable, flow through to ActivityInput → getOrCreateContainer()
configYAML?: string; // Raw YAML string (parsed in activity, not workflow — workflow sandbox can't use Node.js)
configData?: DistributedConfig; // Pre-parsed config (bypasses file loading)
apiKey?: string; // API key override (avoids process.env mutation)
deliverablesSubdir?: string; // Override deliverables path (default: '.shannon/deliverables')
auditDir?: string; // Override audit log directory (default: './workspaces')
promptDir?: string; // Override prompt template directory
sastSarifPath?: string; // Path to SARIF file (gates SAST-enhanced mode)
checkpointsEnabled?: boolean; // Enable checkpoint activities (default: false)
skipGitCheck?: boolean; // Skip .git directory validation in preflight (e.g. when .git is removed after clone)
providerConfig?: ProviderConfig; // LLM provider configuration (Bedrock, Vertex, LiteLLM, etc.)
}
export interface ResumeState {
@@ -34,12 +47,13 @@ export interface PipelineSummary {
}
export interface PipelineState {
status: 'running' | 'completed' | 'failed';
status: 'running' | 'completed' | 'failed' | 'cancelled';
currentPhase: string | null;
currentAgent: string | null;
completedAgents: string[];
failedAgent: string | null;
error: string | null;
errorCode?: ErrorCode;
startTime: number;
agentMetrics: Record<string, AgentMetrics>;
summary: PipelineSummary | null;
+1 -1
View File
@@ -19,7 +19,7 @@ import type { PipelineState } from './shared.js';
* safely imported into Temporal workflows. The caller must ensure
* state.summary is set before calling (via computeSummary).
*/
export function toWorkflowSummary(state: PipelineState, status: 'completed' | 'failed'): WorkflowSummary {
export function toWorkflowSummary(state: PipelineState, status: 'completed' | 'failed' | 'cancelled'): WorkflowSummary {
// state.summary must be computed before calling this mapper
const summary = state.summary;
if (!summary) {
+5 -4
View File
@@ -35,6 +35,7 @@ import { bundleWorkflowCode, NativeConnection, Worker } from '@temporalio/worker
import dotenv from 'dotenv';
import { sanitizeHostname } from '../audit/utils.js';
import { parseConfig } from '../config-parser.js';
import { deliverablesDir } from '../paths.js';
import type { PipelineConfig } from '../types/config.js';
import { fileExists, readJson } from '../utils/file-io.js';
import * as activities from './activities.js';
@@ -360,13 +361,13 @@ async function waitForWorkflowResult(
// === Deliverables Copy ===
function copyDeliverables(repoPath: string, outputPath: string): void {
const deliverablesDir = path.join(repoPath, '.shannon', 'deliverables');
if (!fs.existsSync(deliverablesDir)) {
const outputDir = deliverablesDir(repoPath);
if (!fs.existsSync(outputDir)) {
console.log('No deliverables directory found, skipping copy');
return;
}
const files = fs.readdirSync(deliverablesDir);
const files = fs.readdirSync(outputDir);
if (files.length === 0) {
console.log('No deliverables to copy');
return;
@@ -376,7 +377,7 @@ function copyDeliverables(repoPath: string, outputPath: string): void {
for (const file of files) {
if (file === '.git') continue;
const src = path.join(deliverablesDir, file);
const src = path.join(outputDir, file);
const dest = path.join(outputPath, file);
fs.cpSync(src, dest, { recursive: true });
}
@@ -9,6 +9,39 @@
* Pure functions with no side effects — safe for Temporal workflow sandbox.
*/
import { ErrorCode } from '../types/errors.js';
/**
* Maps an ApplicationFailure type string to a structured ErrorCode.
*
* Activities classify errors via classifyErrorForTemporal() and throw
* ApplicationFailure with a type string. This function maps those strings
* to stable ErrorCode values so consumers can switch on codes instead of
* string-matching error messages.
*/
const ERROR_TYPE_TO_CODE: Record<string, ErrorCode> = {
AuthenticationError: ErrorCode.AUTH_FAILED,
BillingError: ErrorCode.BILLING_ERROR,
RateLimitError: ErrorCode.API_RATE_LIMITED,
ConfigurationError: ErrorCode.CONFIG_VALIDATION_FAILED,
OutputValidationError: ErrorCode.OUTPUT_VALIDATION_FAILED,
AgentExecutionError: ErrorCode.AGENT_EXECUTION_FAILED,
GitError: ErrorCode.GIT_CHECKPOINT_FAILED,
InvalidTargetError: ErrorCode.TARGET_UNREACHABLE,
};
export function classifyErrorCode(error: unknown): ErrorCode | undefined {
let current: unknown = error;
while (current instanceof Error) {
if ('type' in current && typeof (current as { type: unknown }).type === 'string') {
const code = ERROR_TYPE_TO_CODE[(current as { type: string }).type];
if (code) return code;
}
current = (current as { cause?: unknown }).cause;
}
return undefined;
}
/** Maps Temporal error type strings to actionable remediation hints. */
const REMEDIATION_HINTS: Record<string, string> = {
AuthenticationError: 'Verify ANTHROPIC_API_KEY or CLAUDE_CODE_OAUTH_TOKEN in .env is valid and not expired.',
+75 -5
View File
@@ -23,7 +23,14 @@
* - Graceful failure handling: pipelines continue if one fails
*/
import { log, proxyActivities, setHandler, workflowInfo } from '@temporalio/workflow';
import {
ApplicationFailure,
isCancellation,
log,
proxyActivities,
setHandler,
workflowInfo,
} from '@temporalio/workflow';
import type { AgentName, VulnType } from '../types/agents.js';
import { ALL_AGENTS } from '../types/agents.js';
import type * as activities from './activities.js';
@@ -39,7 +46,7 @@ import {
type VulnExploitPipelineResult,
} from './shared.js';
import { toWorkflowSummary } from './summary-mapper.js';
import { formatWorkflowError } from './workflow-errors.js';
import { classifyErrorCode, formatWorkflowError } from './workflow-errors.js';
// Retry configuration for production (long intervals for billing recovery)
const PRODUCTION_RETRY = {
@@ -127,7 +134,28 @@ function computeSummary(state: PipelineState): PipelineSummary {
};
}
export async function pentestPipelineWorkflow(input: PipelineInput): Promise<PipelineState> {
/**
* Core pipeline orchestration. Coordinates the pentest pipeline stages.
*
* IMPORTANT: This function uses Temporal workflow APIs internally (proxyActivities,
* queries). It can ONLY be called from within a Temporal workflow execution.
* Do not call from standalone scripts or activity code.
*/
export async function pentestPipeline(input: PipelineInput): Promise<PipelineState> {
// Validate repoPath: reject traversal attempts and require absolute path
if (!input.repoPath || input.repoPath.includes('..')) {
throw ApplicationFailure.nonRetryable(
`Invalid repoPath: path traversal not allowed (received: ${input.repoPath ?? '<empty>'})`,
'ConfigurationError',
);
}
if (!input.repoPath.startsWith('/')) {
throw ApplicationFailure.nonRetryable(
`Invalid repoPath: absolute path required (received: ${input.repoPath})`,
'ConfigurationError',
);
}
const { workflowId } = workflowInfo();
// Select activity proxy based on mode: testing (fast), subscription (extended), or default
@@ -176,20 +204,29 @@ export async function pentestPipelineWorkflow(input: PipelineInput): Promise<Pip
...(input.pipelineTestingMode !== undefined && {
pipelineTestingMode: input.pipelineTestingMode,
}),
// Config fields — flow through to getOrCreateContainer()
...(input.configYAML !== undefined && { configYAML: input.configYAML }),
...(input.apiKey !== undefined && { apiKey: input.apiKey }),
...(input.deliverablesSubdir !== undefined && { deliverablesSubdir: input.deliverablesSubdir }),
...(input.auditDir !== undefined && { auditDir: input.auditDir }),
...(input.promptDir !== undefined && { promptDir: input.promptDir }),
...(input.sastSarifPath !== undefined && { sastSarifPath: input.sastSarifPath }),
...(input.skipGitCheck !== undefined && { skipGitCheck: input.skipGitCheck }),
...(input.providerConfig !== undefined && { providerConfig: input.providerConfig }),
};
let resumeState: ResumeState | null = null;
if (input.resumeFromWorkspace) {
// 1. Load resume state (validates workspace, cross-checks deliverables)
resumeState = await a.loadResumeState(input.resumeFromWorkspace, input.webUrl, input.repoPath);
resumeState = await a.loadResumeState(input.resumeFromWorkspace, input.webUrl, input.repoPath, input.deliverablesSubdir);
// 2. Restore git workspace and clean up incomplete deliverables
const incompleteAgents = ALL_AGENTS.filter(
(agentName) => !resumeState?.completedAgents.includes(agentName),
) as AgentName[];
await a.restoreGitCheckpoint(input.repoPath, resumeState.checkpointHash, incompleteAgents);
await a.restoreGitCheckpoint(input.repoPath, resumeState.checkpointHash, incompleteAgents, input.deliverablesSubdir);
// 3. Short-circuit if all agents already completed
if (resumeState.completedAgents.length === ALL_AGENTS.length) {
@@ -228,6 +265,9 @@ export async function pentestPipelineWorkflow(input: PipelineInput): Promise<Pip
await a.logPhaseTransition(activityInput, phaseName, 'start');
state.agentMetrics[agentName] = await runAgent(activityInput);
state.completedAgents.push(agentName);
if (input.checkpointsEnabled) {
await a.saveCheckpoint(activityInput, agentName, phaseName, state);
}
await a.logPhaseTransition(activityInput, phaseName, 'complete');
} else {
log.info(`Skipping ${agentName} (already complete)`);
@@ -392,10 +432,16 @@ export async function pentestPipelineWorkflow(input: PipelineInput): Promise<Pip
let vulnMetrics: AgentMetrics | null = null;
if (!shouldSkip(vulnAgentName)) {
vulnMetrics = await runVulnAgent();
if (input.checkpointsEnabled) {
await a.saveCheckpoint(activityInput, vulnAgentName, 'vulnerability-analysis', state);
}
} else {
log.info(`Skipping ${vulnAgentName} (already complete)`);
}
// 1.5. Merge external findings (SAST, SCA, etc.) into exploitation queue
await a.mergeFindingsIntoQueue(activityInput, vulnType);
// 2. Check exploitation queue for actionable findings
const decision = await a.checkExploitationQueue(activityInput, vulnType);
@@ -404,6 +450,9 @@ export async function pentestPipelineWorkflow(input: PipelineInput): Promise<Pip
if (decision.shouldExploit) {
if (!shouldSkip(exploitAgentName)) {
exploitMetrics = await runExploitAgent();
if (input.checkpointsEnabled) {
await a.saveCheckpoint(activityInput, exploitAgentName, 'exploitation', state);
}
} else {
log.info(`Skipping ${exploitAgentName} (already complete)`);
}
@@ -454,6 +503,9 @@ export async function pentestPipelineWorkflow(input: PipelineInput): Promise<Pip
// Then run the report agent to add executive summary and clean up
state.agentMetrics.report = await a.runReportAgent(activityInput);
state.completedAgents.push('report');
if (input.checkpointsEnabled) {
await a.saveCheckpoint(activityInput, 'report', 'reporting', state);
}
// Inject model metadata into the final report
await a.injectReportMetadataActivity(activityInput);
@@ -474,9 +526,22 @@ export async function pentestPipelineWorkflow(input: PipelineInput): Promise<Pip
return state;
} catch (error) {
// Cancellation: return structured state instead of throwing
if (isCancellation(error)) {
state.status = 'cancelled';
state.error = `Cancelled during phase: ${state.currentPhase ?? 'unknown'}`;
state.summary = computeSummary(state);
await a.logWorkflowComplete(activityInput, toWorkflowSummary(state, 'cancelled'));
return state;
}
state.status = 'failed';
state.failedAgent = state.currentAgent;
state.error = formatWorkflowError(error, state.currentPhase, state.currentAgent);
const errorCode = classifyErrorCode(error);
if (errorCode) {
state.errorCode = errorCode;
}
state.summary = computeSummary(state);
// Log workflow failure summary
@@ -485,3 +550,8 @@ export async function pentestPipelineWorkflow(input: PipelineInput): Promise<Pip
throw error;
}
}
/** OSS workflow entry point — thin shell around the extracted pipeline function. */
export async function pentestPipelineWorkflow(input: PipelineInput): Promise<PipelineState> {
return pentestPipeline(input);
}