Merge pull request #161 from KeygraphHQ/feat/pipeline-config

feat: add configurable pipeline retry and concurrency settings
This commit is contained in:
Arjun Malleswaran
2026-02-24 10:52:52 -08:00
committed by GitHub
8 changed files with 149 additions and 13 deletions
+16 -3
View File
@@ -26,14 +26,28 @@ Then generate a PR title that:
- Matches the style of recent commits in the repository - Matches the style of recent commits in the repository
Generate a PR body with: Generate a PR body with:
- A `## Summary` section with 1-3 bullet points describing the changes - A `## Summary` section using rich bullets with bold action leads
- A `Closes #X` line for each issue number (if any were provided or detected from branch name) - A `Closes #X` line for each issue number (if any were provided or detected from branch name)
Each Summary bullet must follow this format:
- **Bold action phrase** (imperative verb: "Add X", "Replace Y", "Fix Z") — followed by em dash and a 1-2 sentence conceptual description of what changed and why
- Keep descriptions conceptual — no inline code references (no backticks for function/file names). The diff shows the code
- Use 2-5 bullets, scaling with PR size. Group related changes into single bullets rather than listing every file touched
Example:
```
## Summary
- **Add preflight validation** — validates repo path, config, and credentials before agent execution. Fails fast with actionable errors
- **Replace error strings** — pipe-delimited segments rendered as multi-line blocks with phase context, type, message, and remediation hint
- **Add error classification** — new error codes for repo, auth, and billing failures with proper retry classification
```
Finally, create the PR using the gh CLI: Finally, create the PR using the gh CLI:
``` ```
gh pr create --base main --title "<generated title>" --body "$(cat <<'EOF' gh pr create --base main --title "<generated title>" --body "$(cat <<'EOF'
## Summary ## Summary
<bullet points> <rich bullets>
Closes #<issue1> Closes #<issue1>
Closes #<issue2> Closes #<issue2>
@@ -45,6 +59,5 @@ Note: Omit the "Closes" lines entirely if no issues are associated with this PR.
IMPORTANT: IMPORTANT:
- Do NOT include any Claude Code attribution in the PR - Do NOT include any Claude Code attribution in the PR
- Keep the summary concise (1-3 bullet points maximum)
- Use the conventional commit prefix that best matches the changes (fix, feat, chore, refactor, docs, etc.) - Use the conventional commit prefix that best matches the changes (fix, feat, chore, refactor, docs, etc.)
- The `Closes #X` syntax will automatically close the referenced issues when the PR is merged - The `Closes #X` syntax will automatically close the referenced issues when the PR is merged
+12
View File
@@ -336,6 +336,18 @@ rules:
If your application uses two-factor authentication, simply add the TOTP secret to your config file. The AI will automatically generate the required codes during testing. If your application uses two-factor authentication, simply add the TOTP secret to your config file. The AI will automatically generate the required codes during testing.
#### Subscription Plan Rate Limits
Anthropic subscription plans reset usage on a **rolling 5-hour window**. The default retry strategy (30-min max backoff) will exhaust retries before the window resets. Add this to your config:
```yaml
pipeline:
retry_preset: subscription # Extends max backoff to 6h, 100 retries
max_concurrent_pipelines: 2 # Run 2 of 5 pipelines at a time (reduces burst API usage)
```
`max_concurrent_pipelines` controls how many vulnerability pipelines run simultaneously (1-5, default: 5). Lower values reduce the chance of hitting rate limits but increase wall-clock time.
### [EXPERIMENTAL - UNSUPPORTED] Router Mode (Alternative Providers) ### [EXPERIMENTAL - UNSUPPORTED] Router Mode (Alternative Providers)
Shannon can experimentally route requests through alternative AI providers using claude-code-router. This mode is not officially supported and is intended primarily for: Shannon can experimentally route requests through alternative AI providers using claude-code-router. This mode is not officially supported and is intended primarily for:
+17
View File
@@ -78,6 +78,23 @@
"required": ["login_type", "login_url", "credentials", "success_condition"], "required": ["login_type", "login_url", "credentials", "success_condition"],
"additionalProperties": false "additionalProperties": false
}, },
"pipeline": {
"type": "object",
"description": "Pipeline execution settings for retry behavior and concurrency",
"properties": {
"retry_preset": {
"type": "string",
"enum": ["default", "subscription"],
"description": "Retry preset. 'subscription' extends timeouts for Anthropic subscription rate limit windows (5h+)."
},
"max_concurrent_pipelines": {
"type": "string",
"pattern": "^[1-5]$",
"description": "Max concurrent vulnerability pipelines (1-5, default: 5)"
}
},
"additionalProperties": false
},
"rules": { "rules": {
"type": "object", "type": "object",
"description": "Testing rules that define what to focus on or avoid during penetration testing", "description": "Testing rules that define what to focus on or avoid during penetration testing",
+7 -2
View File
@@ -39,7 +39,12 @@ rules:
- description: "Prioritize beta admin panel subdomain" - description: "Prioritize beta admin panel subdomain"
type: subdomain type: subdomain
url_path: "beta-admin" url_path: "beta-admin"
- description: "Focus on user profile updates" - description: "Focus on user profile updates"
type: path type: path
url_path: "/api/v2/user-profile" url_path: "/api/v2/user-profile"
# Pipeline execution settings (optional)
# pipeline:
# retry_preset: subscription # 'default' or 'subscription' (6h max retry for rate limit recovery)
# max_concurrent_pipelines: 2 # 1-5, default: 5 (reduce to lower API usage spikes)
+30 -2
View File
@@ -32,6 +32,8 @@ import { displaySplashScreen } from '../splash-screen.js';
import { sanitizeHostname } from '../audit/utils.js'; import { sanitizeHostname } from '../audit/utils.js';
import { readJson, fileExists } from '../utils/file-io.js'; import { readJson, fileExists } from '../utils/file-io.js';
import path from 'path'; import path from 'path';
import { parseConfig } from '../config-parser.js';
import type { PipelineConfig } from '../types/config.js';
// Import types only - these don't pull in workflow runtime code // Import types only - these don't pull in workflow runtime code
import type { PipelineInput, PipelineState, PipelineProgress } from './shared.js'; import type { PipelineInput, PipelineState, PipelineProgress } from './shared.js';
@@ -306,7 +308,31 @@ async function resolveWorkspace(
// === Pipeline Input Construction === // === Pipeline Input Construction ===
function buildPipelineInput(args: CliArgs, workspace: WorkspaceResolution): PipelineInput { async function loadPipelineConfig(configPath: string | undefined): Promise<PipelineConfig> {
if (!configPath) return {};
try {
const config = await parseConfig(configPath);
const raw = config.pipeline;
if (!raw) return {};
// FAILSAFE_SCHEMA parses all YAML values as strings — coerce to number
const result: PipelineConfig = {};
if (raw.retry_preset !== undefined) {
result.retry_preset = raw.retry_preset;
}
if (raw.max_concurrent_pipelines !== undefined) {
result.max_concurrent_pipelines = Number(raw.max_concurrent_pipelines);
}
return result;
} catch {
// Config errors surface later in preflight. Don't block workflow start.
return {};
}
}
function buildPipelineInput(
args: CliArgs, workspace: WorkspaceResolution, pipelineConfig: PipelineConfig
): PipelineInput {
return { return {
webUrl: args.webUrl, webUrl: args.webUrl,
repoPath: args.repoPath, repoPath: args.repoPath,
@@ -317,6 +343,7 @@ function buildPipelineInput(args: CliArgs, workspace: WorkspaceResolution): Pipe
...(args.pipelineTestingMode && { pipelineTestingMode: args.pipelineTestingMode }), ...(args.pipelineTestingMode && { pipelineTestingMode: args.pipelineTestingMode }),
...(workspace.isResume && args.resumeFromWorkspace && { resumeFromWorkspace: args.resumeFromWorkspace }), ...(workspace.isResume && args.resumeFromWorkspace && { resumeFromWorkspace: args.resumeFromWorkspace }),
...(workspace.terminatedWorkflows.length > 0 && { terminatedWorkflows: workspace.terminatedWorkflows }), ...(workspace.terminatedWorkflows.length > 0 && { terminatedWorkflows: workspace.terminatedWorkflows }),
...(Object.keys(pipelineConfig).length > 0 && { pipelineConfig }),
}; };
} }
@@ -423,7 +450,8 @@ async function startPipeline(): Promise<void> {
try { try {
// 3. Resolve workspace (new or resume) and build pipeline input // 3. Resolve workspace (new or resume) and build pipeline input
const workspace = await resolveWorkspace(client, args); const workspace = await resolveWorkspace(client, args);
const input = buildPipelineInput(args, workspace); const pipelineConfig = await loadPipelineConfig(args.configPath);
const input = buildPipelineInput(args, workspace, pipelineConfig);
// 4. Start the Temporal workflow // 4. Start the Temporal workflow
const handle = await client.workflow.start<(input: PipelineInput) => Promise<PipelineState>>( const handle = await client.workflow.start<(input: PipelineInput) => Promise<PipelineState>>(
+2
View File
@@ -2,6 +2,7 @@ import { defineQuery } from '@temporalio/workflow';
export type { AgentMetrics } from '../types/metrics.js'; export type { AgentMetrics } from '../types/metrics.js';
import type { AgentMetrics } from '../types/metrics.js'; import type { AgentMetrics } from '../types/metrics.js';
import type { PipelineConfig } from '../types/config.js';
export interface PipelineInput { export interface PipelineInput {
webUrl: string; webUrl: string;
@@ -9,6 +10,7 @@ export interface PipelineInput {
configPath?: string; configPath?: string;
outputPath?: string; outputPath?: string;
pipelineTestingMode?: boolean; pipelineTestingMode?: boolean;
pipelineConfig?: PipelineConfig;
workflowId?: string; // Used for audit correlation workflowId?: string; // Used for audit correlation
sessionId?: string; // Workspace directory name (distinct from workflowId for named workspaces) sessionId?: string; // Workspace directory name (distinct from workflowId for named workspaces)
resumeFromWorkspace?: string; // Workspace name to resume from resumeFromWorkspace?: string; // Workspace name to resume from
+57 -6
View File
@@ -86,6 +86,22 @@ const testActs = proxyActivities<typeof activities>({
retry: TESTING_RETRY, retry: TESTING_RETRY,
}); });
// Retry configuration for subscription plans (5h+ rolling rate limit windows)
const SUBSCRIPTION_RETRY = {
initialInterval: '5 minutes',
maximumInterval: '6 hours',
backoffCoefficient: 2,
maximumAttempts: 100,
nonRetryableErrorTypes: PRODUCTION_RETRY.nonRetryableErrorTypes,
};
// Activity proxy for subscription plan recovery (extended timeouts)
const subscriptionActs = proxyActivities<typeof activities>({
startToCloseTimeout: '8 hours',
heartbeatTimeout: '2 hours',
retry: SUBSCRIPTION_RETRY,
});
// Retry configuration for preflight validation (short timeout, few retries) // Retry configuration for preflight validation (short timeout, few retries)
const PREFLIGHT_RETRY = { const PREFLIGHT_RETRY = {
initialInterval: '10 seconds', initialInterval: '10 seconds',
@@ -121,8 +137,14 @@ export async function pentestPipelineWorkflow(
): Promise<PipelineState> { ): Promise<PipelineState> {
const { workflowId } = workflowInfo(); const { workflowId } = workflowInfo();
// Pipeline testing uses fast retry intervals (10s) for quick iteration // Select activity proxy based on mode: testing (fast), subscription (extended), or default
const a = input.pipelineTestingMode ? testActs : acts; function selectActivityProxy(pipelineInput: PipelineInput) {
if (pipelineInput.pipelineTestingMode) return testActs;
if (pipelineInput.pipelineConfig?.retry_preset === 'subscription') return subscriptionActs;
return acts;
}
const a = selectActivityProxy(input);
const state: PipelineState = { const state: PipelineState = {
status: 'running', status: 'running',
@@ -313,6 +335,33 @@ export async function pentestPipelineWorkflow(
} }
} }
// Run thunks with a concurrency limit, returning PromiseSettledResult for each.
// When limit >= thunks.length (default), all launch concurrently — identical to Promise.allSettled.
// NOTE: Results are in completion order, not input order. Callers must key on value fields, not index.
async function runWithConcurrencyLimit(
thunks: Array<() => Promise<VulnExploitPipelineResult>>,
limit: number
): Promise<PromiseSettledResult<VulnExploitPipelineResult>[]> {
const results: PromiseSettledResult<VulnExploitPipelineResult>[] = [];
const inFlight = new Set<Promise<void>>();
for (const thunk of thunks) {
const slot = thunk().then(
(value) => { results.push({ status: 'fulfilled', value }); },
(reason: unknown) => { results.push({ status: 'rejected', reason }); }
).finally(() => { inFlight.delete(slot); });
inFlight.add(slot);
if (inFlight.size >= limit) {
await Promise.race(inFlight);
}
}
await Promise.allSettled(inFlight);
return results;
}
try { try {
// === Preflight Validation === // === Preflight Validation ===
// Quick sanity checks before committing to expensive agent runs. // Quick sanity checks before committing to expensive agent runs.
@@ -378,13 +427,15 @@ export async function pentestPipelineWorkflow(
}; };
} }
const maxConcurrent = input.pipelineConfig?.max_concurrent_pipelines ?? 5;
const pipelineConfigs = buildPipelineConfigs(); const pipelineConfigs = buildPipelineConfigs();
const pipelinesToRun: Array<Promise<VulnExploitPipelineResult>> = []; const pipelineThunks: Array<() => Promise<VulnExploitPipelineResult>> = [];
for (const config of pipelineConfigs) { for (const config of pipelineConfigs) {
if (!shouldSkip(config.vulnAgent) || !shouldSkip(config.exploitAgent)) { if (!shouldSkip(config.vulnAgent) || !shouldSkip(config.exploitAgent)) {
pipelinesToRun.push( pipelineThunks.push(
runVulnExploitPipeline(config.vulnType, config.runVuln, config.runExploit) () => runVulnExploitPipeline(config.vulnType, config.runVuln, config.runExploit)
); );
} else { } else {
log.info(`Skipping entire ${config.vulnType} pipeline (both agents complete)`); log.info(`Skipping entire ${config.vulnType} pipeline (both agents complete)`);
@@ -392,7 +443,7 @@ export async function pentestPipelineWorkflow(
} }
} }
const pipelineResults = await Promise.allSettled(pipelinesToRun); const pipelineResults = await runWithConcurrencyLimit(pipelineThunks, maxConcurrent);
aggregatePipelineResults(pipelineResults); aggregatePipelineResults(pipelineResults);
state.currentPhase = 'exploitation'; state.currentPhase = 'exploitation';
+8
View File
@@ -51,6 +51,14 @@ export interface Authentication {
export interface Config { export interface Config {
rules?: Rules; rules?: Rules;
authentication?: Authentication; authentication?: Authentication;
pipeline?: PipelineConfig;
}
export type RetryPreset = 'default' | 'subscription';
export interface PipelineConfig {
retry_preset?: RetryPreset;
max_concurrent_pipelines?: number;
} }
export interface DistributedConfig { export interface DistributedConfig {