refactor: extract services layer, Result type, and ErrorCode classification

- Add DI container (src/services/) with AgentExecutionService, ConfigLoaderService, and ExploitationCheckerService — pure domain logic with no Temporal dependencies
- Introduce Result<T, E> type and ErrorCode enum for code-based error classification in classifyErrorForTemporal, replacing scattered string matching
- Consolidate billing/spending cap detection into utils/billing-detection.ts with shared pattern lists across message-handlers, claude-executor, and error-handling
- Extract LogStream abstraction for append-only logging with backpressure, used by both AgentLogger and WorkflowLogger
- Simplify activities.ts from inline lifecycle logic to thin wrappers delegating to services, with heartbeat and error classification
- Expand config-parser with human-readable AJV errors, security validation, and rule type-specific checks
This commit is contained in:
ajmallesh
2026-02-16 16:12:21 -08:00
parent ae69478541
commit d3816a29fa
31 changed files with 1664 additions and 707 deletions
+278
View File
@@ -0,0 +1,278 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Agent Execution Service
*
* Handles the full agent lifecycle:
* - Load config via ConfigLoaderService
* - Load prompt template using AGENTS[agentName].promptTemplate
* - Create git checkpoint
* - Start audit logging
* - Invoke Claude SDK via runClaudePrompt
* - Spending cap check using isSpendingCapBehavior
* - Handle failure (rollback, audit)
* - Validate output using AGENTS[agentName].deliverableFilename
* - Commit on success, log metrics
*
* No Temporal dependencies - pure domain logic.
*/
import chalk from 'chalk';
import { Result, ok, err, isErr } from '../types/result.js';
import { ErrorCode } from '../types/errors.js';
import { PentestError } from '../error-handling.js';
import { isSpendingCapBehavior } from '../utils/billing-detection.js';
import { AGENTS } from '../session-manager.js';
import { loadPrompt } from '../prompts/prompt-manager.js';
import {
runClaudePrompt,
validateAgentOutput,
type ClaudePromptResult,
} from '../ai/claude-executor.js';
import {
createGitCheckpoint,
commitGitSuccess,
rollbackGitWorkspace,
getGitCommitHash,
} from '../utils/git-manager.js';
import { AuditSession } from '../audit/index.js';
import type { AgentEndResult } from '../types/audit.js';
import type { AgentName } from '../types/agents.js';
import type { ConfigLoaderService } from './config-loader.js';
import type { AgentMetrics } from '../types/metrics.js';
/**
* Input for agent execution.
*/
export interface AgentExecutionInput {
webUrl: string;
repoPath: string;
configPath?: string | undefined;
pipelineTestingMode?: boolean | undefined;
attemptNumber: number;
}
/**
* Service for executing agents with full lifecycle management.
*
* NOTE: AuditSession is passed per-execution, NOT stored on the service.
* This is critical for parallel agent execution - each agent needs its own
* AuditSession instance because AuditSession uses instance state (currentAgentName)
* to track which agent is currently logging.
*/
export class AgentExecutionService {
private readonly configLoader: ConfigLoaderService;
constructor(configLoader: ConfigLoaderService) {
this.configLoader = configLoader;
}
/**
* Execute an agent with full lifecycle management.
*
* @param agentName - Name of the agent to execute
* @param input - Execution input parameters
* @param auditSession - Audit session for this specific agent execution
* @returns Result containing AgentEndResult on success, PentestError on failure
*/
async execute(
agentName: AgentName,
input: AgentExecutionInput,
auditSession: AuditSession
): Promise<Result<AgentEndResult, PentestError>> {
const { webUrl, repoPath, configPath, pipelineTestingMode = false, attemptNumber } = input;
// 1. Load config (if provided)
const configResult = await this.configLoader.loadOptional(configPath);
if (isErr(configResult)) {
return configResult;
}
const distributedConfig = configResult.value;
// 2. Load prompt
const promptTemplate = AGENTS[agentName].promptTemplate;
let prompt: string;
try {
prompt = await loadPrompt(
promptTemplate,
{ webUrl, repoPath },
distributedConfig,
pipelineTestingMode
);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
return err(
new PentestError(
`Failed to load prompt for ${agentName}: ${errorMessage}`,
'prompt',
false,
{ agentName, promptTemplate, originalError: errorMessage },
ErrorCode.PROMPT_LOAD_FAILED
)
);
}
// 3. Create git checkpoint before execution
try {
await createGitCheckpoint(repoPath, agentName, attemptNumber);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
return err(
new PentestError(
`Failed to create git checkpoint for ${agentName}: ${errorMessage}`,
'filesystem',
false,
{ agentName, repoPath, originalError: errorMessage },
ErrorCode.GIT_CHECKPOINT_FAILED
)
);
}
// 4. Start audit logging
await auditSession.startAgent(agentName, prompt, attemptNumber);
// 5. Execute agent
const result: ClaudePromptResult = await runClaudePrompt(
prompt,
repoPath,
'', // context
agentName, // description
agentName,
chalk.cyan,
auditSession
);
// 6. Spending cap check - defense-in-depth
if (result.success && (result.turns ?? 0) <= 2 && (result.cost || 0) === 0) {
const resultText = result.result || '';
if (isSpendingCapBehavior(result.turns ?? 0, result.cost || 0, resultText)) {
await rollbackGitWorkspace(repoPath, 'spending cap detected');
const endResult: AgentEndResult = {
attemptNumber,
duration_ms: result.duration,
cost_usd: 0,
success: false,
model: result.model,
error: `Spending cap likely reached: ${resultText.slice(0, 100)}`,
};
await auditSession.endAgent(agentName, endResult);
return err(
new PentestError(
`Spending cap likely reached: ${resultText.slice(0, 100)}`,
'billing',
true, // Retryable with long backoff
{ agentName, turns: result.turns, cost: result.cost },
ErrorCode.SPENDING_CAP_REACHED
)
);
}
}
// 7. Handle execution failure
if (!result.success) {
await rollbackGitWorkspace(repoPath, 'execution failure');
const endResult: AgentEndResult = {
attemptNumber,
duration_ms: result.duration,
cost_usd: result.cost || 0,
success: false,
model: result.model,
error: result.error || 'Execution failed',
};
await auditSession.endAgent(agentName, endResult);
return err(
new PentestError(
result.error || 'Agent execution failed',
'validation',
result.retryable ?? true,
{ agentName, originalError: result.error },
ErrorCode.AGENT_EXECUTION_FAILED
)
);
}
// 8. Validate output
const validationPassed = await validateAgentOutput(result, agentName, repoPath);
if (!validationPassed) {
await rollbackGitWorkspace(repoPath, 'validation failure');
const endResult: AgentEndResult = {
attemptNumber,
duration_ms: result.duration,
cost_usd: result.cost || 0,
success: false,
model: result.model,
error: 'Output validation failed',
};
await auditSession.endAgent(agentName, endResult);
return err(
new PentestError(
`Agent ${agentName} failed output validation`,
'validation',
true, // Retryable - agent may succeed on retry
{ agentName, deliverableFilename: AGENTS[agentName].deliverableFilename },
ErrorCode.OUTPUT_VALIDATION_FAILED
)
);
}
// 9. Success - commit deliverables, then capture checkpoint hash
await commitGitSuccess(repoPath, agentName);
const commitHash = await getGitCommitHash(repoPath);
const endResult: AgentEndResult = {
attemptNumber,
duration_ms: result.duration,
cost_usd: result.cost || 0,
success: true,
model: result.model,
...(commitHash && { checkpoint: commitHash }),
};
await auditSession.endAgent(agentName, endResult);
return ok(endResult);
}
/**
* Execute an agent, throwing PentestError on failure.
*
* This is the preferred method for Temporal activities, which need to
* catch errors and classify them into ApplicationFailure. Avoids requiring
* activities to import Result utilities, keeping the boundary clean.
*
* @param agentName - Name of the agent to execute
* @param input - Execution input parameters
* @param auditSession - Audit session for this specific agent execution
* @returns AgentEndResult on success
* @throws PentestError on failure
*/
async executeOrThrow(
agentName: AgentName,
input: AgentExecutionInput,
auditSession: AuditSession
): Promise<AgentEndResult> {
const result = await this.execute(agentName, input, auditSession);
if (isErr(result)) {
throw result.error;
}
return result.value;
}
/**
* Convert AgentEndResult to AgentMetrics for workflow state.
*/
static toMetrics(endResult: AgentEndResult, result: ClaudePromptResult): AgentMetrics {
return {
durationMs: endResult.duration_ms,
inputTokens: null, // Not currently exposed by SDK wrapper
outputTokens: null,
costUsd: endResult.cost_usd,
numTurns: result.turns ?? null,
model: result.model,
};
}
}
+75
View File
@@ -0,0 +1,75 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Config Loader Service
*
* Wraps parseConfig + distributeConfig with Result type for explicit error handling.
* Pure service with no Temporal dependencies.
*/
import { parseConfig, distributeConfig } from '../config-parser.js';
import { PentestError } from '../error-handling.js';
import { Result, ok, err } from '../types/result.js';
import { ErrorCode } from '../types/errors.js';
import type { DistributedConfig } from '../types/config.js';
/**
* Service for loading and distributing configuration files.
*
* Provides a Result-based API for explicit error handling,
* allowing callers to decide how to handle failures.
*/
export class ConfigLoaderService {
/**
* Load and distribute a configuration file.
*
* @param configPath - Path to the YAML configuration file
* @returns Result containing DistributedConfig on success, PentestError on failure
*/
async load(configPath: string): Promise<Result<DistributedConfig, PentestError>> {
try {
const config = await parseConfig(configPath);
const distributed = distributeConfig(config);
return ok(distributed);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
// Determine appropriate error code based on error message
let errorCode = ErrorCode.CONFIG_PARSE_ERROR;
if (errorMessage.includes('not found') || errorMessage.includes('ENOENT')) {
errorCode = ErrorCode.CONFIG_NOT_FOUND;
} else if (errorMessage.includes('validation failed')) {
errorCode = ErrorCode.CONFIG_VALIDATION_FAILED;
}
return err(
new PentestError(
`Failed to load config ${configPath}: ${errorMessage}`,
'config',
false,
{ configPath, originalError: errorMessage },
errorCode
)
);
}
}
/**
* Load config if path is provided, otherwise return null config.
*
* @param configPath - Optional path to the YAML configuration file
* @returns Result containing DistributedConfig (or null) on success, PentestError on failure
*/
async loadOptional(
configPath: string | undefined
): Promise<Result<DistributedConfig | null, PentestError>> {
if (!configPath) {
return ok(null);
}
return this.load(configPath);
}
}
+117
View File
@@ -0,0 +1,117 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Dependency Injection Container
*
* Provides a per-workflow container for service instances.
* Services are wired with explicit constructor injection.
*
* Usage:
* const container = getOrCreateContainer(workflowId, sessionMetadata);
* const auditSession = new AuditSession(sessionMetadata); // Per-agent
* await auditSession.initialize(workflowId);
* const result = await container.agentExecution.executeOrThrow(agentName, input, auditSession);
*/
import type { SessionMetadata } from '../audit/utils.js';
import { AgentExecutionService } from './agent-execution.js';
import { ConfigLoaderService } from './config-loader.js';
import { ExploitationCheckerService } from './exploitation-checker.js';
/**
* Dependencies required to create a Container.
*
* NOTE: AuditSession is NOT stored in the container.
* Each agent execution receives its own AuditSession instance
* because AuditSession uses instance state (currentAgentName) that
* cannot be shared across parallel agents.
*/
export interface ContainerDependencies {
readonly sessionMetadata: SessionMetadata;
}
/**
* DI Container for a single workflow.
*
* Holds all service instances for the workflow lifecycle.
* Services are instantiated once and reused across agent executions.
*
* NOTE: AuditSession is NOT stored here - it's passed per agent execution
* to support parallel agents each having their own logging context.
*/
export class Container {
readonly sessionMetadata: SessionMetadata;
readonly agentExecution: AgentExecutionService;
readonly configLoader: ConfigLoaderService;
readonly exploitationChecker: ExploitationCheckerService;
constructor(deps: ContainerDependencies) {
this.sessionMetadata = deps.sessionMetadata;
// Wire services with explicit constructor injection
this.configLoader = new ConfigLoaderService();
this.exploitationChecker = new ExploitationCheckerService();
this.agentExecution = new AgentExecutionService(this.configLoader);
}
}
/**
* Map of workflowId to Container instance.
* Each workflow gets its own container scoped to its lifecycle.
*/
const containers = new Map<string, Container>();
/**
* Get or create a Container for a workflow.
*
* If a container already exists for the workflowId, returns it.
* Otherwise, creates a new container with the provided dependencies.
*
* @param workflowId - Unique workflow identifier
* @param sessionMetadata - Session metadata for audit paths
* @returns Container instance for the workflow
*/
export function getOrCreateContainer(
workflowId: string,
sessionMetadata: SessionMetadata
): Container {
let container = containers.get(workflowId);
if (!container) {
container = new Container({ sessionMetadata });
containers.set(workflowId, container);
}
return container;
}
/**
* Remove a Container when a workflow completes.
*
* Should be called in logWorkflowComplete to clean up resources.
*
* @param workflowId - Unique workflow identifier
*/
export function removeContainer(workflowId: string): void {
containers.delete(workflowId);
}
/**
* Get an existing Container for a workflow, if one exists.
*
* Unlike getOrCreateContainer, this does NOT create a new container.
* Returns undefined if no container exists for the workflowId.
*
* Useful for lightweight activities that can benefit from an existing
* container but don't need to create one.
*
* @param workflowId - Unique workflow identifier
* @returns Container instance or undefined
*/
export function getContainer(workflowId: string): Container | undefined {
return containers.get(workflowId);
}
+74
View File
@@ -0,0 +1,74 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Exploitation Checker Service
*
* Pure domain logic for determining whether exploitation should run.
* Reads queue file, parses JSON, returns decision.
*
* No Temporal dependencies - this is pure business logic.
*/
import chalk from 'chalk';
import {
validateQueueSafe,
type VulnType,
type ExploitationDecision,
} from '../queue-validation.js';
import { isOk } from '../types/result.js';
/**
* Service for checking exploitation queue decisions.
*
* Determines whether an exploit agent should run based on
* the vulnerability analysis deliverables and queue files.
*/
export class ExploitationCheckerService {
/**
* Check if exploitation should run for a given vulnerability type.
*
* Reads the vulnerability queue file and returns the decision.
* This is pure domain logic - reads queue file, parses JSON, returns decision.
*
* @param vulnType - Type of vulnerability (injection, xss, auth, ssrf, authz)
* @param repoPath - Path to the repository containing deliverables
* @returns ExploitationDecision indicating whether to exploit
* @throws PentestError if validation fails and is retryable
*/
async checkQueue(vulnType: VulnType, repoPath: string): Promise<ExploitationDecision> {
const result = await validateQueueSafe(vulnType, repoPath);
if (isOk(result)) {
const decision = result.value;
console.log(
chalk.blue(
` ${vulnType}: ${decision.shouldExploit ? `${decision.vulnerabilityCount} vulnerabilities found` : 'no vulnerabilities, skipping exploitation'}`
)
);
return decision;
}
// Validation failed - check if we should retry or skip
const error = result.error;
if (error.retryable) {
// Re-throw retryable errors so caller can handle retry
console.log(chalk.yellow(` ${vulnType}: ${error.message} (retryable)`));
throw error;
}
// Non-retryable error - skip exploitation gracefully
console.log(
chalk.yellow(` ${vulnType}: ${error.message}, skipping exploitation`)
);
return {
shouldExploit: false,
shouldRetry: false,
vulnerabilityCount: 0,
vulnType,
};
}
}
+20
View File
@@ -0,0 +1,20 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Services Module
*
* Exports DI container and service classes for Shannon agent execution.
* Services are pure domain logic with no Temporal dependencies.
*/
export { Container, getOrCreateContainer, removeContainer } from './container.js';
export type { ContainerDependencies } from './container.js';
export { ConfigLoaderService } from './config-loader.js';
export { ExploitationCheckerService } from './exploitation-checker.js';
export { AgentExecutionService } from './agent-execution.js';
export type { AgentExecutionInput } from './agent-execution.js';