diff --git a/src/services/agent-execution.ts b/src/services/agent-execution.ts index 9574739..401f051 100644 --- a/src/services/agent-execution.ts +++ b/src/services/agent-execution.ts @@ -23,7 +23,7 @@ import type { ActivityLogger } from '../types/activity-logger.js'; import { Result, ok, err, isErr } from '../types/result.js'; -import { ErrorCode } from '../types/errors.js'; +import { ErrorCode, type PentestErrorType } from '../types/errors.js'; import { PentestError } from './error-handling.js'; import { isSpendingCapBehavior } from '../utils/billing-detection.js'; import { AGENTS } from '../session-manager.js'; @@ -56,6 +56,17 @@ export interface AgentExecutionInput { attemptNumber: number; } +interface FailAgentOpts { + attemptNumber: number; + result: ClaudePromptResult; + rollbackReason: string; + errorMessage: string; + errorCode: ErrorCode; + category: PentestErrorType; + retryable: boolean; + context: Record; +} + /** * Service for executing agents with full lifecycle management. * @@ -152,73 +163,43 @@ export class AgentExecutionService { if (result.success && (result.turns ?? 0) <= 2 && (result.cost || 0) === 0) { const resultText = result.result || ''; if (isSpendingCapBehavior(result.turns ?? 0, result.cost || 0, resultText)) { - await rollbackGitWorkspace(repoPath, 'spending cap detected', logger); - const endResult: AgentEndResult = { - attemptNumber, - duration_ms: result.duration, - cost_usd: 0, - success: false, - model: result.model, - error: `Spending cap likely reached: ${resultText.slice(0, 100)}`, - }; - await auditSession.endAgent(agentName, endResult); - return err( - new PentestError( - `Spending cap likely reached: ${resultText.slice(0, 100)}`, - 'billing', - true, // Retryable with long backoff - { agentName, turns: result.turns, cost: result.cost }, - ErrorCode.SPENDING_CAP_REACHED - ) - ); + return this.failAgent(agentName, repoPath, auditSession, logger, { + attemptNumber, result, + rollbackReason: 'spending cap detected', + errorMessage: `Spending cap likely reached: ${resultText.slice(0, 100)}`, + errorCode: ErrorCode.SPENDING_CAP_REACHED, + category: 'billing', + retryable: true, + context: { agentName, turns: result.turns, cost: result.cost }, + }); } } // 7. Handle execution failure if (!result.success) { - await rollbackGitWorkspace(repoPath, 'execution failure', logger); - const endResult: AgentEndResult = { - attemptNumber, - duration_ms: result.duration, - cost_usd: result.cost || 0, - success: false, - model: result.model, - error: result.error || 'Execution failed', - }; - await auditSession.endAgent(agentName, endResult); - return err( - new PentestError( - result.error || 'Agent execution failed', - 'validation', - result.retryable ?? true, - { agentName, originalError: result.error }, - ErrorCode.AGENT_EXECUTION_FAILED - ) - ); + return this.failAgent(agentName, repoPath, auditSession, logger, { + attemptNumber, result, + rollbackReason: 'execution failure', + errorMessage: result.error || 'Agent execution failed', + errorCode: ErrorCode.AGENT_EXECUTION_FAILED, + category: 'validation', + retryable: result.retryable ?? true, + context: { agentName, originalError: result.error }, + }); } // 8. Validate output const validationPassed = await validateAgentOutput(result, agentName, repoPath, logger); if (!validationPassed) { - await rollbackGitWorkspace(repoPath, 'validation failure', logger); - const endResult: AgentEndResult = { - attemptNumber, - duration_ms: result.duration, - cost_usd: result.cost || 0, - success: false, - model: result.model, - error: 'Output validation failed', - }; - await auditSession.endAgent(agentName, endResult); - return err( - new PentestError( - `Agent ${agentName} failed output validation`, - 'validation', - true, // Retryable - agent may succeed on retry - { agentName, deliverableFilename: AGENTS[agentName].deliverableFilename }, - ErrorCode.OUTPUT_VALIDATION_FAILED - ) - ); + return this.failAgent(agentName, repoPath, auditSession, logger, { + attemptNumber, result, + rollbackReason: 'validation failure', + errorMessage: `Agent ${agentName} failed output validation`, + errorCode: ErrorCode.OUTPUT_VALIDATION_FAILED, + category: 'validation', + retryable: true, + context: { agentName, deliverableFilename: AGENTS[agentName].deliverableFilename }, + }); } // 9. Success - commit deliverables, then capture checkpoint hash @@ -238,6 +219,36 @@ export class AgentExecutionService { return ok(endResult); } + private async failAgent( + agentName: AgentName, + repoPath: string, + auditSession: AuditSession, + logger: ActivityLogger, + opts: FailAgentOpts + ): Promise> { + await rollbackGitWorkspace(repoPath, opts.rollbackReason, logger); + + const endResult: AgentEndResult = { + attemptNumber: opts.attemptNumber, + duration_ms: opts.result.duration, + cost_usd: opts.result.cost || 0, + success: false, + model: opts.result.model, + error: opts.errorMessage, + }; + await auditSession.endAgent(agentName, endResult); + + return err( + new PentestError( + opts.errorMessage, + opts.category, + opts.retryable, + opts.context, + opts.errorCode + ) + ); + } + /** * Execute an agent, throwing PentestError on failure. * diff --git a/src/temporal/client.ts b/src/temporal/client.ts index ee7eef5..0e35a6e 100644 --- a/src/temporal/client.ts +++ b/src/temporal/client.ts @@ -26,7 +26,7 @@ * TEMPORAL_ADDRESS - Temporal server address (default: localhost:7233) */ -import { Connection, Client, WorkflowNotFoundError } from '@temporalio/client'; +import { Connection, Client, WorkflowNotFoundError, type WorkflowHandle } from '@temporalio/client'; import dotenv from 'dotenv'; import { displaySplashScreen } from '../splash-screen.js'; import { sanitizeHostname } from '../audit/utils.js'; @@ -139,47 +139,58 @@ function showUsage(): void { ); } -async function startPipeline(): Promise { - const args = process.argv.slice(2); +// === CLI Argument Parsing === - if (args.includes('--help') || args.includes('-h') || args.length === 0) { +interface CliArgs { + webUrl: string; + repoPath: string; + configPath?: string; + outputPath?: string; + displayOutputPath?: string; + pipelineTestingMode: boolean; + customWorkflowId?: string; + waitForCompletion: boolean; + resumeFromWorkspace?: string; +} + +function parseCliArgs(argv: string[]): CliArgs { + if (argv.includes('--help') || argv.includes('-h') || argv.length === 0) { showUsage(); process.exit(0); } - // Parse arguments let webUrl: string | undefined; let repoPath: string | undefined; let configPath: string | undefined; let outputPath: string | undefined; - let displayOutputPath: string | undefined; // Host path for display purposes + let displayOutputPath: string | undefined; let pipelineTestingMode = false; let customWorkflowId: string | undefined; let waitForCompletion = false; let resumeFromWorkspace: string | undefined; - for (let i = 0; i < args.length; i++) { - const arg = args[i]; + for (let i = 0; i < argv.length; i++) { + const arg = argv[i]; if (arg === '--config') { - const nextArg = args[i + 1]; + const nextArg = argv[i + 1]; if (nextArg && !nextArg.startsWith('-')) { configPath = nextArg; i++; } } else if (arg === '--output') { - const nextArg = args[i + 1]; + const nextArg = argv[i + 1]; if (nextArg && !nextArg.startsWith('-')) { outputPath = nextArg; i++; } } else if (arg === '--display-output') { - const nextArg = args[i + 1]; + const nextArg = argv[i + 1]; if (nextArg && !nextArg.startsWith('-')) { displayOutputPath = nextArg; i++; } } else if (arg === '--workflow-id') { - const nextArg = args[i + 1]; + const nextArg = argv[i + 1]; if (nextArg && !nextArg.startsWith('-')) { customWorkflowId = nextArg; i++; @@ -187,7 +198,7 @@ async function startPipeline(): Promise { } else if (arg === '--pipeline-testing') { pipelineTestingMode = true; } else if (arg === '--workspace') { - const nextArg = args[i + 1]; + const nextArg = argv[i + 1]; if (nextArg && !nextArg.startsWith('-')) { resumeFromWorkspace = nextArg; i++; @@ -209,7 +220,189 @@ async function startPipeline(): Promise { process.exit(1); } - // Display splash screen + return { + webUrl, repoPath, pipelineTestingMode, waitForCompletion, + ...(configPath && { configPath }), + ...(outputPath && { outputPath }), + ...(displayOutputPath && { displayOutputPath }), + ...(customWorkflowId && { customWorkflowId }), + ...(resumeFromWorkspace && { resumeFromWorkspace }), + }; +} + +// === Workspace Resolution === + +interface WorkspaceResolution { + workflowId: string; + sessionId: string; + isResume: boolean; + terminatedWorkflows: string[]; +} + +async function resolveWorkspace( + client: Client, + args: CliArgs +): Promise { + if (!args.resumeFromWorkspace) { + const hostname = sanitizeHostname(args.webUrl); + const workflowId = args.customWorkflowId || `${hostname}_shannon-${Date.now()}`; + return { + workflowId, + sessionId: workflowId, + isResume: false, + terminatedWorkflows: [], + }; + } + + const workspace = args.resumeFromWorkspace; + const sessionPath = path.join('./audit-logs', workspace, 'session.json'); + const workspaceExists = await fileExists(sessionPath); + + if (workspaceExists) { + console.log('=== RESUME MODE ==='); + console.log(`Workspace: ${workspace}\n`); + + const terminatedWorkflows = await terminateExistingWorkflows(client, workspace); + if (terminatedWorkflows.length > 0) { + console.log(`Terminated ${terminatedWorkflows.length} previous workflow(s)\n`); + } + + const session = await readJson(sessionPath); + if (session.session.webUrl !== args.webUrl) { + console.error('ERROR: URL mismatch with workspace'); + console.error(` Workspace URL: ${session.session.webUrl}`); + console.error(` Provided URL: ${args.webUrl}`); + process.exit(1); + } + + return { + workflowId: `${workspace}_resume_${Date.now()}`, + sessionId: workspace, + isResume: true, + terminatedWorkflows, + }; + } + + if (!isValidWorkspaceName(workspace)) { + console.error(`ERROR: Invalid workspace name: "${workspace}"`); + console.error(' Must be 1-128 characters, alphanumeric/hyphens/underscores, starting with alphanumeric'); + process.exit(1); + } + + console.log('=== NEW NAMED WORKSPACE ==='); + console.log(`Workspace: ${workspace}\n`); + + return { + workflowId: `${workspace}_shannon-${Date.now()}`, + sessionId: workspace, + isResume: false, + terminatedWorkflows: [], + }; +} + +// === Pipeline Input Construction === + +function buildPipelineInput(args: CliArgs, workspace: WorkspaceResolution): PipelineInput { + return { + webUrl: args.webUrl, + repoPath: args.repoPath, + workflowId: workspace.workflowId, + sessionId: workspace.sessionId, + ...(args.configPath && { configPath: args.configPath }), + ...(args.outputPath && { outputPath: args.outputPath }), + ...(args.pipelineTestingMode && { pipelineTestingMode: args.pipelineTestingMode }), + ...(workspace.isResume && args.resumeFromWorkspace && { resumeFromWorkspace: args.resumeFromWorkspace }), + ...(workspace.terminatedWorkflows.length > 0 && { terminatedWorkflows: workspace.terminatedWorkflows }), + }; +} + +// === Display Helpers === + +function displayWorkflowInfo(args: CliArgs, workspace: WorkspaceResolution): void { + console.log(`✓ Workflow started: ${workspace.workflowId}`); + if (workspace.isResume) { + console.log(` (Resuming workspace: ${workspace.sessionId})`); + } + console.log(); + console.log(` Target: ${args.webUrl}`); + console.log(` Repository: ${args.repoPath}`); + console.log(` Workspace: ${workspace.sessionId}`); + if (args.configPath) { + console.log(` Config: ${args.configPath}`); + } + if (args.displayOutputPath) { + console.log(` Output: ${args.displayOutputPath}`); + } + if (args.pipelineTestingMode) { + console.log(` Mode: Pipeline Testing`); + } + console.log(); +} + +function displayMonitoringInfo(args: CliArgs, workspace: WorkspaceResolution): void { + const effectiveDisplayPath = args.displayOutputPath || args.outputPath || './audit-logs'; + const outputDir = `${effectiveDisplayPath}/${workspace.sessionId}`; + + console.log('Monitor progress:'); + console.log(` Web UI: http://localhost:8233/namespaces/default/workflows/${workspace.workflowId}`); + console.log(` Logs: ./shannon logs ID=${workspace.workflowId}`); + console.log(); + console.log('Output:'); + console.log(` Reports: ${outputDir}`); + console.log(); +} + +// === Workflow Result Handling === + +async function waitForWorkflowResult( + handle: WorkflowHandle<(input: PipelineInput) => Promise>, + workspace: WorkspaceResolution +): Promise { + const progressInterval = setInterval(async () => { + try { + const progress = await handle.query(PROGRESS_QUERY); + const elapsed = Math.floor(progress.elapsedMs / 1000); + console.log( + `[${elapsed}s] Phase: ${progress.currentPhase || 'unknown'} | Agent: ${progress.currentAgent || 'none'} | Completed: ${progress.completedAgents.length}/13` + ); + } catch { + // Workflow may have completed + } + }, 30000); + + try { + const result = await handle.result(); + clearInterval(progressInterval); + + console.log('\nPipeline completed successfully!'); + if (result.summary) { + console.log(`Duration: ${Math.floor(result.summary.totalDurationMs / 1000)}s`); + console.log(`Agents completed: ${result.summary.agentCount}`); + console.log(`Total turns: ${result.summary.totalTurns}`); + console.log(`Run cost: $${result.summary.totalCostUsd.toFixed(4)}`); + + if (workspace.isResume) { + try { + const session = await readJson( + path.join('./audit-logs', workspace.sessionId, 'session.json') + ); + console.log(`Cumulative cost: $${session.metrics.total_cost_usd.toFixed(4)}`); + } catch { + // Non-fatal, skip cumulative cost display + } + } + } + } catch (error) { + clearInterval(progressInterval); + console.error('\nPipeline failed:', error); + process.exit(1); + } +} + +// === Main Entry Point === + +async function startPipeline(): Promise { + const args = parseCliArgs(process.argv.slice(2)); await displaySplashScreen(); const address = process.env.TEMPORAL_ADDRESS || 'localhost:7233'; @@ -219,155 +412,24 @@ async function startPipeline(): Promise { const client = new Client({ connection }); try { - let terminatedWorkflows: string[] = []; - let workflowId: string; - let sessionId: string; // Workspace name (persistent directory) - let isResume = false; + const workspace = await resolveWorkspace(client, args); + const input = buildPipelineInput(args, workspace); - if (resumeFromWorkspace) { - const sessionPath = path.join('./audit-logs', resumeFromWorkspace, 'session.json'); - const workspaceExists = await fileExists(sessionPath); - - if (workspaceExists) { - isResume = true; - console.log('=== RESUME MODE ==='); - console.log(`Workspace: ${resumeFromWorkspace}\n`); - - // Terminate any running workflows for this workspace - terminatedWorkflows = await terminateExistingWorkflows(client, resumeFromWorkspace); - - if (terminatedWorkflows.length > 0) { - console.log(`Terminated ${terminatedWorkflows.length} previous workflow(s)\n`); - } - - // Validate URL matches workspace - const session = await readJson(sessionPath); - - if (session.session.webUrl !== webUrl) { - console.error('ERROR: URL mismatch with workspace'); - console.error(` Workspace URL: ${session.session.webUrl}`); - console.error(` Provided URL: ${webUrl}`); - process.exit(1); - } - - // Generate resume workflow ID - workflowId = `${resumeFromWorkspace}_resume_${Date.now()}`; - sessionId = resumeFromWorkspace; - } else { - if (!isValidWorkspaceName(resumeFromWorkspace)) { - console.error(`ERROR: Invalid workspace name: "${resumeFromWorkspace}"`); - console.error(' Must be 1-128 characters, alphanumeric/hyphens/underscores, starting with alphanumeric'); - process.exit(1); - } - - console.log('=== NEW NAMED WORKSPACE ==='); - console.log(`Workspace: ${resumeFromWorkspace}\n`); - - workflowId = `${resumeFromWorkspace}_shannon-${Date.now()}`; - sessionId = resumeFromWorkspace; - } - } else { - const hostname = sanitizeHostname(webUrl); - workflowId = customWorkflowId || `${hostname}_shannon-${Date.now()}`; - sessionId = workflowId; - } - - const input: PipelineInput = { - webUrl, - repoPath, - workflowId, - sessionId, - ...(configPath && { configPath }), - ...(outputPath && { outputPath }), - ...(pipelineTestingMode && { pipelineTestingMode }), - ...(isResume && resumeFromWorkspace && { resumeFromWorkspace }), - ...(terminatedWorkflows.length > 0 && { terminatedWorkflows }), - }; - - // Use displayOutputPath (host path) if provided, otherwise fall back to outputPath or default - const effectiveDisplayPath = displayOutputPath || outputPath || './audit-logs'; - const outputDir = `${effectiveDisplayPath}/${sessionId}`; - - console.log(`✓ Workflow started: ${workflowId}`); - if (isResume) { - console.log(` (Resuming workspace: ${sessionId})`); - } - console.log(); - console.log(` Target: ${webUrl}`); - console.log(` Repository: ${repoPath}`); - console.log(` Workspace: ${sessionId}`); - if (configPath) { - console.log(` Config: ${configPath}`); - } - if (displayOutputPath) { - console.log(` Output: ${displayOutputPath}`); - } - if (pipelineTestingMode) { - console.log(` Mode: Pipeline Testing`); - } - console.log(); - - // Start workflow by name (not by importing the function) const handle = await client.workflow.start<(input: PipelineInput) => Promise>( 'pentestPipelineWorkflow', { taskQueue: 'shannon-pipeline', - workflowId, + workflowId: workspace.workflowId, args: [input], } ); - if (!waitForCompletion) { - console.log('Monitor progress:'); - console.log(` Web UI: http://localhost:8233/namespaces/default/workflows/${workflowId}`); - console.log(` Logs: ./shannon logs ID=${workflowId}`); - console.log(); - console.log('Output:'); - console.log(` Reports: ${outputDir}`); - console.log(); - return; - } + displayWorkflowInfo(args, workspace); - // Poll for progress every 30 seconds - const progressInterval = setInterval(async () => { - try { - const progress = await handle.query(PROGRESS_QUERY); - const elapsed = Math.floor(progress.elapsedMs / 1000); - console.log( - `[${elapsed}s] Phase: ${progress.currentPhase || 'unknown'} | Agent: ${progress.currentAgent || 'none'} | Completed: ${progress.completedAgents.length}/13` - ); - } catch { - // Workflow may have completed - } - }, 30000); - - try { - const result = await handle.result(); - clearInterval(progressInterval); - - console.log('\nPipeline completed successfully!'); - if (result.summary) { - console.log(`Duration: ${Math.floor(result.summary.totalDurationMs / 1000)}s`); - console.log(`Agents completed: ${result.summary.agentCount}`); - console.log(`Total turns: ${result.summary.totalTurns}`); - console.log(`Run cost: $${result.summary.totalCostUsd.toFixed(4)}`); - - // Show cumulative cost from session.json (includes all resume attempts) - if (isResume) { - try { - const session = await readJson( - path.join('./audit-logs', sessionId, 'session.json') - ); - console.log(`Cumulative cost: $${session.metrics.total_cost_usd.toFixed(4)}`); - } catch { - // Non-fatal, skip cumulative cost display - } - } - } - } catch (error) { - clearInterval(progressInterval); - console.error('\nPipeline failed:', error); - process.exit(1); + if (args.waitForCompletion) { + await waitForWorkflowResult(handle, workspace); + } else { + displayMonitoringInfo(args, workspace); } } finally { await connection.close(); diff --git a/src/temporal/workflows.ts b/src/temporal/workflows.ts index 695696f..58f8e72 100644 --- a/src/temporal/workflows.ts +++ b/src/temporal/workflows.ts @@ -186,96 +186,34 @@ export async function pentestPipelineWorkflow( return resumeState?.completedAgents.includes(agentName) ?? false; }; - try { - // === Phase 1: Pre-Reconnaissance === - if (!shouldSkip('pre-recon')) { - state.currentPhase = 'pre-recon'; - state.currentAgent = 'pre-recon'; - await a.logPhaseTransition(activityInput, 'pre-recon', 'start'); - state.agentMetrics['pre-recon'] = - await a.runPreReconAgent(activityInput); - state.completedAgents.push('pre-recon'); - await a.logPhaseTransition(activityInput, 'pre-recon', 'complete'); + // Run a sequential agent phase (pre-recon, recon) + async function runSequentialPhase( + phaseName: string, + agentName: AgentName, + runAgent: (input: ActivityInput) => Promise + ): Promise { + if (!shouldSkip(agentName)) { + state.currentPhase = phaseName; + state.currentAgent = agentName; + await a.logPhaseTransition(activityInput, phaseName, 'start'); + state.agentMetrics[agentName] = await runAgent(activityInput); + state.completedAgents.push(agentName); + await a.logPhaseTransition(activityInput, phaseName, 'complete'); } else { - log.info('Skipping pre-recon (already complete)'); - state.completedAgents.push('pre-recon'); + log.info(`Skipping ${agentName} (already complete)`); + state.completedAgents.push(agentName); } + } - // === Phase 2: Reconnaissance === - if (!shouldSkip('recon')) { - state.currentPhase = 'recon'; - state.currentAgent = 'recon'; - await a.logPhaseTransition(activityInput, 'recon', 'start'); - state.agentMetrics['recon'] = await a.runReconAgent(activityInput); - state.completedAgents.push('recon'); - await a.logPhaseTransition(activityInput, 'recon', 'complete'); - } else { - log.info('Skipping recon (already complete)'); - state.completedAgents.push('recon'); - } - - // === Phases 3-4: Vulnerability Analysis + Exploitation (Pipelined) === - // Each vuln type runs as an independent pipeline: - // vuln agent → queue check → conditional exploit agent - // This eliminates the synchronization barrier between phases - each exploit - // starts immediately when its vuln agent finishes, not waiting for all. - state.currentPhase = 'vulnerability-exploitation'; - state.currentAgent = 'pipelines'; - await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'start'); - - // Helper: Run a single vuln→exploit pipeline with skip logic - async function runVulnExploitPipeline( - vulnType: VulnType, - runVulnAgent: () => Promise, - runExploitAgent: () => Promise - ): Promise { - const vulnAgentName = `${vulnType}-vuln`; - const exploitAgentName = `${vulnType}-exploit`; - - // Step 1: Run vulnerability agent (or skip if completed) - let vulnMetrics: AgentMetrics | null = null; - if (!shouldSkip(vulnAgentName)) { - vulnMetrics = await runVulnAgent(); - } else { - log.info(`Skipping ${vulnAgentName} (already complete)`); - } - - // Step 2: Check exploitation queue (only if vuln agent ran or completed previously) - const decision = await a.checkExploitationQueue(activityInput, vulnType); - - // Step 3: Conditionally run exploit agent (skip if already completed) - let exploitMetrics: AgentMetrics | null = null; - if (decision.shouldExploit) { - if (!shouldSkip(exploitAgentName)) { - exploitMetrics = await runExploitAgent(); - } else { - log.info(`Skipping ${exploitAgentName} (already complete)`); - } - } - - return { - vulnType, - vulnMetrics, - exploitMetrics, - exploitDecision: { - shouldExploit: decision.shouldExploit, - vulnerabilityCount: decision.vulnerabilityCount, - }, - error: null, - }; - } - - // Determine which pipelines to run (skip if both vuln and exploit completed) - const pipelinesToRun: Array> = []; - - // Only run pipeline if at least one agent (vuln or exploit) is incomplete - const pipelineConfigs: Array<{ - vulnType: VulnType; - vulnAgent: string; - exploitAgent: string; - runVuln: () => Promise; - runExploit: () => Promise; - }> = [ + // Build pipeline configs for the 5 vuln→exploit pairs + function buildPipelineConfigs(): Array<{ + vulnType: VulnType; + vulnAgent: string; + exploitAgent: string; + runVuln: () => Promise; + runExploit: () => Promise; + }> { + return [ { vulnType: 'injection', vulnAgent: 'injection-vuln', @@ -312,56 +250,34 @@ export async function pentestPipelineWorkflow( runExploit: () => a.runAuthzExploitAgent(activityInput), }, ]; + } - for (const config of pipelineConfigs) { - const vulnComplete = shouldSkip(config.vulnAgent); - const exploitComplete = shouldSkip(config.exploitAgent); - - // Only run pipeline if at least one agent needs to run - if (!vulnComplete || !exploitComplete) { - pipelinesToRun.push( - runVulnExploitPipeline(config.vulnType, config.runVuln, config.runExploit) - ); - } else { - log.info( - `Skipping entire ${config.vulnType} pipeline (both agents complete)` - ); - // Still need to mark them as completed in state - state.completedAgents.push(config.vulnAgent, config.exploitAgent); - } - } - - // Run pipelines in parallel with graceful failure handling - // Promise.allSettled ensures other pipelines continue if one fails - const pipelineResults = await Promise.allSettled(pipelinesToRun); - - // Aggregate results from all pipelines + // Aggregate results from settled pipeline promises into workflow state + function aggregatePipelineResults( + results: PromiseSettledResult[] + ): void { const failedPipelines: string[] = []; - for (const result of pipelineResults) { + + for (const result of results) { if (result.status === 'fulfilled') { const { vulnType, vulnMetrics, exploitMetrics } = result.value; - // Record vuln agent const vulnAgentName = `${vulnType}-vuln`; if (vulnMetrics) { state.agentMetrics[vulnAgentName] = vulnMetrics; state.completedAgents.push(vulnAgentName); } else if (shouldSkip(vulnAgentName)) { - // Agent was skipped because already complete state.completedAgents.push(vulnAgentName); } - // Record exploit agent (if it ran) const exploitAgentName = `${vulnType}-exploit`; if (exploitMetrics) { state.agentMetrics[exploitAgentName] = exploitMetrics; state.completedAgents.push(exploitAgentName); } else if (shouldSkip(exploitAgentName)) { - // Agent was skipped because already complete state.completedAgents.push(exploitAgentName); } } else { - // Pipeline failed - log error but continue with others const errorMsg = result.reason instanceof Error ? result.reason.message @@ -370,14 +286,84 @@ export async function pentestPipelineWorkflow( } } - // Log any pipeline failures (workflow continues despite failures) if (failedPipelines.length > 0) { log.warn(`${failedPipelines.length} pipeline(s) failed`, { failures: failedPipelines, }); } + } + + try { + // === Phase 1: Pre-Reconnaissance === + await runSequentialPhase('pre-recon', 'pre-recon', a.runPreReconAgent); + + // === Phase 2: Reconnaissance === + await runSequentialPhase('recon', 'recon', a.runReconAgent); + + // === Phases 3-4: Vulnerability Analysis + Exploitation (Pipelined) === + // Each vuln type runs as an independent pipeline: + // vuln agent → queue check → conditional exploit agent + // Exploits start immediately when their vuln finishes, not waiting for all. + state.currentPhase = 'vulnerability-exploitation'; + state.currentAgent = 'pipelines'; + await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'start'); + + // Closure over shouldSkip and activityInput by design (Temporal replay safety) + async function runVulnExploitPipeline( + vulnType: VulnType, + runVulnAgent: () => Promise, + runExploitAgent: () => Promise + ): Promise { + const vulnAgentName = `${vulnType}-vuln`; + const exploitAgentName = `${vulnType}-exploit`; + + let vulnMetrics: AgentMetrics | null = null; + if (!shouldSkip(vulnAgentName)) { + vulnMetrics = await runVulnAgent(); + } else { + log.info(`Skipping ${vulnAgentName} (already complete)`); + } + + const decision = await a.checkExploitationQueue(activityInput, vulnType); + + let exploitMetrics: AgentMetrics | null = null; + if (decision.shouldExploit) { + if (!shouldSkip(exploitAgentName)) { + exploitMetrics = await runExploitAgent(); + } else { + log.info(`Skipping ${exploitAgentName} (already complete)`); + } + } + + return { + vulnType, + vulnMetrics, + exploitMetrics, + exploitDecision: { + shouldExploit: decision.shouldExploit, + vulnerabilityCount: decision.vulnerabilityCount, + }, + error: null, + }; + } + + const pipelineConfigs = buildPipelineConfigs(); + const pipelinesToRun: Array> = []; + + for (const config of pipelineConfigs) { + if (!shouldSkip(config.vulnAgent) || !shouldSkip(config.exploitAgent)) { + pipelinesToRun.push( + runVulnExploitPipeline(config.vulnType, config.runVuln, config.runExploit) + ); + } else { + log.info(`Skipping entire ${config.vulnType} pipeline (both agents complete)`); + state.completedAgents.push(config.vulnAgent, config.exploitAgent); + } + } + + const pipelineResults = await Promise.allSettled(pipelinesToRun); + aggregatePipelineResults(pipelineResults); - // Update phase markers state.currentPhase = 'exploitation'; state.currentAgent = null; await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'complete');