refactor: extract helpers from long functions in client, workflows, and agent-execution

- client.ts: extract parseCliArgs, resolveWorkspace, buildPipelineInput, display helpers, waitForWorkflowResult from startPipeline
- workflows.ts: extract runSequentialPhase, buildPipelineConfigs, aggregatePipelineResults to reduce workflow body
- agent-execution.ts: add failAgent private method to deduplicate rollback+audit+error pattern in steps 6-8
This commit is contained in:
ajmallesh
2026-02-16 18:53:22 -08:00
parent 413c47af5c
commit d696a7584b
3 changed files with 388 additions and 329 deletions
+69 -58
View File
@@ -23,7 +23,7 @@
import type { ActivityLogger } from '../types/activity-logger.js';
import { Result, ok, err, isErr } from '../types/result.js';
import { ErrorCode } from '../types/errors.js';
import { ErrorCode, type PentestErrorType } from '../types/errors.js';
import { PentestError } from './error-handling.js';
import { isSpendingCapBehavior } from '../utils/billing-detection.js';
import { AGENTS } from '../session-manager.js';
@@ -56,6 +56,17 @@ export interface AgentExecutionInput {
attemptNumber: number;
}
interface FailAgentOpts {
attemptNumber: number;
result: ClaudePromptResult;
rollbackReason: string;
errorMessage: string;
errorCode: ErrorCode;
category: PentestErrorType;
retryable: boolean;
context: Record<string, unknown>;
}
/**
* Service for executing agents with full lifecycle management.
*
@@ -152,73 +163,43 @@ export class AgentExecutionService {
if (result.success && (result.turns ?? 0) <= 2 && (result.cost || 0) === 0) {
const resultText = result.result || '';
if (isSpendingCapBehavior(result.turns ?? 0, result.cost || 0, resultText)) {
await rollbackGitWorkspace(repoPath, 'spending cap detected', logger);
const endResult: AgentEndResult = {
attemptNumber,
duration_ms: result.duration,
cost_usd: 0,
success: false,
model: result.model,
error: `Spending cap likely reached: ${resultText.slice(0, 100)}`,
};
await auditSession.endAgent(agentName, endResult);
return err(
new PentestError(
`Spending cap likely reached: ${resultText.slice(0, 100)}`,
'billing',
true, // Retryable with long backoff
{ agentName, turns: result.turns, cost: result.cost },
ErrorCode.SPENDING_CAP_REACHED
)
);
return this.failAgent(agentName, repoPath, auditSession, logger, {
attemptNumber, result,
rollbackReason: 'spending cap detected',
errorMessage: `Spending cap likely reached: ${resultText.slice(0, 100)}`,
errorCode: ErrorCode.SPENDING_CAP_REACHED,
category: 'billing',
retryable: true,
context: { agentName, turns: result.turns, cost: result.cost },
});
}
}
// 7. Handle execution failure
if (!result.success) {
await rollbackGitWorkspace(repoPath, 'execution failure', logger);
const endResult: AgentEndResult = {
attemptNumber,
duration_ms: result.duration,
cost_usd: result.cost || 0,
success: false,
model: result.model,
error: result.error || 'Execution failed',
};
await auditSession.endAgent(agentName, endResult);
return err(
new PentestError(
result.error || 'Agent execution failed',
'validation',
result.retryable ?? true,
{ agentName, originalError: result.error },
ErrorCode.AGENT_EXECUTION_FAILED
)
);
return this.failAgent(agentName, repoPath, auditSession, logger, {
attemptNumber, result,
rollbackReason: 'execution failure',
errorMessage: result.error || 'Agent execution failed',
errorCode: ErrorCode.AGENT_EXECUTION_FAILED,
category: 'validation',
retryable: result.retryable ?? true,
context: { agentName, originalError: result.error },
});
}
// 8. Validate output
const validationPassed = await validateAgentOutput(result, agentName, repoPath, logger);
if (!validationPassed) {
await rollbackGitWorkspace(repoPath, 'validation failure', logger);
const endResult: AgentEndResult = {
attemptNumber,
duration_ms: result.duration,
cost_usd: result.cost || 0,
success: false,
model: result.model,
error: 'Output validation failed',
};
await auditSession.endAgent(agentName, endResult);
return err(
new PentestError(
`Agent ${agentName} failed output validation`,
'validation',
true, // Retryable - agent may succeed on retry
{ agentName, deliverableFilename: AGENTS[agentName].deliverableFilename },
ErrorCode.OUTPUT_VALIDATION_FAILED
)
);
return this.failAgent(agentName, repoPath, auditSession, logger, {
attemptNumber, result,
rollbackReason: 'validation failure',
errorMessage: `Agent ${agentName} failed output validation`,
errorCode: ErrorCode.OUTPUT_VALIDATION_FAILED,
category: 'validation',
retryable: true,
context: { agentName, deliverableFilename: AGENTS[agentName].deliverableFilename },
});
}
// 9. Success - commit deliverables, then capture checkpoint hash
@@ -238,6 +219,36 @@ export class AgentExecutionService {
return ok(endResult);
}
private async failAgent(
agentName: AgentName,
repoPath: string,
auditSession: AuditSession,
logger: ActivityLogger,
opts: FailAgentOpts
): Promise<Result<AgentEndResult, PentestError>> {
await rollbackGitWorkspace(repoPath, opts.rollbackReason, logger);
const endResult: AgentEndResult = {
attemptNumber: opts.attemptNumber,
duration_ms: opts.result.duration,
cost_usd: opts.result.cost || 0,
success: false,
model: opts.result.model,
error: opts.errorMessage,
};
await auditSession.endAgent(agentName, endResult);
return err(
new PentestError(
opts.errorMessage,
opts.category,
opts.retryable,
opts.context,
opts.errorCode
)
);
}
/**
* Execute an agent, throwing PentestError on failure.
*
+215 -153
View File
@@ -26,7 +26,7 @@
* TEMPORAL_ADDRESS - Temporal server address (default: localhost:7233)
*/
import { Connection, Client, WorkflowNotFoundError } from '@temporalio/client';
import { Connection, Client, WorkflowNotFoundError, type WorkflowHandle } from '@temporalio/client';
import dotenv from 'dotenv';
import { displaySplashScreen } from '../splash-screen.js';
import { sanitizeHostname } from '../audit/utils.js';
@@ -139,47 +139,58 @@ function showUsage(): void {
);
}
async function startPipeline(): Promise<void> {
const args = process.argv.slice(2);
// === CLI Argument Parsing ===
if (args.includes('--help') || args.includes('-h') || args.length === 0) {
interface CliArgs {
webUrl: string;
repoPath: string;
configPath?: string;
outputPath?: string;
displayOutputPath?: string;
pipelineTestingMode: boolean;
customWorkflowId?: string;
waitForCompletion: boolean;
resumeFromWorkspace?: string;
}
function parseCliArgs(argv: string[]): CliArgs {
if (argv.includes('--help') || argv.includes('-h') || argv.length === 0) {
showUsage();
process.exit(0);
}
// Parse arguments
let webUrl: string | undefined;
let repoPath: string | undefined;
let configPath: string | undefined;
let outputPath: string | undefined;
let displayOutputPath: string | undefined; // Host path for display purposes
let displayOutputPath: string | undefined;
let pipelineTestingMode = false;
let customWorkflowId: string | undefined;
let waitForCompletion = false;
let resumeFromWorkspace: string | undefined;
for (let i = 0; i < args.length; i++) {
const arg = args[i];
for (let i = 0; i < argv.length; i++) {
const arg = argv[i];
if (arg === '--config') {
const nextArg = args[i + 1];
const nextArg = argv[i + 1];
if (nextArg && !nextArg.startsWith('-')) {
configPath = nextArg;
i++;
}
} else if (arg === '--output') {
const nextArg = args[i + 1];
const nextArg = argv[i + 1];
if (nextArg && !nextArg.startsWith('-')) {
outputPath = nextArg;
i++;
}
} else if (arg === '--display-output') {
const nextArg = args[i + 1];
const nextArg = argv[i + 1];
if (nextArg && !nextArg.startsWith('-')) {
displayOutputPath = nextArg;
i++;
}
} else if (arg === '--workflow-id') {
const nextArg = args[i + 1];
const nextArg = argv[i + 1];
if (nextArg && !nextArg.startsWith('-')) {
customWorkflowId = nextArg;
i++;
@@ -187,7 +198,7 @@ async function startPipeline(): Promise<void> {
} else if (arg === '--pipeline-testing') {
pipelineTestingMode = true;
} else if (arg === '--workspace') {
const nextArg = args[i + 1];
const nextArg = argv[i + 1];
if (nextArg && !nextArg.startsWith('-')) {
resumeFromWorkspace = nextArg;
i++;
@@ -209,7 +220,189 @@ async function startPipeline(): Promise<void> {
process.exit(1);
}
// Display splash screen
return {
webUrl, repoPath, pipelineTestingMode, waitForCompletion,
...(configPath && { configPath }),
...(outputPath && { outputPath }),
...(displayOutputPath && { displayOutputPath }),
...(customWorkflowId && { customWorkflowId }),
...(resumeFromWorkspace && { resumeFromWorkspace }),
};
}
// === Workspace Resolution ===
interface WorkspaceResolution {
workflowId: string;
sessionId: string;
isResume: boolean;
terminatedWorkflows: string[];
}
async function resolveWorkspace(
client: Client,
args: CliArgs
): Promise<WorkspaceResolution> {
if (!args.resumeFromWorkspace) {
const hostname = sanitizeHostname(args.webUrl);
const workflowId = args.customWorkflowId || `${hostname}_shannon-${Date.now()}`;
return {
workflowId,
sessionId: workflowId,
isResume: false,
terminatedWorkflows: [],
};
}
const workspace = args.resumeFromWorkspace;
const sessionPath = path.join('./audit-logs', workspace, 'session.json');
const workspaceExists = await fileExists(sessionPath);
if (workspaceExists) {
console.log('=== RESUME MODE ===');
console.log(`Workspace: ${workspace}\n`);
const terminatedWorkflows = await terminateExistingWorkflows(client, workspace);
if (terminatedWorkflows.length > 0) {
console.log(`Terminated ${terminatedWorkflows.length} previous workflow(s)\n`);
}
const session = await readJson<SessionJson>(sessionPath);
if (session.session.webUrl !== args.webUrl) {
console.error('ERROR: URL mismatch with workspace');
console.error(` Workspace URL: ${session.session.webUrl}`);
console.error(` Provided URL: ${args.webUrl}`);
process.exit(1);
}
return {
workflowId: `${workspace}_resume_${Date.now()}`,
sessionId: workspace,
isResume: true,
terminatedWorkflows,
};
}
if (!isValidWorkspaceName(workspace)) {
console.error(`ERROR: Invalid workspace name: "${workspace}"`);
console.error(' Must be 1-128 characters, alphanumeric/hyphens/underscores, starting with alphanumeric');
process.exit(1);
}
console.log('=== NEW NAMED WORKSPACE ===');
console.log(`Workspace: ${workspace}\n`);
return {
workflowId: `${workspace}_shannon-${Date.now()}`,
sessionId: workspace,
isResume: false,
terminatedWorkflows: [],
};
}
// === Pipeline Input Construction ===
function buildPipelineInput(args: CliArgs, workspace: WorkspaceResolution): PipelineInput {
return {
webUrl: args.webUrl,
repoPath: args.repoPath,
workflowId: workspace.workflowId,
sessionId: workspace.sessionId,
...(args.configPath && { configPath: args.configPath }),
...(args.outputPath && { outputPath: args.outputPath }),
...(args.pipelineTestingMode && { pipelineTestingMode: args.pipelineTestingMode }),
...(workspace.isResume && args.resumeFromWorkspace && { resumeFromWorkspace: args.resumeFromWorkspace }),
...(workspace.terminatedWorkflows.length > 0 && { terminatedWorkflows: workspace.terminatedWorkflows }),
};
}
// === Display Helpers ===
function displayWorkflowInfo(args: CliArgs, workspace: WorkspaceResolution): void {
console.log(`✓ Workflow started: ${workspace.workflowId}`);
if (workspace.isResume) {
console.log(` (Resuming workspace: ${workspace.sessionId})`);
}
console.log();
console.log(` Target: ${args.webUrl}`);
console.log(` Repository: ${args.repoPath}`);
console.log(` Workspace: ${workspace.sessionId}`);
if (args.configPath) {
console.log(` Config: ${args.configPath}`);
}
if (args.displayOutputPath) {
console.log(` Output: ${args.displayOutputPath}`);
}
if (args.pipelineTestingMode) {
console.log(` Mode: Pipeline Testing`);
}
console.log();
}
function displayMonitoringInfo(args: CliArgs, workspace: WorkspaceResolution): void {
const effectiveDisplayPath = args.displayOutputPath || args.outputPath || './audit-logs';
const outputDir = `${effectiveDisplayPath}/${workspace.sessionId}`;
console.log('Monitor progress:');
console.log(` Web UI: http://localhost:8233/namespaces/default/workflows/${workspace.workflowId}`);
console.log(` Logs: ./shannon logs ID=${workspace.workflowId}`);
console.log();
console.log('Output:');
console.log(` Reports: ${outputDir}`);
console.log();
}
// === Workflow Result Handling ===
async function waitForWorkflowResult(
handle: WorkflowHandle<(input: PipelineInput) => Promise<PipelineState>>,
workspace: WorkspaceResolution
): Promise<void> {
const progressInterval = setInterval(async () => {
try {
const progress = await handle.query<PipelineProgress>(PROGRESS_QUERY);
const elapsed = Math.floor(progress.elapsedMs / 1000);
console.log(
`[${elapsed}s] Phase: ${progress.currentPhase || 'unknown'} | Agent: ${progress.currentAgent || 'none'} | Completed: ${progress.completedAgents.length}/13`
);
} catch {
// Workflow may have completed
}
}, 30000);
try {
const result = await handle.result();
clearInterval(progressInterval);
console.log('\nPipeline completed successfully!');
if (result.summary) {
console.log(`Duration: ${Math.floor(result.summary.totalDurationMs / 1000)}s`);
console.log(`Agents completed: ${result.summary.agentCount}`);
console.log(`Total turns: ${result.summary.totalTurns}`);
console.log(`Run cost: $${result.summary.totalCostUsd.toFixed(4)}`);
if (workspace.isResume) {
try {
const session = await readJson<SessionJson>(
path.join('./audit-logs', workspace.sessionId, 'session.json')
);
console.log(`Cumulative cost: $${session.metrics.total_cost_usd.toFixed(4)}`);
} catch {
// Non-fatal, skip cumulative cost display
}
}
}
} catch (error) {
clearInterval(progressInterval);
console.error('\nPipeline failed:', error);
process.exit(1);
}
}
// === Main Entry Point ===
async function startPipeline(): Promise<void> {
const args = parseCliArgs(process.argv.slice(2));
await displaySplashScreen();
const address = process.env.TEMPORAL_ADDRESS || 'localhost:7233';
@@ -219,155 +412,24 @@ async function startPipeline(): Promise<void> {
const client = new Client({ connection });
try {
let terminatedWorkflows: string[] = [];
let workflowId: string;
let sessionId: string; // Workspace name (persistent directory)
let isResume = false;
const workspace = await resolveWorkspace(client, args);
const input = buildPipelineInput(args, workspace);
if (resumeFromWorkspace) {
const sessionPath = path.join('./audit-logs', resumeFromWorkspace, 'session.json');
const workspaceExists = await fileExists(sessionPath);
if (workspaceExists) {
isResume = true;
console.log('=== RESUME MODE ===');
console.log(`Workspace: ${resumeFromWorkspace}\n`);
// Terminate any running workflows for this workspace
terminatedWorkflows = await terminateExistingWorkflows(client, resumeFromWorkspace);
if (terminatedWorkflows.length > 0) {
console.log(`Terminated ${terminatedWorkflows.length} previous workflow(s)\n`);
}
// Validate URL matches workspace
const session = await readJson<SessionJson>(sessionPath);
if (session.session.webUrl !== webUrl) {
console.error('ERROR: URL mismatch with workspace');
console.error(` Workspace URL: ${session.session.webUrl}`);
console.error(` Provided URL: ${webUrl}`);
process.exit(1);
}
// Generate resume workflow ID
workflowId = `${resumeFromWorkspace}_resume_${Date.now()}`;
sessionId = resumeFromWorkspace;
} else {
if (!isValidWorkspaceName(resumeFromWorkspace)) {
console.error(`ERROR: Invalid workspace name: "${resumeFromWorkspace}"`);
console.error(' Must be 1-128 characters, alphanumeric/hyphens/underscores, starting with alphanumeric');
process.exit(1);
}
console.log('=== NEW NAMED WORKSPACE ===');
console.log(`Workspace: ${resumeFromWorkspace}\n`);
workflowId = `${resumeFromWorkspace}_shannon-${Date.now()}`;
sessionId = resumeFromWorkspace;
}
} else {
const hostname = sanitizeHostname(webUrl);
workflowId = customWorkflowId || `${hostname}_shannon-${Date.now()}`;
sessionId = workflowId;
}
const input: PipelineInput = {
webUrl,
repoPath,
workflowId,
sessionId,
...(configPath && { configPath }),
...(outputPath && { outputPath }),
...(pipelineTestingMode && { pipelineTestingMode }),
...(isResume && resumeFromWorkspace && { resumeFromWorkspace }),
...(terminatedWorkflows.length > 0 && { terminatedWorkflows }),
};
// Use displayOutputPath (host path) if provided, otherwise fall back to outputPath or default
const effectiveDisplayPath = displayOutputPath || outputPath || './audit-logs';
const outputDir = `${effectiveDisplayPath}/${sessionId}`;
console.log(`✓ Workflow started: ${workflowId}`);
if (isResume) {
console.log(` (Resuming workspace: ${sessionId})`);
}
console.log();
console.log(` Target: ${webUrl}`);
console.log(` Repository: ${repoPath}`);
console.log(` Workspace: ${sessionId}`);
if (configPath) {
console.log(` Config: ${configPath}`);
}
if (displayOutputPath) {
console.log(` Output: ${displayOutputPath}`);
}
if (pipelineTestingMode) {
console.log(` Mode: Pipeline Testing`);
}
console.log();
// Start workflow by name (not by importing the function)
const handle = await client.workflow.start<(input: PipelineInput) => Promise<PipelineState>>(
'pentestPipelineWorkflow',
{
taskQueue: 'shannon-pipeline',
workflowId,
workflowId: workspace.workflowId,
args: [input],
}
);
if (!waitForCompletion) {
console.log('Monitor progress:');
console.log(` Web UI: http://localhost:8233/namespaces/default/workflows/${workflowId}`);
console.log(` Logs: ./shannon logs ID=${workflowId}`);
console.log();
console.log('Output:');
console.log(` Reports: ${outputDir}`);
console.log();
return;
}
displayWorkflowInfo(args, workspace);
// Poll for progress every 30 seconds
const progressInterval = setInterval(async () => {
try {
const progress = await handle.query<PipelineProgress>(PROGRESS_QUERY);
const elapsed = Math.floor(progress.elapsedMs / 1000);
console.log(
`[${elapsed}s] Phase: ${progress.currentPhase || 'unknown'} | Agent: ${progress.currentAgent || 'none'} | Completed: ${progress.completedAgents.length}/13`
);
} catch {
// Workflow may have completed
}
}, 30000);
try {
const result = await handle.result();
clearInterval(progressInterval);
console.log('\nPipeline completed successfully!');
if (result.summary) {
console.log(`Duration: ${Math.floor(result.summary.totalDurationMs / 1000)}s`);
console.log(`Agents completed: ${result.summary.agentCount}`);
console.log(`Total turns: ${result.summary.totalTurns}`);
console.log(`Run cost: $${result.summary.totalCostUsd.toFixed(4)}`);
// Show cumulative cost from session.json (includes all resume attempts)
if (isResume) {
try {
const session = await readJson<SessionJson>(
path.join('./audit-logs', sessionId, 'session.json')
);
console.log(`Cumulative cost: $${session.metrics.total_cost_usd.toFixed(4)}`);
} catch {
// Non-fatal, skip cumulative cost display
}
}
}
} catch (error) {
clearInterval(progressInterval);
console.error('\nPipeline failed:', error);
process.exit(1);
if (args.waitForCompletion) {
await waitForWorkflowResult(handle, workspace);
} else {
displayMonitoringInfo(args, workspace);
}
} finally {
await connection.close();
+104 -118
View File
@@ -186,96 +186,34 @@ export async function pentestPipelineWorkflow(
return resumeState?.completedAgents.includes(agentName) ?? false;
};
try {
// === Phase 1: Pre-Reconnaissance ===
if (!shouldSkip('pre-recon')) {
state.currentPhase = 'pre-recon';
state.currentAgent = 'pre-recon';
await a.logPhaseTransition(activityInput, 'pre-recon', 'start');
state.agentMetrics['pre-recon'] =
await a.runPreReconAgent(activityInput);
state.completedAgents.push('pre-recon');
await a.logPhaseTransition(activityInput, 'pre-recon', 'complete');
// Run a sequential agent phase (pre-recon, recon)
async function runSequentialPhase(
phaseName: string,
agentName: AgentName,
runAgent: (input: ActivityInput) => Promise<AgentMetrics>
): Promise<void> {
if (!shouldSkip(agentName)) {
state.currentPhase = phaseName;
state.currentAgent = agentName;
await a.logPhaseTransition(activityInput, phaseName, 'start');
state.agentMetrics[agentName] = await runAgent(activityInput);
state.completedAgents.push(agentName);
await a.logPhaseTransition(activityInput, phaseName, 'complete');
} else {
log.info('Skipping pre-recon (already complete)');
state.completedAgents.push('pre-recon');
log.info(`Skipping ${agentName} (already complete)`);
state.completedAgents.push(agentName);
}
}
// === Phase 2: Reconnaissance ===
if (!shouldSkip('recon')) {
state.currentPhase = 'recon';
state.currentAgent = 'recon';
await a.logPhaseTransition(activityInput, 'recon', 'start');
state.agentMetrics['recon'] = await a.runReconAgent(activityInput);
state.completedAgents.push('recon');
await a.logPhaseTransition(activityInput, 'recon', 'complete');
} else {
log.info('Skipping recon (already complete)');
state.completedAgents.push('recon');
}
// === Phases 3-4: Vulnerability Analysis + Exploitation (Pipelined) ===
// Each vuln type runs as an independent pipeline:
// vuln agent → queue check → conditional exploit agent
// This eliminates the synchronization barrier between phases - each exploit
// starts immediately when its vuln agent finishes, not waiting for all.
state.currentPhase = 'vulnerability-exploitation';
state.currentAgent = 'pipelines';
await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'start');
// Helper: Run a single vuln→exploit pipeline with skip logic
async function runVulnExploitPipeline(
vulnType: VulnType,
runVulnAgent: () => Promise<AgentMetrics>,
runExploitAgent: () => Promise<AgentMetrics>
): Promise<VulnExploitPipelineResult> {
const vulnAgentName = `${vulnType}-vuln`;
const exploitAgentName = `${vulnType}-exploit`;
// Step 1: Run vulnerability agent (or skip if completed)
let vulnMetrics: AgentMetrics | null = null;
if (!shouldSkip(vulnAgentName)) {
vulnMetrics = await runVulnAgent();
} else {
log.info(`Skipping ${vulnAgentName} (already complete)`);
}
// Step 2: Check exploitation queue (only if vuln agent ran or completed previously)
const decision = await a.checkExploitationQueue(activityInput, vulnType);
// Step 3: Conditionally run exploit agent (skip if already completed)
let exploitMetrics: AgentMetrics | null = null;
if (decision.shouldExploit) {
if (!shouldSkip(exploitAgentName)) {
exploitMetrics = await runExploitAgent();
} else {
log.info(`Skipping ${exploitAgentName} (already complete)`);
}
}
return {
vulnType,
vulnMetrics,
exploitMetrics,
exploitDecision: {
shouldExploit: decision.shouldExploit,
vulnerabilityCount: decision.vulnerabilityCount,
},
error: null,
};
}
// Determine which pipelines to run (skip if both vuln and exploit completed)
const pipelinesToRun: Array<Promise<VulnExploitPipelineResult>> = [];
// Only run pipeline if at least one agent (vuln or exploit) is incomplete
const pipelineConfigs: Array<{
vulnType: VulnType;
vulnAgent: string;
exploitAgent: string;
runVuln: () => Promise<AgentMetrics>;
runExploit: () => Promise<AgentMetrics>;
}> = [
// Build pipeline configs for the 5 vuln→exploit pairs
function buildPipelineConfigs(): Array<{
vulnType: VulnType;
vulnAgent: string;
exploitAgent: string;
runVuln: () => Promise<AgentMetrics>;
runExploit: () => Promise<AgentMetrics>;
}> {
return [
{
vulnType: 'injection',
vulnAgent: 'injection-vuln',
@@ -312,56 +250,34 @@ export async function pentestPipelineWorkflow(
runExploit: () => a.runAuthzExploitAgent(activityInput),
},
];
}
for (const config of pipelineConfigs) {
const vulnComplete = shouldSkip(config.vulnAgent);
const exploitComplete = shouldSkip(config.exploitAgent);
// Only run pipeline if at least one agent needs to run
if (!vulnComplete || !exploitComplete) {
pipelinesToRun.push(
runVulnExploitPipeline(config.vulnType, config.runVuln, config.runExploit)
);
} else {
log.info(
`Skipping entire ${config.vulnType} pipeline (both agents complete)`
);
// Still need to mark them as completed in state
state.completedAgents.push(config.vulnAgent, config.exploitAgent);
}
}
// Run pipelines in parallel with graceful failure handling
// Promise.allSettled ensures other pipelines continue if one fails
const pipelineResults = await Promise.allSettled(pipelinesToRun);
// Aggregate results from all pipelines
// Aggregate results from settled pipeline promises into workflow state
function aggregatePipelineResults(
results: PromiseSettledResult<VulnExploitPipelineResult>[]
): void {
const failedPipelines: string[] = [];
for (const result of pipelineResults) {
for (const result of results) {
if (result.status === 'fulfilled') {
const { vulnType, vulnMetrics, exploitMetrics } = result.value;
// Record vuln agent
const vulnAgentName = `${vulnType}-vuln`;
if (vulnMetrics) {
state.agentMetrics[vulnAgentName] = vulnMetrics;
state.completedAgents.push(vulnAgentName);
} else if (shouldSkip(vulnAgentName)) {
// Agent was skipped because already complete
state.completedAgents.push(vulnAgentName);
}
// Record exploit agent (if it ran)
const exploitAgentName = `${vulnType}-exploit`;
if (exploitMetrics) {
state.agentMetrics[exploitAgentName] = exploitMetrics;
state.completedAgents.push(exploitAgentName);
} else if (shouldSkip(exploitAgentName)) {
// Agent was skipped because already complete
state.completedAgents.push(exploitAgentName);
}
} else {
// Pipeline failed - log error but continue with others
const errorMsg =
result.reason instanceof Error
? result.reason.message
@@ -370,14 +286,84 @@ export async function pentestPipelineWorkflow(
}
}
// Log any pipeline failures (workflow continues despite failures)
if (failedPipelines.length > 0) {
log.warn(`${failedPipelines.length} pipeline(s) failed`, {
failures: failedPipelines,
});
}
}
try {
// === Phase 1: Pre-Reconnaissance ===
await runSequentialPhase('pre-recon', 'pre-recon', a.runPreReconAgent);
// === Phase 2: Reconnaissance ===
await runSequentialPhase('recon', 'recon', a.runReconAgent);
// === Phases 3-4: Vulnerability Analysis + Exploitation (Pipelined) ===
// Each vuln type runs as an independent pipeline:
// vuln agent → queue check → conditional exploit agent
// Exploits start immediately when their vuln finishes, not waiting for all.
state.currentPhase = 'vulnerability-exploitation';
state.currentAgent = 'pipelines';
await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'start');
// Closure over shouldSkip and activityInput by design (Temporal replay safety)
async function runVulnExploitPipeline(
vulnType: VulnType,
runVulnAgent: () => Promise<AgentMetrics>,
runExploitAgent: () => Promise<AgentMetrics>
): Promise<VulnExploitPipelineResult> {
const vulnAgentName = `${vulnType}-vuln`;
const exploitAgentName = `${vulnType}-exploit`;
let vulnMetrics: AgentMetrics | null = null;
if (!shouldSkip(vulnAgentName)) {
vulnMetrics = await runVulnAgent();
} else {
log.info(`Skipping ${vulnAgentName} (already complete)`);
}
const decision = await a.checkExploitationQueue(activityInput, vulnType);
let exploitMetrics: AgentMetrics | null = null;
if (decision.shouldExploit) {
if (!shouldSkip(exploitAgentName)) {
exploitMetrics = await runExploitAgent();
} else {
log.info(`Skipping ${exploitAgentName} (already complete)`);
}
}
return {
vulnType,
vulnMetrics,
exploitMetrics,
exploitDecision: {
shouldExploit: decision.shouldExploit,
vulnerabilityCount: decision.vulnerabilityCount,
},
error: null,
};
}
const pipelineConfigs = buildPipelineConfigs();
const pipelinesToRun: Array<Promise<VulnExploitPipelineResult>> = [];
for (const config of pipelineConfigs) {
if (!shouldSkip(config.vulnAgent) || !shouldSkip(config.exploitAgent)) {
pipelinesToRun.push(
runVulnExploitPipeline(config.vulnType, config.runVuln, config.runExploit)
);
} else {
log.info(`Skipping entire ${config.vulnType} pipeline (both agents complete)`);
state.completedAgents.push(config.vulnAgent, config.exploitAgent);
}
}
const pipelineResults = await Promise.allSettled(pipelinesToRun);
aggregatePipelineResults(pipelineResults);
// Update phase markers
state.currentPhase = 'exploitation';
state.currentAgent = null;
await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'complete');