feat: add configurable pipeline retry and concurrency settings (#157)
- Add `pipeline` config section with `retry_preset` and `max_concurrent_pipelines` options - Add `subscription` retry preset with extended 6h max interval for Anthropic rate limit windows - Replace Promise.allSettled with concurrency-limited runner for vuln/exploit pipelines - Wire pipeline config through client, shared types, and workflow activity proxy selection
This commit is contained in:
@@ -336,6 +336,18 @@ rules:
|
|||||||
|
|
||||||
If your application uses two-factor authentication, simply add the TOTP secret to your config file. The AI will automatically generate the required codes during testing.
|
If your application uses two-factor authentication, simply add the TOTP secret to your config file. The AI will automatically generate the required codes during testing.
|
||||||
|
|
||||||
|
#### Subscription Plan Rate Limits
|
||||||
|
|
||||||
|
Anthropic subscription plans reset usage on a **rolling 5-hour window**. The default retry strategy (30-min max backoff) will exhaust retries before the window resets. Add this to your config:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
pipeline:
|
||||||
|
retry_preset: subscription # Extends max backoff to 6h, 100 retries
|
||||||
|
max_concurrent_pipelines: 2 # Run 2 of 5 pipelines at a time (reduces burst API usage)
|
||||||
|
```
|
||||||
|
|
||||||
|
`max_concurrent_pipelines` controls how many vulnerability pipelines run simultaneously (1-5, default: 5). Lower values reduce the chance of hitting rate limits but increase wall-clock time.
|
||||||
|
|
||||||
### [EXPERIMENTAL - UNSUPPORTED] Router Mode (Alternative Providers)
|
### [EXPERIMENTAL - UNSUPPORTED] Router Mode (Alternative Providers)
|
||||||
|
|
||||||
Shannon can experimentally route requests through alternative AI providers using claude-code-router. This mode is not officially supported and is intended primarily for:
|
Shannon can experimentally route requests through alternative AI providers using claude-code-router. This mode is not officially supported and is intended primarily for:
|
||||||
|
|||||||
@@ -78,6 +78,23 @@
|
|||||||
"required": ["login_type", "login_url", "credentials", "success_condition"],
|
"required": ["login_type", "login_url", "credentials", "success_condition"],
|
||||||
"additionalProperties": false
|
"additionalProperties": false
|
||||||
},
|
},
|
||||||
|
"pipeline": {
|
||||||
|
"type": "object",
|
||||||
|
"description": "Pipeline execution settings for retry behavior and concurrency",
|
||||||
|
"properties": {
|
||||||
|
"retry_preset": {
|
||||||
|
"type": "string",
|
||||||
|
"enum": ["default", "subscription"],
|
||||||
|
"description": "Retry preset. 'subscription' extends timeouts for Anthropic subscription rate limit windows (5h+)."
|
||||||
|
},
|
||||||
|
"max_concurrent_pipelines": {
|
||||||
|
"type": "string",
|
||||||
|
"pattern": "^[1-5]$",
|
||||||
|
"description": "Max concurrent vulnerability pipelines (1-5, default: 5)"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"additionalProperties": false
|
||||||
|
},
|
||||||
"rules": {
|
"rules": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"description": "Testing rules that define what to focus on or avoid during penetration testing",
|
"description": "Testing rules that define what to focus on or avoid during penetration testing",
|
||||||
|
|||||||
@@ -43,3 +43,8 @@ rules:
|
|||||||
- description: "Focus on user profile updates"
|
- description: "Focus on user profile updates"
|
||||||
type: path
|
type: path
|
||||||
url_path: "/api/v2/user-profile"
|
url_path: "/api/v2/user-profile"
|
||||||
|
|
||||||
|
# Pipeline execution settings (optional)
|
||||||
|
# pipeline:
|
||||||
|
# retry_preset: subscription # 'default' or 'subscription' (6h max retry for rate limit recovery)
|
||||||
|
# max_concurrent_pipelines: 2 # 1-5, default: 5 (reduce to lower API usage spikes)
|
||||||
|
|||||||
+30
-2
@@ -32,6 +32,8 @@ import { displaySplashScreen } from '../splash-screen.js';
|
|||||||
import { sanitizeHostname } from '../audit/utils.js';
|
import { sanitizeHostname } from '../audit/utils.js';
|
||||||
import { readJson, fileExists } from '../utils/file-io.js';
|
import { readJson, fileExists } from '../utils/file-io.js';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
import { parseConfig } from '../config-parser.js';
|
||||||
|
import type { PipelineConfig } from '../types/config.js';
|
||||||
// Import types only - these don't pull in workflow runtime code
|
// Import types only - these don't pull in workflow runtime code
|
||||||
import type { PipelineInput, PipelineState, PipelineProgress } from './shared.js';
|
import type { PipelineInput, PipelineState, PipelineProgress } from './shared.js';
|
||||||
|
|
||||||
@@ -306,7 +308,31 @@ async function resolveWorkspace(
|
|||||||
|
|
||||||
// === Pipeline Input Construction ===
|
// === Pipeline Input Construction ===
|
||||||
|
|
||||||
function buildPipelineInput(args: CliArgs, workspace: WorkspaceResolution): PipelineInput {
|
async function loadPipelineConfig(configPath: string | undefined): Promise<PipelineConfig> {
|
||||||
|
if (!configPath) return {};
|
||||||
|
try {
|
||||||
|
const config = await parseConfig(configPath);
|
||||||
|
const raw = config.pipeline;
|
||||||
|
if (!raw) return {};
|
||||||
|
|
||||||
|
// FAILSAFE_SCHEMA parses all YAML values as strings — coerce to number
|
||||||
|
const result: PipelineConfig = {};
|
||||||
|
if (raw.retry_preset !== undefined) {
|
||||||
|
result.retry_preset = raw.retry_preset;
|
||||||
|
}
|
||||||
|
if (raw.max_concurrent_pipelines !== undefined) {
|
||||||
|
result.max_concurrent_pipelines = Number(raw.max_concurrent_pipelines);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
} catch {
|
||||||
|
// Config errors surface later in preflight. Don't block workflow start.
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function buildPipelineInput(
|
||||||
|
args: CliArgs, workspace: WorkspaceResolution, pipelineConfig: PipelineConfig
|
||||||
|
): PipelineInput {
|
||||||
return {
|
return {
|
||||||
webUrl: args.webUrl,
|
webUrl: args.webUrl,
|
||||||
repoPath: args.repoPath,
|
repoPath: args.repoPath,
|
||||||
@@ -317,6 +343,7 @@ function buildPipelineInput(args: CliArgs, workspace: WorkspaceResolution): Pipe
|
|||||||
...(args.pipelineTestingMode && { pipelineTestingMode: args.pipelineTestingMode }),
|
...(args.pipelineTestingMode && { pipelineTestingMode: args.pipelineTestingMode }),
|
||||||
...(workspace.isResume && args.resumeFromWorkspace && { resumeFromWorkspace: args.resumeFromWorkspace }),
|
...(workspace.isResume && args.resumeFromWorkspace && { resumeFromWorkspace: args.resumeFromWorkspace }),
|
||||||
...(workspace.terminatedWorkflows.length > 0 && { terminatedWorkflows: workspace.terminatedWorkflows }),
|
...(workspace.terminatedWorkflows.length > 0 && { terminatedWorkflows: workspace.terminatedWorkflows }),
|
||||||
|
...(Object.keys(pipelineConfig).length > 0 && { pipelineConfig }),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -423,7 +450,8 @@ async function startPipeline(): Promise<void> {
|
|||||||
try {
|
try {
|
||||||
// 3. Resolve workspace (new or resume) and build pipeline input
|
// 3. Resolve workspace (new or resume) and build pipeline input
|
||||||
const workspace = await resolveWorkspace(client, args);
|
const workspace = await resolveWorkspace(client, args);
|
||||||
const input = buildPipelineInput(args, workspace);
|
const pipelineConfig = await loadPipelineConfig(args.configPath);
|
||||||
|
const input = buildPipelineInput(args, workspace, pipelineConfig);
|
||||||
|
|
||||||
// 4. Start the Temporal workflow
|
// 4. Start the Temporal workflow
|
||||||
const handle = await client.workflow.start<(input: PipelineInput) => Promise<PipelineState>>(
|
const handle = await client.workflow.start<(input: PipelineInput) => Promise<PipelineState>>(
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ import { defineQuery } from '@temporalio/workflow';
|
|||||||
|
|
||||||
export type { AgentMetrics } from '../types/metrics.js';
|
export type { AgentMetrics } from '../types/metrics.js';
|
||||||
import type { AgentMetrics } from '../types/metrics.js';
|
import type { AgentMetrics } from '../types/metrics.js';
|
||||||
|
import type { PipelineConfig } from '../types/config.js';
|
||||||
|
|
||||||
export interface PipelineInput {
|
export interface PipelineInput {
|
||||||
webUrl: string;
|
webUrl: string;
|
||||||
@@ -9,6 +10,7 @@ export interface PipelineInput {
|
|||||||
configPath?: string;
|
configPath?: string;
|
||||||
outputPath?: string;
|
outputPath?: string;
|
||||||
pipelineTestingMode?: boolean;
|
pipelineTestingMode?: boolean;
|
||||||
|
pipelineConfig?: PipelineConfig;
|
||||||
workflowId?: string; // Used for audit correlation
|
workflowId?: string; // Used for audit correlation
|
||||||
sessionId?: string; // Workspace directory name (distinct from workflowId for named workspaces)
|
sessionId?: string; // Workspace directory name (distinct from workflowId for named workspaces)
|
||||||
resumeFromWorkspace?: string; // Workspace name to resume from
|
resumeFromWorkspace?: string; // Workspace name to resume from
|
||||||
|
|||||||
@@ -86,6 +86,22 @@ const testActs = proxyActivities<typeof activities>({
|
|||||||
retry: TESTING_RETRY,
|
retry: TESTING_RETRY,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Retry configuration for subscription plans (5h+ rolling rate limit windows)
|
||||||
|
const SUBSCRIPTION_RETRY = {
|
||||||
|
initialInterval: '5 minutes',
|
||||||
|
maximumInterval: '6 hours',
|
||||||
|
backoffCoefficient: 2,
|
||||||
|
maximumAttempts: 100,
|
||||||
|
nonRetryableErrorTypes: PRODUCTION_RETRY.nonRetryableErrorTypes,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Activity proxy for subscription plan recovery (extended timeouts)
|
||||||
|
const subscriptionActs = proxyActivities<typeof activities>({
|
||||||
|
startToCloseTimeout: '8 hours',
|
||||||
|
heartbeatTimeout: '2 hours',
|
||||||
|
retry: SUBSCRIPTION_RETRY,
|
||||||
|
});
|
||||||
|
|
||||||
// Retry configuration for preflight validation (short timeout, few retries)
|
// Retry configuration for preflight validation (short timeout, few retries)
|
||||||
const PREFLIGHT_RETRY = {
|
const PREFLIGHT_RETRY = {
|
||||||
initialInterval: '10 seconds',
|
initialInterval: '10 seconds',
|
||||||
@@ -121,8 +137,14 @@ export async function pentestPipelineWorkflow(
|
|||||||
): Promise<PipelineState> {
|
): Promise<PipelineState> {
|
||||||
const { workflowId } = workflowInfo();
|
const { workflowId } = workflowInfo();
|
||||||
|
|
||||||
// Pipeline testing uses fast retry intervals (10s) for quick iteration
|
// Select activity proxy based on mode: testing (fast), subscription (extended), or default
|
||||||
const a = input.pipelineTestingMode ? testActs : acts;
|
function selectActivityProxy(pipelineInput: PipelineInput) {
|
||||||
|
if (pipelineInput.pipelineTestingMode) return testActs;
|
||||||
|
if (pipelineInput.pipelineConfig?.retry_preset === 'subscription') return subscriptionActs;
|
||||||
|
return acts;
|
||||||
|
}
|
||||||
|
|
||||||
|
const a = selectActivityProxy(input);
|
||||||
|
|
||||||
const state: PipelineState = {
|
const state: PipelineState = {
|
||||||
status: 'running',
|
status: 'running',
|
||||||
@@ -313,6 +335,33 @@ export async function pentestPipelineWorkflow(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run thunks with a concurrency limit, returning PromiseSettledResult for each.
|
||||||
|
// When limit >= thunks.length (default), all launch concurrently — identical to Promise.allSettled.
|
||||||
|
// NOTE: Results are in completion order, not input order. Callers must key on value fields, not index.
|
||||||
|
async function runWithConcurrencyLimit(
|
||||||
|
thunks: Array<() => Promise<VulnExploitPipelineResult>>,
|
||||||
|
limit: number
|
||||||
|
): Promise<PromiseSettledResult<VulnExploitPipelineResult>[]> {
|
||||||
|
const results: PromiseSettledResult<VulnExploitPipelineResult>[] = [];
|
||||||
|
const inFlight = new Set<Promise<void>>();
|
||||||
|
|
||||||
|
for (const thunk of thunks) {
|
||||||
|
const slot = thunk().then(
|
||||||
|
(value) => { results.push({ status: 'fulfilled', value }); },
|
||||||
|
(reason: unknown) => { results.push({ status: 'rejected', reason }); }
|
||||||
|
).finally(() => { inFlight.delete(slot); });
|
||||||
|
|
||||||
|
inFlight.add(slot);
|
||||||
|
|
||||||
|
if (inFlight.size >= limit) {
|
||||||
|
await Promise.race(inFlight);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await Promise.allSettled(inFlight);
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// === Preflight Validation ===
|
// === Preflight Validation ===
|
||||||
// Quick sanity checks before committing to expensive agent runs.
|
// Quick sanity checks before committing to expensive agent runs.
|
||||||
@@ -378,13 +427,15 @@ export async function pentestPipelineWorkflow(
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const maxConcurrent = input.pipelineConfig?.max_concurrent_pipelines ?? 5;
|
||||||
|
|
||||||
const pipelineConfigs = buildPipelineConfigs();
|
const pipelineConfigs = buildPipelineConfigs();
|
||||||
const pipelinesToRun: Array<Promise<VulnExploitPipelineResult>> = [];
|
const pipelineThunks: Array<() => Promise<VulnExploitPipelineResult>> = [];
|
||||||
|
|
||||||
for (const config of pipelineConfigs) {
|
for (const config of pipelineConfigs) {
|
||||||
if (!shouldSkip(config.vulnAgent) || !shouldSkip(config.exploitAgent)) {
|
if (!shouldSkip(config.vulnAgent) || !shouldSkip(config.exploitAgent)) {
|
||||||
pipelinesToRun.push(
|
pipelineThunks.push(
|
||||||
runVulnExploitPipeline(config.vulnType, config.runVuln, config.runExploit)
|
() => runVulnExploitPipeline(config.vulnType, config.runVuln, config.runExploit)
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
log.info(`Skipping entire ${config.vulnType} pipeline (both agents complete)`);
|
log.info(`Skipping entire ${config.vulnType} pipeline (both agents complete)`);
|
||||||
@@ -392,7 +443,7 @@ export async function pentestPipelineWorkflow(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const pipelineResults = await Promise.allSettled(pipelinesToRun);
|
const pipelineResults = await runWithConcurrencyLimit(pipelineThunks, maxConcurrent);
|
||||||
aggregatePipelineResults(pipelineResults);
|
aggregatePipelineResults(pipelineResults);
|
||||||
|
|
||||||
state.currentPhase = 'exploitation';
|
state.currentPhase = 'exploitation';
|
||||||
|
|||||||
@@ -51,6 +51,14 @@ export interface Authentication {
|
|||||||
export interface Config {
|
export interface Config {
|
||||||
rules?: Rules;
|
rules?: Rules;
|
||||||
authentication?: Authentication;
|
authentication?: Authentication;
|
||||||
|
pipeline?: PipelineConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type RetryPreset = 'default' | 'subscription';
|
||||||
|
|
||||||
|
export interface PipelineConfig {
|
||||||
|
retry_preset?: RetryPreset;
|
||||||
|
max_concurrent_pipelines?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface DistributedConfig {
|
export interface DistributedConfig {
|
||||||
|
|||||||
Reference in New Issue
Block a user